From 10475721578eacad31b0d359325649be4452f76a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jesko=20Ansch=C3=BCtz?= Date: Thu, 26 Mar 2026 23:23:23 +0100 Subject: [PATCH] feat(agent): mqttsubscriber abonniert Command-Topic --- .../internal/mqttsubscriber/subscriber.go | 35 ++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/player/agent/internal/mqttsubscriber/subscriber.go b/player/agent/internal/mqttsubscriber/subscriber.go index 2298cc0..58c1920 100644 --- a/player/agent/internal/mqttsubscriber/subscriber.go +++ b/player/agent/internal/mqttsubscriber/subscriber.go @@ -4,6 +4,7 @@ package mqttsubscriber import ( + "encoding/json" "time" mqtt "github.com/eclipse/paho.mqtt.golang" @@ -20,6 +21,8 @@ const ( // screenshotRequestTopicTemplate is the topic the backend publishes to for on-demand screenshots. screenshotRequestTopicTemplate = "signage/screen/%s/screenshot-request" + + commandTopicTemplate = "signage/screen/%s/command" ) // PlaylistChangedFunc is called when a debounced playlist-changed notification arrives. @@ -28,6 +31,9 @@ type PlaylistChangedFunc func() // ScreenshotRequestFunc is called when a screenshot-request notification arrives. type ScreenshotRequestFunc func() +// DisplayCommandFunc wird aufgerufen wenn ein display-command eintrifft. +type DisplayCommandFunc func(action string) + // Subscriber listens for playlist-changed notifications on MQTT and calls the // provided callback at most once per debounceDuration. type Subscriber struct { @@ -35,10 +41,12 @@ type Subscriber struct { timer *time.Timer onChange PlaylistChangedFunc onScreenshotRequest ScreenshotRequestFunc + onDisplayCommand DisplayCommandFunc // timerC serializes timer resets through a dedicated goroutine. resetC chan struct{} screenshotReqC chan struct{} + commandC chan string stopC chan struct{} } @@ -52,13 +60,19 @@ func ScreenshotRequestTopic(screenSlug string) string { return "signage/screen/" + screenSlug + "/screenshot-request" } +// CommandTopic returns the MQTT topic for display commands for a given screenSlug. +func CommandTopic(screenSlug string) string { + return "signage/screen/" + screenSlug + "/command" +} + // New creates a Subscriber that connects to broker and subscribes to the // playlist-changed topic for screenSlug. onChange is called (in its own // goroutine) at most once per debounceDuration. // onScreenshotRequest is called (in its own goroutine) when a screenshot-request message arrives. +// onDisplayCommand is called (in its own goroutine) when a display command arrives. // // Returns nil when broker is empty — callers must handle nil. -func New(broker, screenSlug, username, password string, onChange PlaylistChangedFunc, onScreenshotRequest ScreenshotRequestFunc) *Subscriber { +func New(broker, screenSlug, username, password string, onChange PlaylistChangedFunc, onScreenshotRequest ScreenshotRequestFunc, onDisplayCommand DisplayCommandFunc) *Subscriber { if broker == "" { return nil } @@ -66,8 +80,10 @@ func New(broker, screenSlug, username, password string, onChange PlaylistChanged s := &Subscriber{ onChange: onChange, onScreenshotRequest: onScreenshotRequest, + onDisplayCommand: onDisplayCommand, resetC: make(chan struct{}, 16), screenshotReqC: make(chan struct{}, 16), + commandC: make(chan string, 8), stopC: make(chan struct{}), } @@ -95,6 +111,19 @@ func New(broker, screenSlug, username, password string, onChange PlaylistChanged default: // channel full — request already pending } }) + commandTopic := CommandTopic(screenSlug) + c.Subscribe(commandTopic, 1, func(_ mqtt.Client, m mqtt.Message) { //nolint:errcheck + var cmd struct { + Action string `json:"action"` + } + if err := json.Unmarshal(m.Payload(), &cmd); err != nil { + return + } + select { + case s.commandC <- cmd.Action: + default: + } + }) }) if username != "" { @@ -131,6 +160,10 @@ func (s *Subscriber) run() { if s.onScreenshotRequest != nil { go s.onScreenshotRequest() } + case action := <-s.commandC: + if s.onDisplayCommand != nil { + go s.onDisplayCommand(action) + } } } }