MQTT-Playlist-Push: Änderungen erreichen Client binnen 5 Sekunden

Backend published auf signage/screen/{slug}/playlist-changed nach
Playlist-Mutationen (2s Debounce). Agent subscribed und fetcht
Playlist sofort (3s Debounce). 60s-Polling bleibt als Fallback.

Neue Packages: mqttnotifier (Backend), mqttsubscriber (Agent)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jesko Anschütz 2026-03-23 11:35:50 +01:00
parent d4ab1da5aa
commit 585cb83ed0
14 changed files with 513 additions and 28 deletions

View file

@ -78,6 +78,7 @@
- [x] Fallback-Verzeichnisbetrieb demonstrieren - [x] Fallback-Verzeichnisbetrieb demonstrieren
- [ ] `valid_from`/`valid_until` im Prototyp pruefen - [ ] `valid_from`/`valid_until` im Prototyp pruefen
- [x] Offline-Sync mit lokalem Cache pruefen - [x] Offline-Sync mit lokalem Cache pruefen
- [x] MQTT-Topic `signage/screen/{screenSlug}/playlist-changed` spezifiziert und dokumentiert
- [ ] MQTT-Kommandos `reload`, `restart_player`, `reboot`, `display_on`, `display_off` testweise durchspielen - [ ] MQTT-Kommandos `reload`, `restart_player`, `reboot`, `display_on`, `display_off` testweise durchspielen
- [ ] globale Kampagne testen, die tenantbezogenen Content temporär ueberschreibt - [ ] globale Kampagne testen, die tenantbezogenen Content temporär ueberschreibt
- [ ] Rueckfall auf Normalbetrieb nach manueller Deaktivierung pruefen - [ ] Rueckfall auf Normalbetrieb nach manueller Deaktivierung pruefen
@ -89,6 +90,7 @@
- [x] Chromium-Kiosk-Startskript erstellen - [x] Chromium-Kiosk-Startskript erstellen
- [ ] Screenshot-Erzeugung auf dem Player integrieren - [ ] Screenshot-Erzeugung auf dem Player integrieren
- [x] Heartbeat- und Statusmeldungen integrieren - [x] Heartbeat- und Statusmeldungen integrieren
- [x] MQTT-Playlist-Change-Synchronisation mit Backend-Debounce (2s) und Agent-Debounce (3s) implementiert
- [ ] Fehler- und Wiederanlaufverhalten verifizieren - [ ] Fehler- und Wiederanlaufverhalten verifizieren
## Phase 7 - Ansible-Automatisierung ## Phase 7 - Ansible-Automatisierung

View file

@ -32,11 +32,14 @@ services:
MORZ_INFOBOARD_HTTP_ADDR: ":8080" MORZ_INFOBOARD_HTTP_ADDR: ":8080"
MORZ_INFOBOARD_DATABASE_URL: "postgres://morz_infoboard:morz_infoboard@postgres:5432/morz_infoboard?sslmode=disable" MORZ_INFOBOARD_DATABASE_URL: "postgres://morz_infoboard:morz_infoboard@postgres:5432/morz_infoboard?sslmode=disable"
MORZ_INFOBOARD_UPLOAD_DIR: "/uploads" MORZ_INFOBOARD_UPLOAD_DIR: "/uploads"
MORZ_INFOBOARD_MQTT_BROKER: "tcp://mosquitto:1883"
volumes: volumes:
- uploads:/uploads - uploads:/uploads
depends_on: depends_on:
postgres: postgres:
condition: service_healthy condition: service_healthy
mosquitto:
condition: service_started
restart: unless-stopped restart: unless-stopped
volumes: volumes:

136
docs/API-MQTT-VERTRAG.md Normal file
View file

