178 lines
5.5 KiB
Go
178 lines
5.5 KiB
Go
// Package mqttsubscriber subscribes to playlist-changed MQTT notifications.
|
|
// It is safe for concurrent use and applies client-side debouncing so that
|
|
// a burst of messages spaced less than 500 ms apart triggers at most one callback.
|
|
package mqttsubscriber
|
|
|
|
import (
|
|
"encoding/json"
|
|
"time"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
)
|
|
|
|
const (
	// debounceDuration is the minimum interval between two callback invocations.
	// Any MQTT message arriving while the timer is still running resets it.
	// 500 ms is enough to absorb bursts without noticeably increasing latency.
	debounceDuration = 500 * time.Millisecond

	// playlistChangedTopicTemplate is the pattern of the topic the backend
	// publishes playlist-changed notifications to; %s stands for the screen slug.
	playlistChangedTopicTemplate = "signage/screen/%s/playlist-changed"

	// playlistChangedTopic is the former name of playlistChangedTopicTemplate,
	// kept so that existing references continue to compile.
	//
	// Deprecated: use playlistChangedTopicTemplate.
	playlistChangedTopic = playlistChangedTopicTemplate

	// screenshotRequestTopicTemplate is the pattern of the topic the backend
	// publishes to for on-demand screenshots; %s stands for the screen slug.
	screenshotRequestTopicTemplate = "signage/screen/%s/screenshot-request"

	// commandTopicTemplate is the pattern of the topic carrying display
	// commands; %s stands for the screen slug.
	commandTopicTemplate = "signage/screen/%s/command"
)
|
|
|
|
// Callback signatures accepted by New.
type (
	// PlaylistChangedFunc is called when a debounced playlist-changed
	// notification arrives.
	PlaylistChangedFunc func()

	// ScreenshotRequestFunc is called when a screenshot-request notification
	// arrives.
	ScreenshotRequestFunc func()

	// DisplayCommandFunc is called when a display command arrives; action is
	// the command to carry out.
	DisplayCommandFunc func(action string)
)
|
|
|
|
// Subscriber listens for playlist-changed notifications on MQTT and calls the
// provided callback at most once per debounceDuration. It also carries the
// callbacks and channels used for screenshot requests and display commands.
type Subscriber struct {
	// client is the MQTT connection; New creates it and Close disconnects it.
	client mqtt.Client

	// timer is not referenced anywhere in this file; the debounce loop in run
	// keeps its own local timer.
	// NOTE(review): looks unused — confirm before removing.
	timer *time.Timer

	// onChange is the debounced playlist-changed callback.
	onChange PlaylistChangedFunc

	// onScreenshotRequest is the screenshot-request callback; run skips it when nil.
	onScreenshotRequest ScreenshotRequestFunc

	// onDisplayCommand is the display-command callback; run skips it when nil.
	onDisplayCommand DisplayCommandFunc

	// resetC serializes timer resets through the dedicated run goroutine: the
	// MQTT handler signals on it for each playlist-changed message (the signal
	// is dropped when the buffer is full).
	resetC chan struct{}

	// screenshotReqC signals received screenshot requests to run (dropped when
	// the buffer is full).
	screenshotReqC chan struct{}

	// commandC passes the action of received display commands to run (dropped
	// when the buffer is full).
	commandC chan string

	// stopC is closed by Close to make run return.
	stopC chan struct{}
}
|
|
|
|
// Topic builds the name of the playlist-changed MQTT topic for screenSlug.
// The slug is inserted verbatim between the fixed prefix and suffix.
func Topic(screenSlug string) string {
	const (
		prefix = "signage/screen/"
		suffix = "/playlist-changed"
	)
	return prefix + screenSlug + suffix
}
|
|
|
|
// ScreenshotRequestTopic builds the name of the MQTT topic on which on-demand
// screenshot requests for screenSlug arrive. The slug is inserted verbatim
// between the fixed prefix and suffix.
func ScreenshotRequestTopic(screenSlug string) string {
	const (
		prefix = "signage/screen/"
		suffix = "/screenshot-request"
	)
	return prefix + screenSlug + suffix
}
|
|
|
|
// CommandTopic builds the name of the MQTT topic on which display commands
// for screenSlug arrive. The slug is inserted verbatim between the fixed
// prefix and suffix.
func CommandTopic(screenSlug string) string {
	const (
		prefix = "signage/screen/"
		suffix = "/command"
	)
	return prefix + screenSlug + suffix
}
|
|
|
|
// New creates a Subscriber that connects to broker and subscribes to the
|
|
// playlist-changed topic for screenSlug. onChange is called (in its own
|
|
// goroutine) at most once per debounceDuration.
|
|
// onScreenshotRequest is called (in its own goroutine) when a screenshot-request message arrives.
|
|
// onDisplayCommand is called (in its own goroutine) when a display command arrives.
|
|
//
|
|
// Returns nil when broker is empty — callers must handle nil.
|
|
func New(broker, screenSlug, username, password string, onChange PlaylistChangedFunc, onScreenshotRequest ScreenshotRequestFunc, onDisplayCommand DisplayCommandFunc) *Subscriber {
|
|
if broker == "" {
|
|
return nil
|
|
}
|
|
|
|
s := &Subscriber{
|
|
onChange: onChange,
|
|
onScreenshotRequest: onScreenshotRequest,
|
|
onDisplayCommand: onDisplayCommand,
|
|
resetC: make(chan struct{}, 16),
|
|
screenshotReqC: make(chan struct{}, 16),
|
|
commandC: make(chan string, 8),
|
|
stopC: make(chan struct{}),
|
|
}
|
|
|
|
topic := Topic(screenSlug)
|
|
screenshotTopic := ScreenshotRequestTopic(screenSlug)
|
|
|
|
opts := mqtt.NewClientOptions().
|
|
AddBroker(broker).
|
|
SetClientID("morz-agent-sub-" + screenSlug).
|
|
SetCleanSession(true).
|
|
SetAutoReconnect(true).
|
|
SetConnectRetry(true).
|
|
SetConnectRetryInterval(10 * time.Second).
|
|
SetOnConnectHandler(func(c mqtt.Client) {
|
|
// Re-subscribe after reconnect.
|
|
c.Subscribe(topic, 0, func(_ mqtt.Client, _ mqtt.Message) { //nolint:errcheck
|
|
select {
|
|
case s.resetC <- struct{}{}:
|
|
default: // channel full — debounce timer will fire anyway
|
|
}
|
|
})
|
|
c.Subscribe(screenshotTopic, 0, func(_ mqtt.Client, _ mqtt.Message) { //nolint:errcheck
|
|
select {
|
|
case s.screenshotReqC <- struct{}{}:
|
|
default: // channel full — request already pending
|
|
}
|
|
})
|
|
commandTopic := CommandTopic(screenSlug)
|
|
c.Subscribe(commandTopic, 1, func(_ mqtt.Client, m mqtt.Message) { //nolint:errcheck
|
|
var cmd struct {
|
|
Action string `json:"action"`
|
|
}
|
|
if err := json.Unmarshal(m.Payload(), &cmd); err != nil {
|
|
return
|
|
}
|
|
select {
|
|
case s.commandC <- cmd.Action:
|
|
default:
|
|
}
|
|
})
|
|
})
|
|
|
|
if username != "" {
|
|
opts.SetUsername(username)
|
|
opts.SetPassword(password)
|
|
}
|
|
|
|
s.client = mqtt.NewClient(opts)
|
|
s.client.Connect() // non-blocking; paho retries in background
|
|
|
|
go s.run()
|
|
return s
|
|
}
|
|
|
|
// run is the debounce loop. It resets a timer on every incoming signal.
|
|
// When the timer fires the onChange callback is called once in a goroutine.
|
|
func (s *Subscriber) run() {
|
|
var timer *time.Timer
|
|
for {
|
|
select {
|
|
case <-s.stopC:
|
|
if timer != nil {
|
|
timer.Stop()
|
|
}
|
|
return
|
|
case <-s.resetC:
|
|
if timer != nil {
|
|
timer.Stop()
|
|
}
|
|
timer = time.AfterFunc(debounceDuration, func() {
|
|
go s.onChange()
|
|
})
|
|
case <-s.screenshotReqC:
|
|
if s.onScreenshotRequest != nil {
|
|
go s.onScreenshotRequest()
|
|
}
|
|
case action := <-s.commandC:
|
|
if s.onDisplayCommand != nil {
|
|
go s.onDisplayCommand(action)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close disconnects the MQTT client and stops the debounce loop.
|
|
func (s *Subscriber) Close() {
|
|
if s == nil {
|
|
return
|
|
}
|
|
close(s.stopC)
|
|
s.client.Disconnect(250)
|
|
}
|