morz-infoboard/player/agent/internal/mqttsubscriber/subscriber.go
Jesko Anschütz 6931181916 Fix: Transition-Race, Auto-Reload nach Deploy, Playlist-Latenz < 1s
- hideAllContent() prüft opacity bevor display=none gesetzt wird
  (verhindert Race mit displayItem)
- Neuer /api/startup-token Endpoint: Browser erkennt Agent-Neustart
  und reloaded automatisch
- MQTT-Debounce von 3s auf 500ms, Browser-Poll von 30s auf 5s
  reduziert für sub-sekunden Playlist-Updates

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-23 11:48:57 +01:00

118 lines
3.2 KiB
Go

// Package mqttsubscriber subscribes to playlist-changed MQTT notifications.
// It is safe for concurrent use and applies client-side debouncing so that
// a burst of messages within a 3-second window triggers at most one callback.
package mqttsubscriber
import (
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
const (
// debounceDuration is the minimum interval between two callback invocations.
// Any MQTT message arriving while the timer is still running resets it.
// 500ms reicht aus um Bursts zu absorbieren, ohne die Latenz merklich zu erhöhen.
debounceDuration = 500 * time.Millisecond
// playlistChangedTopicTemplate is the topic the backend publishes to.
playlistChangedTopic = "signage/screen/%s/playlist-changed"
)
// PlaylistChangedFunc is called when a debounced playlist-changed notification arrives.
type PlaylistChangedFunc func()
// Subscriber listens for playlist-changed notifications on MQTT and calls the
// provided callback at most once per debounceDuration.
type Subscriber struct {
client mqtt.Client
timer *time.Timer
onChange PlaylistChangedFunc
// timerC serializes timer resets through a dedicated goroutine.
resetC chan struct{}
stopC chan struct{}
}
// Topic returns the MQTT topic for a given screenSlug.
func Topic(screenSlug string) string {
return "signage/screen/" + screenSlug + "/playlist-changed"
}
// New creates a Subscriber that connects to broker and subscribes to the
// playlist-changed topic for screenSlug. onChange is called (in its own
// goroutine) at most once per debounceDuration.
//
// Returns nil when broker is empty — callers must handle nil.
func New(broker, screenSlug, username, password string, onChange PlaylistChangedFunc) *Subscriber {
if broker == "" {
return nil
}
s := &Subscriber{
onChange: onChange,
resetC: make(chan struct{}, 16),
stopC: make(chan struct{}),
}
topic := Topic(screenSlug)
opts := mqtt.NewClientOptions().
AddBroker(broker).
SetClientID("morz-agent-sub-" + screenSlug).
SetCleanSession(true).
SetAutoReconnect(true).
SetConnectRetry(true).
SetConnectRetryInterval(10 * time.Second).
SetOnConnectHandler(func(c mqtt.Client) {
// Re-subscribe after reconnect.
c.Subscribe(topic, 0, func(_ mqtt.Client, _ mqtt.Message) { //nolint:errcheck
select {
case s.resetC <- struct{}{}:
default: // channel full — debounce timer will fire anyway
}
})
})
if username != "" {
opts.SetUsername(username)
opts.SetPassword(password)
}
s.client = mqtt.NewClient(opts)
s.client.Connect() // non-blocking; paho retries in background
go s.run()
return s
}
// run is the debounce loop. It resets a timer on every incoming signal.
// When the timer fires the onChange callback is called once in a goroutine.
func (s *Subscriber) run() {
var timer *time.Timer
for {
select {
case <-s.stopC:
if timer != nil {
timer.Stop()
}
return
case <-s.resetC:
if timer != nil {
timer.Stop()
}
timer = time.AfterFunc(debounceDuration, func() {
go s.onChange()
})
}
}
}
// Close disconnects the MQTT client and stops the debounce loop.
func (s *Subscriber) Close() {
if s == nil {
return
}
close(s.stopC)
s.client.Disconnect(250)
}