@ -0,0 +1,136 @@
# Info-Board Neu - MQTT-Vertrag
Vertrag zwischen Backend und Agent für die Echtzeit-Synchronisation von Playlist-Änderungen und Gerätebefehlen.
---
## Überblick
Das Messaging-System nutzt MQTT für:
- **Playlist-Mutations-Benachrichtigungen**: Backend → Agent
- **Device-Commands**: Backend → Agent (zukünftig: reload, restart_player, reboot, display_on/off)
- **Heartbeat & Status**: Agent → Backend (siehe `PLAYER-STATUS-HTTP.md`)
Alle Topics folgen dem Naming-Pattern: `signage/{component}/{screenSlug}/{event}`
---
## Topics
### Backend publishes
#### `signage/screen/{screenSlug}/playlist-changed`
**Publisher:** Backend
**Subscriber:** Agent
Wird nach jeder Mutation der Playlist gepublished (Add, Remove, Reorder, Enable/Disable Item).
**Payload:**
```json
{
"ts": 1711268440000
}
```
- `ts`: Unix-Zeitstempel in Millisekunden des Änderungsereignisses auf dem Backend
**Verhalten:**
- Backend debounced Änderungen über **2 Sekunden**
- Mehrere schnelle Mutationen werden zu einem Event zusammengefasst
- Garantiert mindestens ein Event pro logischer Änderung
**Agent-Reaktion:**
- Agent empfängt das Event
- Agent debounced die Verarbeitung über **3 Sekunden**
- Agent startet sofortiges Playlist-Fetch via HTTP `GET /api/v1/screens/{screenSlug}/playlist`
- Agent speichert die Playlist lokal und signalisiert dem Browser einen Reload
**Implementierung (Agent):**
```go
// Pseudocode
func OnPlaylistChanged(msg PlaylistChangedMessage) {
if debounceTimer.running {
debounceTimer.reset()
} else {
debounceTimer.start(3 * time.Second)
}
}
func onDebounceExpire() {
playlist := fetchPlaylistViaHTTP()
saveToLocalCache(playlist)
signalBrowserReload()
}
```
---
## Zukünftige Topics
Die folgenden Topics sind **geplant** für Phase 5 (Prototyping) und später:
### `signage/screen/{screenSlug}/device-command`
**Publisher:** Backend
**Subscriber:** Agent
Befehl-Queue für Device-Steuerung.
**Payload:**
```json
{
"cmd_id": "uuid",
"command": "reload|restart_player|reboot|display_on|display_off",
"ts": 1711268440000
}
```
**Agent-Reaktion:**
- Befehl ausführen
- ACK via HTTP POST zu `PUT /api/v1/screens/{screenSlug}/command-ack`
---
## Beispiel-Flow: Playlist-Update
```
Admin: Click "Speichern" in Playlist-UI
Backend: Playlist-Mutation in DB schreiben
Backend: `playlist-changed` mit ts=now nach 2s Debounce publifyen
Agent: Event empfangen, 3s Debounce starten
Agent: Nach 3s → HTTP GET /api/v1/screens/{slug}/playlist
Backend: Aktuelle Playlist zurückgeben
Agent: Lokal speichern, Browser signalisieren "reload"
Browser: Neuer Content geladen und abgespielt
```
---
## MQTT-Verbindungspezifikation
(Siehe `PLAYER-KONZEPT.md` und Provisioning-Variablen für Broker-URL, Authentifizierung und Retry-Logik)
- **Broker-Adresse:** Über Provisioning konfigurierbar (Standard: `tcp://backend:1883`)
- **Client-ID:** `{tenantSlug}/{screenSlug}` (eindeutig pro Screen)
- **Username/Password:** Device-spezifische Credentials (OAuth-ähnlich)
- **QoS-Level:** 1 (At-Least-Once für Critical-Events)
- **Retain:** nein (Event-Natur, nicht State)
- **Heartbeat:** Separat via HTTP (siehe `PLAYER-STATUS-HTTP.md`)
---
## Notizen für Implementierer
1. **Replay bei Reconnect:** Topics haben `retain: false`, daher entfallen keine Events bei Trennung. Der Agent synchronisiert sich nach Reconnect via regulärem Status-Endpoint.
2. **Ordering:** Mehrere Events zu einem Screen sind ordered; Ordering über Screen-Grenzen hinweg ist nicht garantiert.
3. **Fehlerbehandlung:** Fehlgeschlagene Playlisten-Fetches werden vom Agent nach Standard-Retry-Logik wiederholt.
4. **Version der Spec:** v1.0 (März 2026)

View file

