morz-infoboard/player/agent/internal/mqttsubscriber/subscriber.go
2026-03-26 23:23:23 +01:00

178 lines
5.5 KiB
Go

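// Package mqttsubscriber subscribes to per-screen MQTT notifications
// (playlist-changed, screenshot-request and display commands).
// It is safe for concurrent use and applies client-side debouncing to
// playlist-changed messages: a burst of messages triggers a single callback
// once no further message has arrived for debounceDuration (500 ms).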
package mqttsubscriber
import (
"encoding/json"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
const (
// debounceDuration is the quiet period that must pass after the most recent
// playlist-changed message before the onChange callback is invoked.
// Any message arriving while the timer is still running restarts it.
// 500 ms is enough to absorb bursts without noticeably increasing latency.
debounceDuration = 500 * time.Millisecond
// playlistChangedTopic is the format template (%s = screen slug) of the
// topic the backend publishes playlist-changed notifications to.
// NOTE(review): none of the three templates below is referenced by the code
// visible in this file; Topic, ScreenshotRequestTopic and CommandTopic build
// the same strings by concatenation. Keep both in sync.
playlistChangedTopic = "signage/screen/%s/playlist-changed"
// screenshotRequestTopicTemplate is the format template (%s = screen slug)
// of the topic the backend publishes on-demand screenshot requests to.
screenshotRequestTopicTemplate = "signage/screen/%s/screenshot-request"
// commandTopicTemplate is the format template (%s = screen slug) of the
// topic carrying display commands.
commandTopicTemplate = "signage/screen/%s/command"
)
// PlaylistChangedFunc is the callback for playlist-changed notifications.
// It is invoked from a separate goroutine once the debounce timer expires,
// i.e. after no further playlist-changed message has arrived for
// debounceDuration.
type PlaylistChangedFunc func()
// ScreenshotRequestFunc is the callback for screenshot-request notifications.
// It is invoked in a new goroutine for every request taken from the internal
// queue; unlike playlist-changed notifications these are not debounced.
type ScreenshotRequestFunc func()
// DisplayCommandFunc is the callback for display commands. It is invoked in a
// new goroutine for every command received; action is the value of the
// "action" field of the JSON payload and is passed through unvalidated.
type DisplayCommandFunc func(action string)
// Subscriber listens for per-screen MQTT notifications and dispatches them to
// the callbacks supplied to New. Playlist-changed notifications are debounced:
// the callback runs once no further message has arrived for debounceDuration.
// Screenshot requests and display commands are forwarded without debouncing.
type Subscriber struct {
// client is the MQTT client created in New and disconnected by Close.
client mqtt.Client
// NOTE(review): timer is not referenced by any code visible in this file;
// run keeps its debounce timer in a local variable. Possibly a leftover —
// confirm there are no other users before removing it.
timer *time.Timer
// Callbacks supplied to New. run skips onScreenshotRequest and
// onDisplayCommand when they are nil.
onChange PlaylistChangedFunc
onScreenshotRequest ScreenshotRequestFunc
onDisplayCommand DisplayCommandFunc
// resetC carries playlist-changed signals from the MQTT handler to run,
// so that the debounce timer is only ever touched by that one goroutine.
resetC chan struct{}
// screenshotReqC carries screenshot-request signals to run.
screenshotReqC chan struct{}
// commandC carries the decoded "action" of display commands to run.
commandC chan string
// stopC is closed by Close to terminate run.
stopC chan struct{}
}
// Topic builds the playlist-changed MQTT topic for the screen identified by
// screenSlug. The slug is inserted verbatim, without validation or escaping.
func Topic(screenSlug string) string {
	const (
		prefix = "signage/screen/"
		suffix = "/playlist-changed"
	)
	return prefix + screenSlug + suffix
}
// ScreenshotRequestTopic builds the MQTT topic on which on-demand screenshot
// requests for the screen identified by screenSlug are delivered. The slug is
// inserted verbatim, without validation or escaping.
func ScreenshotRequestTopic(screenSlug string) string {
	const (
		prefix = "signage/screen/"
		suffix = "/screenshot-request"
	)
	return prefix + screenSlug + suffix
}
// CommandTopic builds the MQTT topic on which display commands for the screen
// identified by screenSlug are delivered. The slug is inserted verbatim,
// without validation or escaping.
func CommandTopic(screenSlug string) string {
	const (
		prefix = "signage/screen/"
		suffix = "/command"
	)
	return prefix + screenSlug + suffix
}
// New creates a Subscriber for the screen identified by screenSlug, starts
// connecting to broker and launches the dispatch loop.
//
// Three topics are subscribed every time the connection is (re)established:
// playlist-changed and screenshot-request at QoS 0, and command at QoS 1.
// The MQTT handlers never block: they hand the event to the dispatch loop
// through a buffered channel and drop it when that channel is full.
//
//   - onChange is called (in its own goroutine) after playlist-changed
//     messages have been quiet for debounceDuration.
//   - onScreenshotRequest is called (in its own goroutine) for each
//     screenshot-request message.
//   - onDisplayCommand is called (in its own goroutine) with the "action"
//     field of each command message; payloads that are not valid JSON are
//     dropped silently.
//
// username and password are only applied when username is non-empty.
//
// Returns nil when broker is empty — callers must handle nil.
func New(broker, screenSlug, username, password string, onChange PlaylistChangedFunc, onScreenshotRequest ScreenshotRequestFunc, onDisplayCommand DisplayCommandFunc) *Subscriber {
	if broker == "" {
		return nil
	}

	sub := &Subscriber{
		onChange:            onChange,
		onScreenshotRequest: onScreenshotRequest,
		onDisplayCommand:    onDisplayCommand,
		resetC:              make(chan struct{}, 16),
		screenshotReqC:      make(chan struct{}, 16),
		commandC:            make(chan string, 8),
		stopC:               make(chan struct{}),
	}

	playlistTopic := Topic(screenSlug)
	screenshotTopic := ScreenshotRequestTopic(screenSlug)
	commandTopic := CommandTopic(screenSlug)

	// Each handler performs a non-blocking send so that the MQTT client's
	// callback is never held up by a slow consumer.
	handlePlaylistChanged := func(mqtt.Client, mqtt.Message) {
		select {
		case sub.resetC <- struct{}{}:
		default:
			// Queue full: resets are already pending, so the debounce
			// timer will fire anyway.
		}
	}
	handleScreenshotRequest := func(mqtt.Client, mqtt.Message) {
		select {
		case sub.screenshotReqC <- struct{}{}:
		default:
			// Queue full: a request is already pending.
		}
	}
	handleCommand := func(_ mqtt.Client, msg mqtt.Message) {
		var payload struct {
			Action string `json:"action"`
		}
		if json.Unmarshal(msg.Payload(), &payload) != nil {
			return // malformed payload: ignore
		}
		select {
		case sub.commandC <- payload.Action:
		default:
			// Queue full: the command is dropped.
		}
	}

	opts := mqtt.NewClientOptions()
	opts.AddBroker(broker)
	opts.SetClientID("morz-agent-sub-" + screenSlug)
	opts.SetCleanSession(true)
	opts.SetAutoReconnect(true)
	opts.SetConnectRetry(true)
	opts.SetConnectRetryInterval(10 * time.Second)
	opts.SetOnConnectHandler(func(c mqtt.Client) {
		// Runs on every (re)connect, which re-establishes the
		// subscriptions after a reconnect.
		c.Subscribe(playlistTopic, 0, handlePlaylistChanged)     //nolint:errcheck
		c.Subscribe(screenshotTopic, 0, handleScreenshotRequest) //nolint:errcheck
		c.Subscribe(commandTopic, 1, handleCommand)              //nolint:errcheck
	})
	if username != "" {
		opts.SetUsername(username)
		opts.SetPassword(password)
	}

	sub.client = mqtt.NewClient(opts)
	// The returned token is deliberately not waited on, so New does not
	// block on the connection; with ConnectRetry enabled the client keeps
	// retrying in the background.
	sub.client.Connect()

	go sub.run()
	return sub
}
// run is the dispatch loop started by New. It owns the debounce timer and is
// the only goroutine that touches it, so no locking is needed.
//
//   - resetC: (re)starts the debounce timer. When the timer expires without a
//     further signal, onChange is invoked once in the timer's own goroutine.
//   - screenshotReqC: onScreenshotRequest is invoked in a new goroutine.
//   - commandC: onDisplayCommand is invoked in a new goroutine with the action.
//   - stopC: a pending timer is stopped and the loop returns.
//
// Signals whose callback is nil are discarded.
func (s *Subscriber) run() {
	var timer *time.Timer
	for {
		select {
		case <-s.stopC:
			if timer != nil {
				timer.Stop()
			}
			return
		case <-s.resetC:
			if s.onChange == nil {
				// Same treatment as the other callbacks: without the
				// guard a nil onChange would panic when the timer fires
				// and take the whole process down.
				continue
			}
			if timer == nil {
				// time.AfterFunc already calls the function in its own
				// goroutine, so no additional `go` is required.
				timer = time.AfterFunc(debounceDuration, s.onChange)
			} else {
				// For a func-based timer Reset either postpones the
				// pending call or schedules a new one if it already ran.
				timer.Reset(debounceDuration)
			}
		case <-s.screenshotReqC:
			if s.onScreenshotRequest != nil {
				go s.onScreenshotRequest()
			}
		case action := <-s.commandC:
			if s.onDisplayCommand != nil {
				go s.onDisplayCommand(action)
			}
		}
	}
}
// Close stops the dispatch loop and disconnects the MQTT client, allowing up
// to 250 ms for in-flight work to complete.
//
// Close is a no-op on a nil *Subscriber (New returns nil when no broker is
// configured) and on a Subscriber that has already been closed, so repeated
// sequential calls are safe. Concurrent calls from multiple goroutines are
// not synchronized against each other.
func (s *Subscriber) Close() {
	if s == nil {
		return
	}
	select {
	case <-s.stopC:
		// Already closed: closing stopC a second time would panic.
		return
	default:
	}
	close(s.stopC)
	s.client.Disconnect(250)
}