// Package mqttsubscriber subscribes to playlist-changed MQTT notifications. // It is safe for concurrent use and applies client-side debouncing so that // a burst of messages within a 3-second window triggers at most one callback. package mqttsubscriber import ( "time" mqtt "github.com/eclipse/paho.mqtt.golang" ) const ( // debounceDuration is the minimum interval between two callback invocations. // Any MQTT message arriving while the timer is still running resets it. // 500ms reicht aus um Bursts zu absorbieren, ohne die Latenz merklich zu erhöhen. debounceDuration = 500 * time.Millisecond // playlistChangedTopicTemplate is the topic the backend publishes to. playlistChangedTopic = "signage/screen/%s/playlist-changed" ) // PlaylistChangedFunc is called when a debounced playlist-changed notification arrives. type PlaylistChangedFunc func() // Subscriber listens for playlist-changed notifications on MQTT and calls the // provided callback at most once per debounceDuration. type Subscriber struct { client mqtt.Client timer *time.Timer onChange PlaylistChangedFunc // timerC serializes timer resets through a dedicated goroutine. resetC chan struct{} stopC chan struct{} } // Topic returns the MQTT topic for a given screenSlug. func Topic(screenSlug string) string { return "signage/screen/" + screenSlug + "/playlist-changed" } // New creates a Subscriber that connects to broker and subscribes to the // playlist-changed topic for screenSlug. onChange is called (in its own // goroutine) at most once per debounceDuration. // // Returns nil when broker is empty — callers must handle nil. func New(broker, screenSlug, username, password string, onChange PlaylistChangedFunc) *Subscriber { if broker == "" { return nil } s := &Subscriber{ onChange: onChange, resetC: make(chan struct{}, 16), stopC: make(chan struct{}), } topic := Topic(screenSlug) opts := mqtt.NewClientOptions(). AddBroker(broker). SetClientID("morz-agent-sub-" + screenSlug). SetCleanSession(true). SetAutoReconnect(true). SetConnectRetry(true). SetConnectRetryInterval(10 * time.Second). SetOnConnectHandler(func(c mqtt.Client) { // Re-subscribe after reconnect. c.Subscribe(topic, 0, func(_ mqtt.Client, _ mqtt.Message) { //nolint:errcheck select { case s.resetC <- struct{}{}: default: // channel full — debounce timer will fire anyway } }) }) if username != "" { opts.SetUsername(username) opts.SetPassword(password) } s.client = mqtt.NewClient(opts) s.client.Connect() // non-blocking; paho retries in background go s.run() return s } // run is the debounce loop. It resets a timer on every incoming signal. // When the timer fires the onChange callback is called once in a goroutine. func (s *Subscriber) run() { var timer *time.Timer for { select { case <-s.stopC: if timer != nil { timer.Stop() } return case <-s.resetC: if timer != nil { timer.Stop() } timer = time.AfterFunc(debounceDuration, func() { go s.onChange() }) } } } // Close disconnects the MQTT client and stops the debounce loop. func (s *Subscriber) Close() { if s == nil { return } close(s.stopC) s.client.Disconnect(250) }