@ -13,6 +13,7 @@ import (
"git.az-it.net/az/morz-infoboard/player/agent/internal/config" "git.az-it.net/az/morz-infoboard/player/agent/internal/config"
"git.az-it.net/az/morz-infoboard/player/agent/internal/mqttheartbeat" "git.az-it.net/az/morz-infoboard/player/agent/internal/mqttheartbeat"
"git.az-it.net/az/morz-infoboard/player/agent/internal/mqttsubscriber"
"git.az-it.net/az/morz-infoboard/player/agent/internal/playerserver" "git.az-it.net/az/morz-infoboard/player/agent/internal/playerserver"
"git.az-it.net/az/morz-infoboard/player/agent/internal/statusreporter" "git.az-it.net/az/morz-infoboard/player/agent/internal/statusreporter"
) )
@ -62,6 +63,11 @@ type App struct {
// Playlist fetched from the backend (protected by playlistMu). // Playlist fetched from the backend (protected by playlistMu).
playlistMu sync.RWMutex playlistMu sync.RWMutex
playlist []playerserver.PlaylistItem playlist []playerserver.PlaylistItem
// mqttFetchC receives a signal whenever a playlist-changed MQTT message
// arrives (after debouncing in the subscriber). pollPlaylist listens on
// this channel to trigger an immediate fetchPlaylist call.
mqttFetchC chan struct{}
} }
type statusSender interface { type statusSender interface {
@ -109,6 +115,7 @@ func newApp(cfg config.Config, logger *log.Logger, now func() time.Time, reporte
mqttPub: mqttPub, mqttPub: mqttPub,
status: StatusStarting, status: StatusStarting,
serverConnectivity: ConnectivityUnknown, serverConnectivity: ConnectivityUnknown,
mqttFetchC: make(chan struct{}, 1),
} }
} }
@ -190,7 +197,28 @@ func (a *App) Run(ctx context.Context) error {
// Self-register this screen in the backend (best-effort, non-blocking). // Self-register this screen in the backend (best-effort, non-blocking).
go a.registerScreen(ctx) go a.registerScreen(ctx)
// Start polling the backend for playlist updates. // Subscribe to playlist-changed MQTT notifications (optional; fallback = polling).
sub := mqttsubscriber.New(
a.Config.MQTTBroker,
a.Config.ScreenID,
a.Config.MQTTUsername,
a.Config.MQTTPassword,
func() {
// Debounced callback: send a non-blocking signal to the fetch channel.
select {
case a.mqttFetchC <- struct{}{}:
default: // already a pending signal — no need to queue another
}
a.logger.Printf("event=mqtt_playlist_notification screen_id=%s", a.Config.ScreenID)
},
)
if sub != nil {
a.logger.Printf("event=mqtt_subscriber_enabled broker=%s screen_id=%s topic=%s",
a.Config.MQTTBroker, a.Config.ScreenID, mqttsubscriber.Topic(a.Config.ScreenID))
defer sub.Close()
}
// Start polling the backend for playlist updates (60 s fallback + MQTT trigger).
go a.pollPlaylist(ctx) go a.pollPlaylist(ctx)
a.emitHeartbeat() a.emitHeartbeat()
@ -266,9 +294,12 @@ func (a *App) registerScreen(ctx context.Context) {
} }
} }
// pollPlaylist fetches the active playlist from the backend periodically. // pollPlaylist fetches the active playlist from the backend.
// It fetches immediately on startup, then waits for either:
// - an MQTT playlist-changed notification (fast path, debounced by subscriber)
// - the 60-second fallback ticker (in case MQTT is unavailable)
func (a *App) pollPlaylist(ctx context.Context) { func (a *App) pollPlaylist(ctx context.Context) {
// Fetch immediately on startup, then every 60s. // Fetch immediately on startup.
a.fetchPlaylist(ctx) a.fetchPlaylist(ctx)
ticker := time.NewTicker(60 * time.Second) ticker := time.NewTicker(60 * time.Second)
@ -277,6 +308,9 @@ func (a *App) pollPlaylist(ctx context.Context) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-a.mqttFetchC:
a.logger.Printf("event=playlist_triggered_by_mqtt screen_id=%s", a.Config.ScreenID)
a.fetchPlaylist(ctx)
case <-ticker.C: case <-ticker.C:
a.fetchPlaylist(ctx) a.fetchPlaylist(ctx)
} }

View file

