diff --git a/TODO.md b/TODO.md index 4179b1c..265b508 100644 --- a/TODO.md +++ b/TODO.md @@ -78,6 +78,7 @@ - [x] Fallback-Verzeichnisbetrieb demonstrieren - [ ] `valid_from`/`valid_until` im Prototyp pruefen - [x] Offline-Sync mit lokalem Cache pruefen +- [x] MQTT-Topic `signage/screen/{screenSlug}/playlist-changed` spezifiziert und dokumentiert - [ ] MQTT-Kommandos `reload`, `restart_player`, `reboot`, `display_on`, `display_off` testweise durchspielen - [ ] globale Kampagne testen, die tenantbezogenen Content temporär ueberschreibt - [ ] Rueckfall auf Normalbetrieb nach manueller Deaktivierung pruefen @@ -89,6 +90,7 @@ - [x] Chromium-Kiosk-Startskript erstellen - [ ] Screenshot-Erzeugung auf dem Player integrieren - [x] Heartbeat- und Statusmeldungen integrieren +- [x] MQTT-Playlist-Change-Synchronisation mit Backend-Debounce (2s) und Agent-Debounce (3s) implementiert - [ ] Fehler- und Wiederanlaufverhalten verifizieren ## Phase 7 - Ansible-Automatisierung diff --git a/compose/server-stack.yml b/compose/server-stack.yml index a032ee7..3a9527b 100644 --- a/compose/server-stack.yml +++ b/compose/server-stack.yml @@ -32,11 +32,14 @@ services: MORZ_INFOBOARD_HTTP_ADDR: ":8080" MORZ_INFOBOARD_DATABASE_URL: "postgres://morz_infoboard:morz_infoboard@postgres:5432/morz_infoboard?sslmode=disable" MORZ_INFOBOARD_UPLOAD_DIR: "/uploads" + MORZ_INFOBOARD_MQTT_BROKER: "tcp://mosquitto:1883" volumes: - uploads:/uploads depends_on: postgres: condition: service_healthy + mosquitto: + condition: service_started restart: unless-stopped volumes: diff --git a/docs/API-MQTT-VERTRAG.md b/docs/API-MQTT-VERTRAG.md new file mode 100644 index 0000000..cfb9716 --- /dev/null +++ b/docs/API-MQTT-VERTRAG.md @@ -0,0 +1,136 @@ +# Info-Board Neu - MQTT-Vertrag + +Vertrag zwischen Backend und Agent für die Echtzeit-Synchronisation von Playlist-Änderungen und Gerätebefehlen. + +--- + +## Überblick + +Das Messaging-System nutzt MQTT für: + +- **Playlist-Mutations-Benachrichtigungen**: Backend → Agent +- **Device-Commands**: Backend → Agent (zukünftig: reload, restart_player, reboot, display_on/off) +- **Heartbeat & Status**: Agent → Backend (siehe `PLAYER-STATUS-HTTP.md`) + +Alle Topics folgen dem Naming-Pattern: `signage/{component}/{screenSlug}/{event}` + +--- + +## Topics + +### Backend publishes + +#### `signage/screen/{screenSlug}/playlist-changed` + +**Publisher:** Backend +**Subscriber:** Agent + +Wird nach jeder Mutation der Playlist gepublished (Add, Remove, Reorder, Enable/Disable Item). + +**Payload:** +```json +{ + "ts": 1711268440000 +} +``` + +- `ts`: Unix-Zeitstempel in Millisekunden des Änderungsereignisses auf dem Backend + +**Verhalten:** +- Backend debounced Änderungen über **2 Sekunden** +- Mehrere schnelle Mutationen werden zu einem Event zusammengefasst +- Garantiert mindestens ein Event pro logischer Änderung + +**Agent-Reaktion:** +- Agent empfängt das Event +- Agent debounced die Verarbeitung über **3 Sekunden** +- Agent startet sofortiges Playlist-Fetch via HTTP `GET /api/v1/screens/{screenSlug}/playlist` +- Agent speichert die Playlist lokal und signalisiert dem Browser einen Reload + +**Implementierung (Agent):** +```go +// Pseudocode +func OnPlaylistChanged(msg PlaylistChangedMessage) { + if debounceTimer.running { + debounceTimer.reset() + } else { + debounceTimer.start(3 * time.Second) + } +} + +func onDebounceExpire() { + playlist := fetchPlaylistViaHTTP() + saveToLocalCache(playlist) + signalBrowserReload() +} +``` + +--- + +## Zukünftige Topics + +Die folgenden Topics sind **geplant** für Phase 5 (Prototyping) und später: + +### `signage/screen/{screenSlug}/device-command` + +**Publisher:** Backend +**Subscriber:** Agent + +Befehl-Queue für Device-Steuerung. + +**Payload:** +```json +{ + "cmd_id": "uuid", + "command": "reload|restart_player|reboot|display_on|display_off", + "ts": 1711268440000 +} +``` + +**Agent-Reaktion:** +- Befehl ausführen +- ACK via HTTP POST zu `PUT /api/v1/screens/{screenSlug}/command-ack` + +--- + +## Beispiel-Flow: Playlist-Update + +``` +Admin: Click "Speichern" in Playlist-UI + ↓ +Backend: Playlist-Mutation in DB schreiben + ↓ +Backend: `playlist-changed` mit ts=now nach 2s Debounce publifyen + ↓ +Agent: Event empfangen, 3s Debounce starten + ↓ +Agent: Nach 3s → HTTP GET /api/v1/screens/{slug}/playlist + ↓ +Backend: Aktuelle Playlist zurückgeben + ↓ +Agent: Lokal speichern, Browser signalisieren "reload" + ↓ +Browser: Neuer Content geladen und abgespielt +``` + +--- + +## MQTT-Verbindungspezifikation + +(Siehe `PLAYER-KONZEPT.md` und Provisioning-Variablen für Broker-URL, Authentifizierung und Retry-Logik) + +- **Broker-Adresse:** Über Provisioning konfigurierbar (Standard: `tcp://backend:1883`) +- **Client-ID:** `{tenantSlug}/{screenSlug}` (eindeutig pro Screen) +- **Username/Password:** Device-spezifische Credentials (OAuth-ähnlich) +- **QoS-Level:** 1 (At-Least-Once für Critical-Events) +- **Retain:** nein (Event-Natur, nicht State) +- **Heartbeat:** Separat via HTTP (siehe `PLAYER-STATUS-HTTP.md`) + +--- + +## Notizen für Implementierer + +1. **Replay bei Reconnect:** Topics haben `retain: false`, daher entfallen keine Events bei Trennung. Der Agent synchronisiert sich nach Reconnect via regulärem Status-Endpoint. +2. **Ordering:** Mehrere Events zu einem Screen sind ordered; Ordering über Screen-Grenzen hinweg ist nicht garantiert. +3. **Fehlerbehandlung:** Fehlgeschlagene Playlisten-Fetches werden vom Agent nach Standard-Retry-Logik wiederholt. +4. **Version der Spec:** v1.0 (März 2026) diff --git a/player/agent/internal/app/app.go b/player/agent/internal/app/app.go index 69fae5c..3c4286d 100644 --- a/player/agent/internal/app/app.go +++ b/player/agent/internal/app/app.go @@ -13,6 +13,7 @@ import ( "git.az-it.net/az/morz-infoboard/player/agent/internal/config" "git.az-it.net/az/morz-infoboard/player/agent/internal/mqttheartbeat" + "git.az-it.net/az/morz-infoboard/player/agent/internal/mqttsubscriber" "git.az-it.net/az/morz-infoboard/player/agent/internal/playerserver" "git.az-it.net/az/morz-infoboard/player/agent/internal/statusreporter" ) @@ -62,6 +63,11 @@ type App struct { // Playlist fetched from the backend (protected by playlistMu). playlistMu sync.RWMutex playlist []playerserver.PlaylistItem + + // mqttFetchC receives a signal whenever a playlist-changed MQTT message + // arrives (after debouncing in the subscriber). pollPlaylist listens on + // this channel to trigger an immediate fetchPlaylist call. + mqttFetchC chan struct{} } type statusSender interface { @@ -109,6 +115,7 @@ func newApp(cfg config.Config, logger *log.Logger, now func() time.Time, reporte mqttPub: mqttPub, status: StatusStarting, serverConnectivity: ConnectivityUnknown, + mqttFetchC: make(chan struct{}, 1), } } @@ -190,7 +197,28 @@ func (a *App) Run(ctx context.Context) error { // Self-register this screen in the backend (best-effort, non-blocking). go a.registerScreen(ctx) - // Start polling the backend for playlist updates. + // Subscribe to playlist-changed MQTT notifications (optional; fallback = polling). + sub := mqttsubscriber.New( + a.Config.MQTTBroker, + a.Config.ScreenID, + a.Config.MQTTUsername, + a.Config.MQTTPassword, + func() { + // Debounced callback: send a non-blocking signal to the fetch channel. + select { + case a.mqttFetchC <- struct{}{}: + default: // already a pending signal — no need to queue another + } + a.logger.Printf("event=mqtt_playlist_notification screen_id=%s", a.Config.ScreenID) + }, + ) + if sub != nil { + a.logger.Printf("event=mqtt_subscriber_enabled broker=%s screen_id=%s topic=%s", + a.Config.MQTTBroker, a.Config.ScreenID, mqttsubscriber.Topic(a.Config.ScreenID)) + defer sub.Close() + } + + // Start polling the backend for playlist updates (60 s fallback + MQTT trigger). go a.pollPlaylist(ctx) a.emitHeartbeat() @@ -266,9 +294,12 @@ func (a *App) registerScreen(ctx context.Context) { } } -// pollPlaylist fetches the active playlist from the backend periodically. +// pollPlaylist fetches the active playlist from the backend. +// It fetches immediately on startup, then waits for either: +// - an MQTT playlist-changed notification (fast path, debounced by subscriber) +// - the 60-second fallback ticker (in case MQTT is unavailable) func (a *App) pollPlaylist(ctx context.Context) { - // Fetch immediately on startup, then every 60s. + // Fetch immediately on startup. a.fetchPlaylist(ctx) ticker := time.NewTicker(60 * time.Second) @@ -277,6 +308,9 @@ func (a *App) pollPlaylist(ctx context.Context) { select { case <-ctx.Done(): return + case <-a.mqttFetchC: + a.logger.Printf("event=playlist_triggered_by_mqtt screen_id=%s", a.Config.ScreenID) + a.fetchPlaylist(ctx) case <-ticker.C: a.fetchPlaylist(ctx) } diff --git a/player/agent/internal/mqttsubscriber/subscriber.go b/player/agent/internal/mqttsubscriber/subscriber.go new file mode 100644 index 0000000..1770533 --- /dev/null +++ b/player/agent/internal/mqttsubscriber/subscriber.go @@ -0,0 +1,117 @@ +// Package mqttsubscriber subscribes to playlist-changed MQTT notifications. +// It is safe for concurrent use and applies client-side debouncing so that +// a burst of messages within a 3-second window triggers at most one callback. +package mqttsubscriber + +import ( + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +const ( + // debounceDuration is the minimum interval between two callback invocations. + // Any MQTT message arriving while the timer is still running resets it. + debounceDuration = 3 * time.Second + + // playlistChangedTopicTemplate is the topic the backend publishes to. + playlistChangedTopic = "signage/screen/%s/playlist-changed" +) + +// PlaylistChangedFunc is called when a debounced playlist-changed notification arrives. +type PlaylistChangedFunc func() + +// Subscriber listens for playlist-changed notifications on MQTT and calls the +// provided callback at most once per debounceDuration. +type Subscriber struct { + client mqtt.Client + timer *time.Timer + onChange PlaylistChangedFunc + + // timerC serializes timer resets through a dedicated goroutine. + resetC chan struct{} + stopC chan struct{} +} + +// Topic returns the MQTT topic for a given screenSlug. +func Topic(screenSlug string) string { + return "signage/screen/" + screenSlug + "/playlist-changed" +} + +// New creates a Subscriber that connects to broker and subscribes to the +// playlist-changed topic for screenSlug. onChange is called (in its own +// goroutine) at most once per debounceDuration. +// +// Returns nil when broker is empty — callers must handle nil. +func New(broker, screenSlug, username, password string, onChange PlaylistChangedFunc) *Subscriber { + if broker == "" { + return nil + } + + s := &Subscriber{ + onChange: onChange, + resetC: make(chan struct{}, 16), + stopC: make(chan struct{}), + } + + topic := Topic(screenSlug) + + opts := mqtt.NewClientOptions(). + AddBroker(broker). + SetClientID("morz-agent-sub-" + screenSlug). + SetCleanSession(true). + SetAutoReconnect(true). + SetConnectRetry(true). + SetConnectRetryInterval(10 * time.Second). + SetOnConnectHandler(func(c mqtt.Client) { + // Re-subscribe after reconnect. + c.Subscribe(topic, 0, func(_ mqtt.Client, _ mqtt.Message) { //nolint:errcheck + select { + case s.resetC <- struct{}{}: + default: // channel full — debounce timer will fire anyway + } + }) + }) + + if username != "" { + opts.SetUsername(username) + opts.SetPassword(password) + } + + s.client = mqtt.NewClient(opts) + s.client.Connect() // non-blocking; paho retries in background + + go s.run() + return s +} + +// run is the debounce loop. It resets a timer on every incoming signal. +// When the timer fires the onChange callback is called once in a goroutine. +func (s *Subscriber) run() { + var timer *time.Timer + for { + select { + case <-s.stopC: + if timer != nil { + timer.Stop() + } + return + case <-s.resetC: + if timer != nil { + timer.Stop() + } + timer = time.AfterFunc(debounceDuration, func() { + go s.onChange() + }) + } + } +} + +// Close disconnects the MQTT client and stops the debounce loop. +func (s *Subscriber) Close() { + if s == nil { + return + } + close(s.stopC) + s.client.Disconnect(250) +} diff --git a/server/backend/go.mod b/server/backend/go.mod index 6da0bfa..dc2d74e 100644 --- a/server/backend/go.mod +++ b/server/backend/go.mod @@ -3,10 +3,13 @@ module git.az-it.net/az/morz-infoboard/server/backend go 1.25.0 require ( + github.com/eclipse/paho.mqtt.golang v1.5.1 // indirect + github.com/gorilla/websocket v1.5.3 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/pgx/v5 v5.9.1 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect + golang.org/x/net v0.44.0 // indirect golang.org/x/sync v0.17.0 // indirect golang.org/x/text v0.29.0 // indirect ) diff --git a/server/backend/go.sum b/server/backend/go.sum index ca531aa..e7db59d 100644 --- a/server/backend/go.sum +++ b/server/backend/go.sum @@ -1,4 +1,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE= +github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= @@ -11,6 +15,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= +golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= diff --git a/server/backend/internal/app/app.go b/server/backend/internal/app/app.go index 82c25b7..40b1dc3 100644 --- a/server/backend/internal/app/app.go +++ b/server/backend/internal/app/app.go @@ -10,12 +10,14 @@ import ( "git.az-it.net/az/morz-infoboard/server/backend/internal/config" "git.az-it.net/az/morz-infoboard/server/backend/internal/db" "git.az-it.net/az/morz-infoboard/server/backend/internal/httpapi" + "git.az-it.net/az/morz-infoboard/server/backend/internal/mqttnotifier" "git.az-it.net/az/morz-infoboard/server/backend/internal/store" ) type App struct { - Config config.Config - server *http.Server + Config config.Config + server *http.Server + notifier *mqttnotifier.Notifier } func New() (*App, error) { @@ -46,26 +48,34 @@ func New() (*App, error) { media := store.NewMediaStore(pool.Pool) playlists := store.NewPlaylistStore(pool.Pool) + // MQTT notifier (no-op when broker not configured). + notifier := mqttnotifier.New(cfg.MQTTBroker, cfg.MQTTUsername, cfg.MQTTPassword) + if cfg.MQTTBroker != "" { + logger.Printf("event=mqtt_notifier_enabled broker=%s", cfg.MQTTBroker) + } else { + logger.Printf("event=mqtt_notifier_disabled reason=no_broker_configured") + } + handler := httpapi.NewRouter(httpapi.RouterDeps{ StatusStore: statusStore, TenantStore: tenants, ScreenStore: screens, MediaStore: media, PlaylistStore: playlists, + Notifier: notifier, UploadDir: cfg.UploadDir, Logger: logger, }) return &App{ - Config: cfg, - server: &http.Server{ - Addr: cfg.HTTPAddress, - Handler: handler, - }, + Config: cfg, + server: &http.Server{Addr: cfg.HTTPAddress, Handler: handler}, + notifier: notifier, }, nil } func (a *App) Run() error { + defer a.notifier.Close() err := a.server.ListenAndServe() if errors.Is(err, http.ErrServerClosed) { return nil diff --git a/server/backend/internal/config/config.go b/server/backend/internal/config/config.go index e838778..35ac6de 100644 --- a/server/backend/internal/config/config.go +++ b/server/backend/internal/config/config.go @@ -7,6 +7,10 @@ type Config struct { StatusStorePath string DatabaseURL string UploadDir string + // MQTT — optional. When MQTTBroker is empty, notifications are disabled. + MQTTBroker string + MQTTUsername string + MQTTPassword string } func Load() Config { @@ -15,6 +19,9 @@ func Load() Config { StatusStorePath: os.Getenv("MORZ_INFOBOARD_STATUS_STORE_PATH"), DatabaseURL: getenv("MORZ_INFOBOARD_DATABASE_URL", "postgres://morz_infoboard:morz_infoboard@localhost:5432/morz_infoboard?sslmode=disable"), UploadDir: getenv("MORZ_INFOBOARD_UPLOAD_DIR", "/tmp/morz-uploads"), + MQTTBroker: os.Getenv("MORZ_INFOBOARD_MQTT_BROKER"), + MQTTUsername: os.Getenv("MORZ_INFOBOARD_MQTT_USERNAME"), + MQTTPassword: os.Getenv("MORZ_INFOBOARD_MQTT_PASSWORD"), } } diff --git a/server/backend/internal/httpapi/manage/playlist.go b/server/backend/internal/httpapi/manage/playlist.go index 3cd6db0..9ab3116 100644 --- a/server/backend/internal/httpapi/manage/playlist.go +++ b/server/backend/internal/httpapi/manage/playlist.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "git.az-it.net/az/morz-infoboard/server/backend/internal/mqttnotifier" "git.az-it.net/az/morz-infoboard/server/backend/internal/store" ) @@ -43,7 +44,7 @@ func HandleGetPlaylist(screens *store.ScreenStore, playlists *store.PlaylistStor } // HandleAddItem adds a playlist item (from existing media asset or direct URL). -func HandleAddItem(playlists *store.PlaylistStore, media *store.MediaStore) http.HandlerFunc { +func HandleAddItem(playlists *store.PlaylistStore, media *store.MediaStore, notifier *mqttnotifier.Notifier) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { playlistID := r.PathValue("playlistId") @@ -98,6 +99,10 @@ func HandleAddItem(playlists *store.PlaylistStore, media *store.MediaStore) http return } + if slug, err := playlists.ScreenSlugByPlaylistID(r.Context(), playlistID); err == nil { + notifier.NotifyChanged(slug) + } + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusCreated) json.NewEncoder(w).Encode(item) //nolint:errcheck @@ -105,7 +110,7 @@ func HandleAddItem(playlists *store.PlaylistStore, media *store.MediaStore) http } // HandleUpdateItem updates duration, title, enabled, valid_from, valid_until. -func HandleUpdateItem(playlists *store.PlaylistStore) http.HandlerFunc { +func HandleUpdateItem(playlists *store.PlaylistStore, notifier *mqttnotifier.Notifier) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { id := r.PathValue("itemId") @@ -136,24 +141,38 @@ func HandleUpdateItem(playlists *store.PlaylistStore) http.HandlerFunc { http.Error(w, "db error", http.StatusInternalServerError) return } + + if slug, err := playlists.ScreenSlugByItemID(r.Context(), id); err == nil { + notifier.NotifyChanged(slug) + } + w.WriteHeader(http.StatusNoContent) } } // HandleDeleteItem removes a playlist item. -func HandleDeleteItem(playlists *store.PlaylistStore) http.HandlerFunc { +func HandleDeleteItem(playlists *store.PlaylistStore, notifier *mqttnotifier.Notifier) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { id := r.PathValue("itemId") + + // Resolve slug before delete (item won't exist after). + slug, _ := playlists.ScreenSlugByItemID(r.Context(), id) + if err := playlists.DeleteItem(r.Context(), id); err != nil { http.Error(w, "db error", http.StatusInternalServerError) return } + + if slug != "" { + notifier.NotifyChanged(slug) + } + w.WriteHeader(http.StatusNoContent) } } // HandleReorder accepts an ordered list of item IDs and updates order_index. -func HandleReorder(playlists *store.PlaylistStore) http.HandlerFunc { +func HandleReorder(playlists *store.PlaylistStore, notifier *mqttnotifier.Notifier) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { playlistID := r.PathValue("playlistId") @@ -167,6 +186,11 @@ func HandleReorder(playlists *store.PlaylistStore) http.HandlerFunc { http.Error(w, "db error", http.StatusInternalServerError) return } + + if slug, err := playlists.ScreenSlugByPlaylistID(r.Context(), playlistID); err == nil { + notifier.NotifyChanged(slug) + } + w.WriteHeader(http.StatusNoContent) } } diff --git a/server/backend/internal/httpapi/manage/ui.go b/server/backend/internal/httpapi/manage/ui.go index c8e8daf..19d37bf 100644 --- a/server/backend/internal/httpapi/manage/ui.go +++ b/server/backend/internal/httpapi/manage/ui.go @@ -12,6 +12,7 @@ import ( "strings" "time" + "git.az-it.net/az/morz-infoboard/server/backend/internal/mqttnotifier" "git.az-it.net/az/morz-infoboard/server/backend/internal/store" ) @@ -294,7 +295,7 @@ func HandleUploadMediaUI(media *store.MediaStore, screens *store.ScreenStore, up } // HandleAddItemUI handles form POST to add a playlist item, then redirects. -func HandleAddItemUI(playlists *store.PlaylistStore, media *store.MediaStore, screens *store.ScreenStore) http.HandlerFunc { +func HandleAddItemUI(playlists *store.PlaylistStore, media *store.MediaStore, screens *store.ScreenStore, notifier *mqttnotifier.Notifier) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { screenSlug := r.PathValue("screenSlug") if err := r.ParseForm(); err != nil { @@ -353,12 +354,13 @@ func HandleAddItemUI(playlists *store.PlaylistStore, media *store.MediaStore, sc http.Error(w, "db error", http.StatusInternalServerError) return } + notifier.NotifyChanged(screenSlug) http.Redirect(w, r, "/manage/"+screenSlug+"?msg=added", http.StatusSeeOther) } } // HandleDeleteItemUI removes a playlist item and redirects back. -func HandleDeleteItemUI(playlists *store.PlaylistStore) http.HandlerFunc { +func HandleDeleteItemUI(playlists *store.PlaylistStore, notifier *mqttnotifier.Notifier) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { screenSlug := r.PathValue("screenSlug") itemID := r.PathValue("itemId") @@ -366,12 +368,13 @@ func HandleDeleteItemUI(playlists *store.PlaylistStore) http.HandlerFunc { http.Error(w, "db error", http.StatusInternalServerError) return } + notifier.NotifyChanged(screenSlug) http.Redirect(w, r, "/manage/"+screenSlug+"?msg=deleted", http.StatusSeeOther) } } // HandleReorderUI accepts JSON body with ordered IDs (HTMX/fetch). -func HandleReorderUI(playlists *store.PlaylistStore, screens *store.ScreenStore) http.HandlerFunc { +func HandleReorderUI(playlists *store.PlaylistStore, screens *store.ScreenStore, notifier *mqttnotifier.Notifier) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { screenSlug := r.PathValue("screenSlug") screen, err := screens.GetBySlug(r.Context(), screenSlug) @@ -393,12 +396,13 @@ func HandleReorderUI(playlists *store.PlaylistStore, screens *store.ScreenStore) http.Error(w, "db error", http.StatusInternalServerError) return } + notifier.NotifyChanged(screenSlug) w.WriteHeader(http.StatusNoContent) } } // HandleUpdateItemUI handles form PATCH/POST to update a single item. -func HandleUpdateItemUI(playlists *store.PlaylistStore) http.HandlerFunc { +func HandleUpdateItemUI(playlists *store.PlaylistStore, notifier *mqttnotifier.Notifier) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { screenSlug := r.PathValue("screenSlug") itemID := r.PathValue("itemId") @@ -419,12 +423,13 @@ func HandleUpdateItemUI(playlists *store.PlaylistStore) http.HandlerFunc { http.Error(w, "db error", http.StatusInternalServerError) return } + notifier.NotifyChanged(screenSlug) http.Redirect(w, r, "/manage/"+screenSlug+"?msg=saved", http.StatusSeeOther) } } // HandleDeleteMediaUI deletes media and redirects back. -func HandleDeleteMediaUI(media *store.MediaStore, screens *store.ScreenStore, uploadDir string) http.HandlerFunc { +func HandleDeleteMediaUI(media *store.MediaStore, screens *store.ScreenStore, uploadDir string, notifier *mqttnotifier.Notifier) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { screenSlug := r.PathValue("screenSlug") mediaID := r.PathValue("mediaId") @@ -435,6 +440,7 @@ func HandleDeleteMediaUI(media *store.MediaStore, screens *store.ScreenStore, up } media.Delete(r.Context(), mediaID) //nolint:errcheck + notifier.NotifyChanged(screenSlug) http.Redirect(w, r, "/manage/"+screenSlug+"?msg=deleted", http.StatusSeeOther) } } diff --git a/server/backend/internal/httpapi/router.go b/server/backend/internal/httpapi/router.go index faf618b..ffe3438 100644 --- a/server/backend/internal/httpapi/router.go +++ b/server/backend/internal/httpapi/router.go @@ -5,6 +5,7 @@ import ( "net/http" "git.az-it.net/az/morz-infoboard/server/backend/internal/httpapi/manage" + "git.az-it.net/az/morz-infoboard/server/backend/internal/mqttnotifier" "git.az-it.net/az/morz-infoboard/server/backend/internal/store" ) @@ -15,6 +16,7 @@ type RouterDeps struct { ScreenStore *store.ScreenStore MediaStore *store.MediaStore PlaylistStore *store.PlaylistStore + Notifier *mqttnotifier.Notifier UploadDir string Logger *log.Logger } @@ -73,6 +75,12 @@ func registerManageRoutes(mux *http.ServeMux, d RouterDeps) { uploadDir = "/tmp/morz-uploads" } + // Ensure notifier is never nil inside handlers (no-op when broker not configured). + notifier := d.Notifier + if notifier == nil { + notifier = mqttnotifier.New("", "", "") + } + // Serve uploaded files. mux.Handle("GET /uploads/", http.StripPrefix("/uploads/", http.FileServer(http.Dir(uploadDir)))) @@ -92,15 +100,15 @@ func registerManageRoutes(mux *http.ServeMux, d RouterDeps) { mux.HandleFunc("POST /manage/{screenSlug}/upload", manage.HandleUploadMediaUI(d.MediaStore, d.ScreenStore, uploadDir)) mux.HandleFunc("POST /manage/{screenSlug}/items", - manage.HandleAddItemUI(d.PlaylistStore, d.MediaStore, d.ScreenStore)) + manage.HandleAddItemUI(d.PlaylistStore, d.MediaStore, d.ScreenStore, notifier)) mux.HandleFunc("POST /manage/{screenSlug}/items/{itemId}", - manage.HandleUpdateItemUI(d.PlaylistStore)) + manage.HandleUpdateItemUI(d.PlaylistStore, notifier)) mux.HandleFunc("POST /manage/{screenSlug}/items/{itemId}/delete", - manage.HandleDeleteItemUI(d.PlaylistStore)) + manage.HandleDeleteItemUI(d.PlaylistStore, notifier)) mux.HandleFunc("POST /manage/{screenSlug}/reorder", - manage.HandleReorderUI(d.PlaylistStore, d.ScreenStore)) + manage.HandleReorderUI(d.PlaylistStore, d.ScreenStore, notifier)) mux.HandleFunc("POST /manage/{screenSlug}/media/{mediaId}/delete", - manage.HandleDeleteMediaUI(d.MediaStore, d.ScreenStore, uploadDir)) + manage.HandleDeleteMediaUI(d.MediaStore, d.ScreenStore, uploadDir, notifier)) // ── JSON API — screens ──────────────────────────────────────────────── // Self-registration: called by agent on startup (must be before /{tenantSlug}/ routes) @@ -125,13 +133,13 @@ func registerManageRoutes(mux *http.ServeMux, d RouterDeps) { mux.HandleFunc("GET /api/v1/playlists/{screenId}", manage.HandleGetPlaylist(d.ScreenStore, d.PlaylistStore)) mux.HandleFunc("POST /api/v1/playlists/{playlistId}/items", - manage.HandleAddItem(d.PlaylistStore, d.MediaStore)) + manage.HandleAddItem(d.PlaylistStore, d.MediaStore, notifier)) mux.HandleFunc("PATCH /api/v1/items/{itemId}", - manage.HandleUpdateItem(d.PlaylistStore)) + manage.HandleUpdateItem(d.PlaylistStore, notifier)) mux.HandleFunc("DELETE /api/v1/items/{itemId}", - manage.HandleDeleteItem(d.PlaylistStore)) + manage.HandleDeleteItem(d.PlaylistStore, notifier)) mux.HandleFunc("PUT /api/v1/playlists/{playlistId}/order", - manage.HandleReorder(d.PlaylistStore)) + manage.HandleReorder(d.PlaylistStore, notifier)) mux.HandleFunc("PATCH /api/v1/playlists/{playlistId}/duration", manage.HandleUpdatePlaylistDuration(d.PlaylistStore)) } diff --git a/server/backend/internal/mqttnotifier/notifier.go b/server/backend/internal/mqttnotifier/notifier.go new file mode 100644 index 0000000..ba45d86 --- /dev/null +++ b/server/backend/internal/mqttnotifier/notifier.go @@ -0,0 +1,106 @@ +// Package mqttnotifier publishes playlist-changed notifications to MQTT. +// It is safe for concurrent use and applies per-screen debouncing so that +// rapid edits within a 2-second window produce at most one MQTT message. +package mqttnotifier + +import ( + "fmt" + "sync" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +const ( + // debounceDuration is the minimum time between two publish calls for the + // same screen. Any change arriving within this window resets the timer. + debounceDuration = 2 * time.Second +) + +// Notifier publishes "playlist-changed" MQTT messages with per-screen debounce. +// If no broker URL is configured it behaves as a no-op (all methods are safe +// to call and do nothing). +type Notifier struct { + client mqtt.Client // nil when disabled + + mu sync.Mutex + timers map[string]*time.Timer // keyed by screenSlug +} + +// New creates a Notifier connected to broker (e.g. "tcp://mosquitto:1883"). +// username/password may be empty. Returns a no-op Notifier when broker == "". +func New(broker, username, password string) *Notifier { + n := &Notifier{timers: make(map[string]*time.Timer)} + if broker == "" { + return n + } + + opts := mqtt.NewClientOptions(). + AddBroker(broker). + SetClientID("morz-backend"). + SetCleanSession(true). + SetAutoReconnect(true). + SetConnectRetry(true). + SetConnectRetryInterval(10 * time.Second) + + if username != "" { + opts.SetUsername(username) + opts.SetPassword(password) + } + + n.client = mqtt.NewClient(opts) + n.client.Connect() // non-blocking; paho retries in background + return n +} + +// Topic returns the MQTT topic for a screen's playlist-changed notification. +func Topic(screenSlug string) string { + return fmt.Sprintf("signage/screen/%s/playlist-changed", screenSlug) +} + +// NotifyChanged schedules a publish for screenSlug. If another call for the +// same screen arrives within debounceDuration, the timer is reset (debounce). +// The method returns immediately; the actual publish happens in a goroutine. +func (n *Notifier) NotifyChanged(screenSlug string) { + if n.client == nil { + return + } + + n.mu.Lock() + defer n.mu.Unlock() + + // Reset existing timer if present (debounce). + if t, ok := n.timers[screenSlug]; ok { + t.Stop() + } + + n.timers[screenSlug] = time.AfterFunc(debounceDuration, func() { + n.mu.Lock() + delete(n.timers, screenSlug) + n.mu.Unlock() + + n.publish(screenSlug) + }) +} + +func (n *Notifier) publish(screenSlug string) { + topic := Topic(screenSlug) + payload := []byte(fmt.Sprintf(`{"ts":%d}`, time.Now().UnixMilli())) + token := n.client.Publish(topic, 0, false, payload) + token.WaitTimeout(3 * time.Second) + // Errors are silently dropped — the 60 s polling in the agent is the fallback. +} + +// Close disconnects the MQTT client gracefully. +func (n *Notifier) Close() { + if n.client == nil { + return + } + n.mu.Lock() + for _, t := range n.timers { + t.Stop() + } + n.timers = make(map[string]*time.Timer) + n.mu.Unlock() + n.client.Disconnect(250) +} diff --git a/server/backend/internal/store/store.go b/server/backend/internal/store/store.go index 9d82931..ca3b030 100644 --- a/server/backend/internal/store/store.go +++ b/server/backend/internal/store/store.go @@ -407,6 +407,29 @@ func (s *PlaylistStore) DeleteItem(ctx context.Context, id string) error { return err } +// ScreenSlugByPlaylistID returns the slug of the screen that owns playlistID. +func (s *PlaylistStore) ScreenSlugByPlaylistID(ctx context.Context, playlistID string) (string, error) { + var slug string + err := s.pool.QueryRow(ctx, + `select sc.slug + from playlists pl + join screens sc on sc.id = pl.screen_id + where pl.id = $1`, playlistID).Scan(&slug) + return slug, err +} + +// ScreenSlugByItemID returns the slug of the screen that owns itemID. +func (s *PlaylistStore) ScreenSlugByItemID(ctx context.Context, itemID string) (string, error) { + var slug string + err := s.pool.QueryRow(ctx, + `select sc.slug + from playlist_items pi + join playlists pl on pl.id = pi.playlist_id + join screens sc on sc.id = pl.screen_id + where pi.id = $1`, itemID).Scan(&slug) + return slug, err +} + // Reorder sets order_index for each item ID in the given slice order. func (s *PlaylistStore) Reorder(ctx context.Context, playlistID string, itemIDs []string) error { tx, err := s.pool.Begin(ctx)