morz-infoboard/server/backend/internal/mqttnotifier/notifier.go
Jesko Anschütz 79fcc20b79 fix(display): screen UUID lookup, authScreen middleware, JSON encoding
- playerstatus: look up screen by slug before UpsertDisplayState to pass UUID (not slug) and avoid FK violation
- router: switch display command route from authOnly to authScreen for proper permission enforcement
- display.go: remove redundant GetBySlug + requireScreenAccess (now handled by authScreen middleware), drop store dependency
- notifier: replace fmt.Sprintf %q with json.Marshal for correct JSON encoding of display command payload

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-26 23:35:05 +01:00

140 lines
4 KiB
Go

// Package mqttnotifier publishes playlist-changed notifications to MQTT.
// It is safe for concurrent use and applies per-screen debouncing so that
// rapid edits within a 2-second window produce at most one MQTT message.
package mqttnotifier
import (
"encoding/json"
"fmt"
"sync"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// debounceDuration is the quiet period that must elapse after the most recent
// change to a screen before its playlist-changed message is published. A new
// change for the same screen inside this period restarts the wait.
const debounceDuration = 2 * time.Second
// Notifier sends MQTT notifications for screens and debounces
// "playlist-changed" messages per screen.
//
// A Notifier created without a broker URL has a nil client; every method then
// returns immediately without doing anything, so callers never need to check
// whether MQTT is enabled.
type Notifier struct {
	// client is the MQTT connection; it is nil when the notifier is disabled.
	client mqtt.Client

	// mu guards timers.
	mu sync.Mutex

	// timers holds the pending debounce timer for each screen, keyed by
	// screen slug.
	timers map[string]*time.Timer
}
// New builds a Notifier for the MQTT broker at the given URL
// (e.g. "tcp://mosquitto:1883").
//
// When broker is empty the returned Notifier is disabled: its client is nil
// and all methods are no-ops. Credentials are applied only when username is
// non-empty; password is ignored otherwise.
//
// The connection attempt is started but not awaited, and its result is not
// inspected: the client is configured with auto-reconnect and connect-retry
// (10 s interval), so connecting is left to the client in the background.
func New(broker, username, password string) *Notifier {
	notifier := &Notifier{timers: make(map[string]*time.Timer)}
	if broker == "" {
		return notifier
	}

	options := mqtt.NewClientOptions()
	options.AddBroker(broker)
	options.SetClientID("morz-backend")
	options.SetCleanSession(true)
	options.SetAutoReconnect(true)
	options.SetConnectRetry(true)
	options.SetConnectRetryInterval(10 * time.Second)
	if username != "" {
		options.SetUsername(username)
		options.SetPassword(password)
	}

	notifier.client = mqtt.NewClient(options)
	// The returned token is deliberately dropped; see the doc comment.
	notifier.client.Connect()
	return notifier
}
// Topic builds the MQTT topic on which the playlist-changed notification for
// the screen identified by screenSlug is published. The slug is inserted
// verbatim; no escaping or validation is performed.
func Topic(screenSlug string) string {
	const pattern = "signage/screen/%s/playlist-changed"
	return fmt.Sprintf(pattern, screenSlug)
}
// NotifyChanged schedules a debounced playlist-changed publish for screenSlug.
//
// Each call (re)starts a debounceDuration timer for that screen, so a burst of
// calls results in a single publish once the screen has been quiet for
// debounceDuration. The method returns immediately; the publish itself runs
// later on the timer's goroutine. It is a no-op on a disabled Notifier.
func (n *Notifier) NotifyChanged(screenSlug string) {
	if n.client == nil {
		return
	}

	n.mu.Lock()
	defer n.mu.Unlock()

	// Cancel the pending timer, if any. Stop cannot cancel a callback that has
	// already started; that case is handled by the ownership check below.
	if prev, ok := n.timers[screenSlug]; ok {
		prev.Stop()
	}

	// timer is assigned while n.mu is held and only read by the callback
	// after it has acquired n.mu, so the callback always sees its own timer.
	var timer *time.Timer
	timer = time.AfterFunc(debounceDuration, func() {
		n.mu.Lock()
		// Only act if this timer is still the registered one. If a newer
		// NotifyChanged replaced it (or Close cleared the map) while this
		// callback was waiting for the lock, deleting the entry would orphan
		// the newer timer, and publishing would defeat the debounce or send
		// after Close.
		owned := n.timers[screenSlug] == timer
		if owned {
			delete(n.timers, screenSlug)
		}
		n.mu.Unlock()

		if !owned {
			return
		}
		n.publish(screenSlug)
	})
	n.timers[screenSlug] = timer
}
// RequestScreenshot publishes a screenshot-request message on the screen's
// "signage/screen/<slug>/screenshot-request" topic. The payload is a JSON
// object carrying the current Unix time in milliseconds under "ts".
//
// The message is sent with QoS 0 and not retained. The call waits up to three
// seconds for the publish to complete and discards the outcome. It is a no-op
// on a disabled Notifier (one created without a broker, i.e. nil client).
func (n *Notifier) RequestScreenshot(screenSlug string) {
	if n.client == nil {
		return
	}

	const topicFormat = "signage/screen/%s/screenshot-request"
	body := fmt.Sprintf(`{"ts":%d}`, time.Now().UnixMilli())

	pending := n.client.Publish(fmt.Sprintf(topicFormat, screenSlug), 0, false, []byte(body))
	pending.WaitTimeout(3 * time.Second)
}
// SendDisplayCommand publishes a display command (display_on/display_off) on
// the screen's "signage/screen/<slug>/command" topic as {"action":"<action>"}.
//
// The message is retained and sent with QoS 1 so that the agent still receives
// the most recent desired state after a reconnect.
//
// It returns nil on a disabled Notifier, an error if the payload cannot be
// marshalled, a timeout error if the publish does not complete within five
// seconds, and otherwise whatever error the publish token reports.
func (n *Notifier) SendDisplayCommand(screenSlug, action string) error {
	if n.client == nil {
		return nil
	}

	type command struct {
		Action string `json:"action"`
	}
	payload, err := json.Marshal(command{Action: action})
	if err != nil {
		return fmt.Errorf("marshal display command: %w", err)
	}

	const (
		qos      = 1
		retained = true
	)
	commandTopic := fmt.Sprintf("signage/screen/%s/command", screenSlug)
	pending := n.client.Publish(commandTopic, qos, retained, payload)
	if completed := pending.WaitTimeout(5 * time.Second); completed {
		return pending.Error()
	}
	return fmt.Errorf("mqtt publish display command: timeout")
}
// publish sends the playlist-changed message for screenSlug on Topic(screenSlug).
// The payload is a JSON object with the current Unix time in milliseconds
// under "ts"; the message uses QoS 0 and is not retained.
//
// It does not check n.client for nil; callers must do so before scheduling it.
func (n *Notifier) publish(screenSlug string) {
	body := fmt.Sprintf(`{"ts":%d}`, time.Now().UnixMilli())
	pending := n.client.Publish(Topic(screenSlug), 0, false, []byte(body))

	// Best effort: wait briefly and ignore the result. A lost notification is
	// covered by the agent's 60 s polling fallback.
	pending.WaitTimeout(3 * time.Second)
}
// Close stops all pending debounce timers, forgets them, and then disconnects
// the MQTT client, passing 250 to Disconnect. It is a no-op on a disabled
// Notifier.
func (n *Notifier) Close() {
	if n.client == nil {
		return
	}

	// Drop every scheduled notification under the lock, releasing it before
	// the disconnect call below.
	func() {
		n.mu.Lock()
		defer n.mu.Unlock()

		for _, pending := range n.timers {
			pending.Stop()
		}
		n.timers = make(map[string]*time.Timer)
	}()

	n.client.Disconnect(250)
}