@ -0,0 +1,117 @@
// Package mqttsubscriber subscribes to playlist-changed MQTT notifications.
// It is safe for concurrent use and applies client-side debouncing so that
// a burst of messages within a 3-second window triggers at most one callback.
package mqttsubscriber
import (
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
const (
// debounceDuration is the minimum interval between two callback invocations.
// Any MQTT message arriving while the timer is still running resets it.
debounceDuration = 3 * time.Second
// playlistChangedTopicTemplate is the topic the backend publishes to.
playlistChangedTopic = "signage/screen/%s/playlist-changed"
)
// PlaylistChangedFunc is called when a debounced playlist-changed notification arrives.
type PlaylistChangedFunc func()
// Subscriber listens for playlist-changed notifications on MQTT and calls the
// provided callback at most once per debounceDuration.
type Subscriber struct {
client mqtt.Client
timer *time.Timer
onChange PlaylistChangedFunc
// timerC serializes timer resets through a dedicated goroutine.
resetC chan struct{}
stopC chan struct{}
}
// Topic returns the MQTT topic for a given screenSlug.
func Topic(screenSlug string) string {
return "signage/screen/" + screenSlug + "/playlist-changed"
}
// New creates a Subscriber that connects to broker and subscribes to the
// playlist-changed topic for screenSlug. onChange is called (in its own
// goroutine) at most once per debounceDuration.
//
// Returns nil when broker is empty — callers must handle nil.
func New(broker, screenSlug, username, password string, onChange PlaylistChangedFunc) *Subscriber {
if broker == "" {
return nil
}
s := &Subscriber{
onChange: onChange,
resetC: make(chan struct{}, 16),
stopC: make(chan struct{}),
}
topic := Topic(screenSlug)
opts := mqtt.NewClientOptions().
AddBroker(broker).
SetClientID("morz-agent-sub-" + screenSlug).
SetCleanSession(true).
SetAutoReconnect(true).
SetConnectRetry(true).
SetConnectRetryInterval(10 * time.Second).
SetOnConnectHandler(func(c mqtt.Client) {
// Re-subscribe after reconnect.
c.Subscribe(topic, 0, func(_ mqtt.Client, _ mqtt.Message) { //nolint:errcheck
select {
case s.resetC <- struct{}{}:
default: // channel full — debounce timer will fire anyway
}
})
})
if username != "" {
opts.SetUsername(username)
opts.SetPassword(password)
}
s.client = mqtt.NewClient(opts)
s.client.Connect() // non-blocking; paho retries in background
go s.run()
return s
}
// run is the debounce loop. It resets a timer on every incoming signal.
// When the timer fires the onChange callback is called once in a goroutine.
func (s *Subscriber) run() {
var timer *time.Timer
for {
select {
case <-s.stopC:
if timer != nil {
timer.Stop()
}
return
case <-s.resetC:
if timer != nil {
timer.Stop()
}
timer = time.AfterFunc(debounceDuration, func() {
go s.onChange()
})
}
}
}
// Close disconnects the MQTT client and stops the debounce loop.
func (s *Subscriber) Close() {
if s == nil {
return
}
close(s.stopC)
s.client.Disconnect(250)
}

View file

@ -3,10 +3,13 @@ module git.az-it.net/az/morz-infoboard/server/backend
go 1.25.0 go 1.25.0
require ( require (
github.com/eclipse/paho.mqtt.golang v1.5.1 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v5 v5.9.1 // indirect github.com/jackc/pgx/v5 v5.9.1 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect
golang.org/x/net v0.44.0 // indirect
golang.org/x/sync v0.17.0 // indirect golang.org/x/sync v0.17.0 // indirect
golang.org/x/text v0.29.0 // indirect golang.org/x/text v0.29.0 // indirect
) )

View file

@ -1,4 +1,8 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE=
github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
@ -11,6 +15,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I=
golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk=

View file

@ -10,12 +10,14 @@ import (
"git.az-it.net/az/morz-infoboard/server/backend/internal/config" "git.az-it.net/az/morz-infoboard/server/backend/internal/config"
"git.az-it.net/az/morz-infoboard/server/backend/internal/db" "git.az-it.net/az/morz-infoboard/server/backend/internal/db"
"git.az-it.net/az/morz-infoboard/server/backend/internal/httpapi" "git.az-it.net/az/morz-infoboard/server/backend/internal/httpapi"
"git.az-it.net/az/morz-infoboard/server/backend/internal/mqttnotifier"
"git.az-it.net/az/morz-infoboard/server/backend/internal/store" "git.az-it.net/az/morz-infoboard/server/backend/internal/store"
) )
type App struct { type App struct {
Config config.Config Config config.Config
server *http.Server server *http.Server
notifier *mqttnotifier.Notifier
} }
func New() (*App, error) { func New() (*App, error) {
@ -46,26 +48,34 @@ func New() (*App, error) {
media := store.NewMediaStore(pool.Pool) media := store.NewMediaStore(pool.Pool)
playlists := store.NewPlaylistStore(pool.Pool) playlists := store.NewPlaylistStore(pool.Pool)
// MQTT notifier (no-op when broker not configured).
notifier := mqttnotifier.New(cfg.MQTTBroker, cfg.MQTTUsername, cfg.MQTTPassword)
if cfg.MQTTBroker != "" {
logger.Printf("event=mqtt_notifier_enabled broker=%s", cfg.MQTTBroker)
} else {
logger.Printf("event=mqtt_notifier_disabled reason=no_broker_configured")
}
handler := httpapi.NewRouter(httpapi.RouterDeps{ handler := httpapi.NewRouter(httpapi.RouterDeps{
StatusStore: statusStore, StatusStore: statusStore,
TenantStore: tenants, TenantStore: tenants,
ScreenStore: screens, ScreenStore: screens,
MediaStore: media, MediaStore: media,
PlaylistStore: playlists, PlaylistStore: playlists,
Notifier: notifier,
UploadDir: cfg.UploadDir, UploadDir: cfg.UploadDir,
Logger: logger, Logger: logger,
}) })
return &App{ return &App{
Config: cfg, Config: cfg,
server: &http.Server{ server: &http.Server{Addr: cfg.HTTPAddress, Handler: handler},
Addr: cfg.HTTPAddress, notifier: notifier,
Handler: handler,
},
}, nil }, nil
} }
func (a *App) Run() error { func (a *App) Run() error {
defer a.notifier.Close()
err := a.server.ListenAndServe() err := a.server.ListenAndServe()
if errors.Is(err, http.ErrServerClosed) { if errors.Is(err, http.ErrServerClosed) {
return nil return nil

View file

@ -7,6 +7,10 @@ type Config struct {
StatusStorePath string StatusStorePath string
DatabaseURL string DatabaseURL string
UploadDir string UploadDir string
// MQTT — optional. When MQTTBroker is empty, notifications are disabled.
MQTTBroker string
MQTTUsername string
MQTTPassword string
} }
func Load() Config { func Load() Config {
@ -15,6 +19,9 @@ func Load() Config {
StatusStorePath: os.Getenv("MORZ_INFOBOARD_STATUS_STORE_PATH"), StatusStorePath: os.Getenv("MORZ_INFOBOARD_STATUS_STORE_PATH"),
DatabaseURL: getenv("MORZ_INFOBOARD_DATABASE_URL", "postgres://morz_infoboard:morz_infoboard@localhost:5432/morz_infoboard?sslmode=disable"), DatabaseURL: getenv("MORZ_INFOBOARD_DATABASE_URL", "postgres://morz_infoboard:morz_infoboard@localhost:5432/morz_infoboard?sslmode=disable"),
UploadDir: getenv("MORZ_INFOBOARD_UPLOAD_DIR", "/tmp/morz-uploads"), UploadDir: getenv("MORZ_INFOBOARD_UPLOAD_DIR", "/tmp/morz-uploads"),
MQTTBroker: os.Getenv("MORZ_INFOBOARD_MQTT_BROKER"),
MQTTUsername: os.Getenv("MORZ_INFOBOARD_MQTT_USERNAME"),
MQTTPassword: os.Getenv("MORZ_INFOBOARD_MQTT_PASSWORD"),
} }
} }

View file

@ -8,6 +8,7 @@ import (
"strings" "strings"
"time" "time"
"git.az-it.net/az/morz-infoboard/server/backend/internal/mqttnotifier"
"git.az-it.net/az/morz-infoboard/server/backend/internal/store" "git.az-it.net/az/morz-infoboard/server/backend/internal/store"
) )
@ -43,7 +44,7 @@ func HandleGetPlaylist(screens *store.ScreenStore, playlists *store.PlaylistStor
} }
// HandleAddItem adds a playlist item (from existing media asset or direct URL). // HandleAddItem adds a playlist item (from existing media asset or direct URL).
func HandleAddItem(playlists *store.PlaylistStore, media *store.MediaStore) http.HandlerFunc { func HandleAddItem(playlists *store.PlaylistStore, media *store.MediaStore, notifier *mqttnotifier.Notifier) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
playlistID := r.PathValue("playlistId") playlistID := r.PathValue("playlistId")
@ -98,6 +99,10 @@ func HandleAddItem(playlists *store.PlaylistStore, media *store.MediaStore) http
return return
} }
if slug, err := playlists.ScreenSlugByPlaylistID(r.Context(), playlistID); err == nil {
notifier.NotifyChanged(slug)
}
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated) w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(item) //nolint:errcheck json.NewEncoder(w).Encode(item) //nolint:errcheck
@ -105,7 +110,7 @@ func HandleAddItem(playlists *store.PlaylistStore, media *store.MediaStore) http
} }
// HandleUpdateItem updates duration, title, enabled, valid_from, valid_until. // HandleUpdateItem updates duration, title, enabled, valid_from, valid_until.
func HandleUpdateItem(playlists *store.PlaylistStore) http.HandlerFunc { func HandleUpdateItem(playlists *store.PlaylistStore, notifier *mqttnotifier.Notifier) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("itemId") id := r.PathValue("itemId")
@ -136,24 +141,38 @@ func HandleUpdateItem(playlists *store.PlaylistStore) http.HandlerFunc {
http.Error(w, "db error", http.StatusInternalServerError) http.Error(w, "db error", http.StatusInternalServerError)
return return
} }
if slug, err := playlists.ScreenSlugByItemID(r.Context(), id); err == nil {
notifier.NotifyChanged(slug)
}
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
} }
} }
// HandleDeleteItem removes a playlist item. // HandleDeleteItem removes a playlist item.
func HandleDeleteItem(playlists *store.PlaylistStore) http.HandlerFunc { func HandleDeleteItem(playlists *store.PlaylistStore, notifier *mqttnotifier.Notifier) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("itemId") id := r.PathValue("itemId")
// Resolve slug before delete (item won't exist after).
slug, _ := playlists.ScreenSlugByItemID(r.Context(), id)
if err := playlists.DeleteItem(r.Context(), id); err != nil { if err := playlists.DeleteItem(r.Context(), id); err != nil {
http.Error(w, "db error", http.StatusInternalServerError) http.Error(w, "db error", http.StatusInternalServerError)
return return
} }
if slug != "" {
notifier.NotifyChanged(slug)
}
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
} }
} }
// HandleReorder accepts an ordered list of item IDs and updates order_index. // HandleReorder accepts an ordered list of item IDs and updates order_index.
func HandleReorder(playlists *store.PlaylistStore) http.HandlerFunc { func HandleReorder(playlists *store.PlaylistStore, notifier *mqttnotifier.Notifier) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
playlistID := r.PathValue("playlistId") playlistID := r.PathValue("playlistId")
@ -167,6 +186,11 @@ func HandleReorder(playlists *store.PlaylistStore) http.HandlerFunc {
http.Error(w, "db error", http.StatusInternalServerError) http.Error(w, "db error", http.StatusInternalServerError)
return return
} }
if slug, err := playlists.ScreenSlugByPlaylistID(r.Context(), playlistID); err == nil {
notifier.NotifyChanged(slug)
}
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
} }
} }

View file

@ -12,6 +12,7 @@ import (
"strings" "strings"
"time" "time"
"git.az-it.net/az/morz-infoboard/server/backend/internal/mqttnotifier"
"git.az-it.net/az/morz-infoboard/server/backend/internal/store" "git.az-it.net/az/morz-infoboard/server/backend/internal/store"
) )
@ -294,7 +295,7 @@ func HandleUploadMediaUI(media *store.MediaStore, screens *store.ScreenStore, up
} }
// HandleAddItemUI handles form POST to add a playlist item, then redirects. // HandleAddItemUI handles form POST to add a playlist item, then redirects.
func HandleAddItemUI(playlists *store.PlaylistStore, media *store.MediaStore, screens *store.ScreenStore) http.HandlerFunc { func HandleAddItemUI(playlists *store.PlaylistStore, media *store.MediaStore, screens *store.ScreenStore, notifier *mqttnotifier.Notifier) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
screenSlug := r.PathValue("screenSlug") screenSlug := r.PathValue("screenSlug")
if err := r.ParseForm(); err != nil { if err := r.ParseForm(); err != nil {
@ -353,12 +354,13 @@ func HandleAddItemUI(playlists *store.PlaylistStore, media *store.MediaStore, sc
http.Error(w, "db error", http.StatusInternalServerError) http.Error(w, "db error", http.StatusInternalServerError)
return return
} }
notifier.NotifyChanged(screenSlug)
http.Redirect(w, r, "/manage/"+screenSlug+"?msg=added", http.StatusSeeOther) http.Redirect(w, r, "/manage/"+screenSlug+"?msg=added", http.StatusSeeOther)
} }
} }
// HandleDeleteItemUI removes a playlist item and redirects back. // HandleDeleteItemUI removes a playlist item and redirects back.
func HandleDeleteItemUI(playlists *store.PlaylistStore) http.HandlerFunc { func HandleDeleteItemUI(playlists *store.PlaylistStore, notifier *mqttnotifier.Notifier) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
screenSlug := r.PathValue("screenSlug") screenSlug := r.PathValue("screenSlug")
itemID := r.PathValue("itemId") itemID := r.PathValue("itemId")
@ -366,12 +368,13 @@ func HandleDeleteItemUI(playlists *store.PlaylistStore) http.HandlerFunc {
http.Error(w, "db error", http.StatusInternalServerError) http.Error(w, "db error", http.StatusInternalServerError)
return return
} }
notifier.NotifyChanged(screenSlug)
http.Redirect(w, r, "/manage/"+screenSlug+"?msg=deleted", http.StatusSeeOther) http.Redirect(w, r, "/manage/"+screenSlug+"?msg=deleted", http.StatusSeeOther)
} }
} }
// HandleReorderUI accepts JSON body with ordered IDs (HTMX/fetch). // HandleReorderUI accepts JSON body with ordered IDs (HTMX/fetch).
func HandleReorderUI(playlists *store.PlaylistStore, screens *store.ScreenStore) http.HandlerFunc { func HandleReorderUI(playlists *store.PlaylistStore, screens *store.ScreenStore, notifier *mqttnotifier.Notifier) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
screenSlug := r.PathValue("screenSlug") screenSlug := r.PathValue("screenSlug")
screen, err := screens.GetBySlug(r.Context(), screenSlug) screen, err := screens.GetBySlug(r.Context(), screenSlug)
@ -393,12 +396,13 @@ func HandleReorderUI(playlists *store.PlaylistStore, screens *store.ScreenStore)
http.Error(w, "db error", http.StatusInternalServerError) http.Error(w, "db error", http.StatusInternalServerError)
return return
} }
notifier.NotifyChanged(screenSlug)
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
} }
} }
// HandleUpdateItemUI handles form PATCH/POST to update a single item. // HandleUpdateItemUI handles form PATCH/POST to update a single item.
func HandleUpdateItemUI(playlists *store.PlaylistStore) http.HandlerFunc { func HandleUpdateItemUI(playlists *store.PlaylistStore, notifier *mqttnotifier.Notifier) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
screenSlug := r.PathValue("screenSlug") screenSlug := r.PathValue("screenSlug")
itemID := r.PathValue("itemId") itemID := r.PathValue("itemId")
@ -419,12 +423,13 @@ func HandleUpdateItemUI(playlists *store.PlaylistStore) http.HandlerFunc {
http.Error(w, "db error", http.StatusInternalServerError) http.Error(w, "db error", http.StatusInternalServerError)
return return
} }
notifier.NotifyChanged(screenSlug)
http.Redirect(w, r, "/manage/"+screenSlug+"?msg=saved", http.StatusSeeOther) http.Redirect(w, r, "/manage/"+screenSlug+"?msg=saved", http.StatusSeeOther)
} }
} }
// HandleDeleteMediaUI deletes media and redirects back. // HandleDeleteMediaUI deletes media and redirects back.
func HandleDeleteMediaUI(media *store.MediaStore, screens *store.ScreenStore, uploadDir string) http.HandlerFunc { func HandleDeleteMediaUI(media *store.MediaStore, screens *store.ScreenStore, uploadDir string, notifier *mqttnotifier.Notifier) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
screenSlug := r.PathValue("screenSlug") screenSlug := r.PathValue("screenSlug")
mediaID := r.PathValue("mediaId") mediaID := r.PathValue("mediaId")
@ -435,6 +440,7 @@ func HandleDeleteMediaUI(media *store.MediaStore, screens *store.ScreenStore, up
} }
media.Delete(r.Context(), mediaID) //nolint:errcheck media.Delete(r.Context(), mediaID) //nolint:errcheck
notifier.NotifyChanged(screenSlug)
http.Redirect(w, r, "/manage/"+screenSlug+"?msg=deleted", http.StatusSeeOther) http.Redirect(w, r, "/manage/"+screenSlug+"?msg=deleted", http.StatusSeeOther)
} }
} }

View file

@ -5,6 +5,7 @@ import (
"net/http" "net/http"
"git.az-it.net/az/morz-infoboard/server/backend/internal/httpapi/manage" "git.az-it.net/az/morz-infoboard/server/backend/internal/httpapi/manage"
"git.az-it.net/az/morz-infoboard/server/backend/internal/mqttnotifier"
"git.az-it.net/az/morz-infoboard/server/backend/internal/store" "git.az-it.net/az/morz-infoboard/server/backend/internal/store"
) )
@ -15,6 +16,7 @@ type RouterDeps struct {
ScreenStore *store.ScreenStore ScreenStore *store.ScreenStore
MediaStore *store.MediaStore MediaStore *store.MediaStore
PlaylistStore *store.PlaylistStore PlaylistStore *store.PlaylistStore
Notifier *mqttnotifier.Notifier
UploadDir string UploadDir string
Logger *log.Logger Logger *log.Logger
} }
@ -73,6 +75,12 @@ func registerManageRoutes(mux *http.ServeMux, d RouterDeps) {
uploadDir = "/tmp/morz-uploads" uploadDir = "/tmp/morz-uploads"
} }
// Ensure notifier is never nil inside handlers (no-op when broker not configured).
notifier := d.Notifier
if notifier == nil {
notifier = mqttnotifier.New("", "", "")
}
// Serve uploaded files. // Serve uploaded files.
mux.Handle("GET /uploads/", http.StripPrefix("/uploads/", http.FileServer(http.Dir(uploadDir)))) mux.Handle("GET /uploads/", http.StripPrefix("/uploads/", http.FileServer(http.Dir(uploadDir))))
@ -92,15 +100,15 @@ func registerManageRoutes(mux *http.ServeMux, d RouterDeps) {
mux.HandleFunc("POST /manage/{screenSlug}/upload", mux.HandleFunc("POST /manage/{screenSlug}/upload",
manage.HandleUploadMediaUI(d.MediaStore, d.ScreenStore, uploadDir)) manage.HandleUploadMediaUI(d.MediaStore, d.ScreenStore, uploadDir))
mux.HandleFunc("POST /manage/{screenSlug}/items", mux.HandleFunc("POST /manage/{screenSlug}/items",
manage.HandleAddItemUI(d.PlaylistStore, d.MediaStore, d.ScreenStore)) manage.HandleAddItemUI(d.PlaylistStore, d.MediaStore, d.ScreenStore, notifier))
mux.HandleFunc("POST /manage/{screenSlug}/items/{itemId}", mux.HandleFunc("POST /manage/{screenSlug}/items/{itemId}",
manage.HandleUpdateItemUI(d.PlaylistStore)) manage.HandleUpdateItemUI(d.PlaylistStore, notifier))
mux.HandleFunc("POST /manage/{screenSlug}/items/{itemId}/delete", mux.HandleFunc("POST /manage/{screenSlug}/items/{itemId}/delete",
manage.HandleDeleteItemUI(d.PlaylistStore)) manage.HandleDeleteItemUI(d.PlaylistStore, notifier))
mux.HandleFunc("POST /manage/{screenSlug}/reorder", mux.HandleFunc("POST /manage/{screenSlug}/reorder",
manage.HandleReorderUI(d.PlaylistStore, d.ScreenStore)) manage.HandleReorderUI(d.PlaylistStore, d.ScreenStore, notifier))
mux.HandleFunc("POST /manage/{screenSlug}/media/{mediaId}/delete", mux.HandleFunc("POST /manage/{screenSlug}/media/{mediaId}/delete",
manage.HandleDeleteMediaUI(d.MediaStore, d.ScreenStore, uploadDir)) manage.HandleDeleteMediaUI(d.MediaStore, d.ScreenStore, uploadDir, notifier))
// ── JSON API — screens ──────────────────────────────────────────────── // ── JSON API — screens ────────────────────────────────────────────────
// Self-registration: called by agent on startup (must be before /{tenantSlug}/ routes) // Self-registration: called by agent on startup (must be before /{tenantSlug}/ routes)
@ -125,13 +133,13 @@ func registerManageRoutes(mux *http.ServeMux, d RouterDeps) {
mux.HandleFunc("GET /api/v1/playlists/{screenId}", mux.HandleFunc("GET /api/v1/playlists/{screenId}",
manage.HandleGetPlaylist(d.ScreenStore, d.PlaylistStore)) manage.HandleGetPlaylist(d.ScreenStore, d.PlaylistStore))
mux.HandleFunc("POST /api/v1/playlists/{playlistId}/items", mux.HandleFunc("POST /api/v1/playlists/{playlistId}/items",
manage.HandleAddItem(d.PlaylistStore, d.MediaStore)) manage.HandleAddItem(d.PlaylistStore, d.MediaStore, notifier))
mux.HandleFunc("PATCH /api/v1/items/{itemId}", mux.HandleFunc("PATCH /api/v1/items/{itemId}",
manage.HandleUpdateItem(d.PlaylistStore)) manage.HandleUpdateItem(d.PlaylistStore, notifier))
mux.HandleFunc("DELETE /api/v1/items/{itemId}", mux.HandleFunc("DELETE /api/v1/items/{itemId}",
manage.HandleDeleteItem(d.PlaylistStore)) manage.HandleDeleteItem(d.PlaylistStore, notifier))
mux.HandleFunc("PUT /api/v1/playlists/{playlistId}/order", mux.HandleFunc("PUT /api/v1/playlists/{playlistId}/order",
manage.HandleReorder(d.PlaylistStore)) manage.HandleReorder(d.PlaylistStore, notifier))
mux.HandleFunc("PATCH /api/v1/playlists/{playlistId}/duration", mux.HandleFunc("PATCH /api/v1/playlists/{playlistId}/duration",
manage.HandleUpdatePlaylistDuration(d.PlaylistStore)) manage.HandleUpdatePlaylistDuration(d.PlaylistStore))
} }

View file

@ -0,0 +1,106 @@
// Package mqttnotifier publishes playlist-changed notifications to MQTT.
// It is safe for concurrent use and applies per-screen debouncing so that
// rapid edits within a 2-second window produce at most one MQTT message.
package mqttnotifier
import (
"fmt"
"sync"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
const (
// debounceDuration is the minimum time between two publish calls for the
// same screen. Any change arriving within this window resets the timer.
debounceDuration = 2 * time.Second
)
// Notifier publishes "playlist-changed" MQTT messages with per-screen debounce.
// If no broker URL is configured it behaves as a no-op (all methods are safe
// to call and do nothing).
type Notifier struct {
client mqtt.Client // nil when disabled
mu sync.Mutex
timers map[string]*time.Timer // keyed by screenSlug
}
// New creates a Notifier connected to broker (e.g. "tcp://mosquitto:1883").
// username/password may be empty. Returns a no-op Notifier when broker == "".
func New(broker, username, password string) *Notifier {
n := &Notifier{timers: make(map[string]*time.Timer)}
if broker == "" {
return n
}
opts := mqtt.NewClientOptions().
AddBroker(broker).
SetClientID("morz-backend").
SetCleanSession(true).
SetAutoReconnect(true).
SetConnectRetry(true).
SetConnectRetryInterval(10 * time.Second)
if username != "" {
opts.SetUsername(username)
opts.SetPassword(password)
}
n.client = mqtt.NewClient(opts)
n.client.Connect() // non-blocking; paho retries in background
return n
}
// Topic returns the MQTT topic for a screen's playlist-changed notification.
func Topic(screenSlug string) string {
return fmt.Sprintf("signage/screen/%s/playlist-changed", screenSlug)
}
// NotifyChanged schedules a publish for screenSlug. If another call for the
// same screen arrives within debounceDuration, the timer is reset (debounce).
// The method returns immediately; the actual publish happens in a goroutine.
func (n *Notifier) NotifyChanged(screenSlug string) {
if n.client == nil {
return
}
n.mu.Lock()
defer n.mu.Unlock()
// Reset existing timer if present (debounce).
if t, ok := n.timers[screenSlug]; ok {
t.Stop()
}
n.timers[screenSlug] = time.AfterFunc(debounceDuration, func() {
n.mu.Lock()
delete(n.timers, screenSlug)
n.mu.Unlock()
n.publish(screenSlug)
})
}
func (n *Notifier) publish(screenSlug string) {
topic := Topic(screenSlug)
payload := []byte(fmt.Sprintf(`{"ts":%d}`, time.Now().UnixMilli()))
token := n.client.Publish(topic, 0, false, payload)
token.WaitTimeout(3 * time.Second)
// Errors are silently dropped — the 60 s polling in the agent is the fallback.
}
// Close disconnects the MQTT client gracefully.
func (n *Notifier) Close() {
if n.client == nil {
return
}
n.mu.Lock()
for _, t := range n.timers {
t.Stop()
}
n.timers = make(map[string]*time.Timer)
n.mu.Unlock()
n.client.Disconnect(250)
}

View file

@ -407,6 +407,29 @@ func (s *PlaylistStore) DeleteItem(ctx context.Context, id string) error {
return err return err
} }
// ScreenSlugByPlaylistID returns the slug of the screen that owns playlistID.
func (s *PlaylistStore) ScreenSlugByPlaylistID(ctx context.Context, playlistID string) (string, error) {
var slug string
err := s.pool.QueryRow(ctx,
`select sc.slug
from playlists pl
join screens sc on sc.id = pl.screen_id
where pl.id = $1`, playlistID).Scan(&slug)
return slug, err
}
// ScreenSlugByItemID returns the slug of the screen that owns itemID.
func (s *PlaylistStore) ScreenSlugByItemID(ctx context.Context, itemID string) (string, error) {
var slug string
err := s.pool.QueryRow(ctx,
`select sc.slug
from playlist_items pi
join playlists pl on pl.id = pi.playlist_id
join screens sc on sc.id = pl.screen_id
where pi.id = $1`, itemID).Scan(&slug)
return slug, err
}
// Reorder sets order_index for each item ID in the given slice order. // Reorder sets order_index for each item ID in the given slice order.
func (s *PlaylistStore) Reorder(ctx context.Context, playlistID string, itemIDs []string) error { func (s *PlaylistStore) Reorder(ctx context.Context, playlistID string, itemIDs []string) error {
tx, err := s.pool.Begin(ctx) tx, err := s.pool.Begin(ctx)