feat(mqtt): MQTT-Config per Heartbeat-Response vom Server an Agents übertragen

Server gibt bei POST /api/v1/player/status jetzt mqtt-Block zurück (broker,
username, password) wenn MORZ_INFOBOARD_MQTT_BROKER gesetzt ist. Agents
parsen die Response und verbinden sich bei Config-Änderung automatisch neu
(applyMQTTConfig mit Reconnect-Logik, thread-safe via Mutex).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Jesko Anschütz 2026-03-24 15:03:15 +01:00
parent 1357dbe773
commit 6084712800
8 changed files with 193 additions and 39 deletions

View file

@ -89,6 +89,24 @@ Der Player-Agent sendet seinen aktuellen Status an den Server.
{ "status": "accepted" }
```
Wenn auf dem Server ein MQTT-Broker konfiguriert ist (`MORZ_INFOBOARD_MQTT_BROKER`), enthält die Response zusätzlich ein `mqtt`-Objekt mit den Verbindungsdaten. Der Agent soll diese Konfiguration übernehmen und seine MQTT-Verbindung bei Bedarf neu aufbauen.
```json
{
"status": "accepted",
"mqtt": {
"broker": "tcp://mqtt.example.com:1883",
"username": "agent",
"password": "secret"
}
}
```
- `mqtt` — nur vorhanden, wenn ein Broker konfiguriert ist (omitempty); fehlt das Feld, bleibt die bestehende MQTT-Konfiguration des Agents unverändert
- `mqtt.broker` — MQTT-Broker-URL (immer gesetzt, wenn `mqtt` vorhanden)
- `mqtt.username` — Benutzername (nur wenn konfiguriert, omitempty)
- `mqtt.password` — Passwort (nur wenn konfiguriert, omitempty)
---
## Screen Management (JSON API)
@ -1118,6 +1136,10 @@ Ruft den zuletzt hochgeladenen Screenshot eines Screens ab.
## Änderungshistorie
- **2026-03-24 (Update):** MQTT-Konfiguration in POST /api/v1/player/status Response dokumentiert (Doris / Doku-Review)
- Response enthält jetzt optionales `mqtt`-Objekt mit `broker`, `username`, `password` (alle omitempty wenn leer)
- Feld wird nur gesendet wenn `MORZ_INFOBOARD_MQTT_BROKER` konfiguriert ist
- Agent übernimmt die Konfiguration und reconnectet MQTT bei Änderung
- **2026-03-24 (Update):** Screenshot-Endpoints implementiert und dokumentiert (Doris / Doku-Review)
- `POST /api/v1/player/screenshot` — war als "In Vorbereitung" markiert, ist jetzt vollständig implementiert; Abschnitt komplett neu verfasst
- `GET /api/v1/screens/{screenId}/screenshot` — neuer Endpoint, `authOnly`, liefert Raw-Image aus In-Memory-Store

View file

@ -62,6 +62,11 @@ type App struct {
startedAt time.Time
lastHeartbeatAt time.Time
// mqttMu guards mqttPub and mqttSub as well as the MQTT fields in Config
// (Config.MQTTBroker, Config.MQTTUsername, Config.MQTTPassword).
mqttMu sync.Mutex
mqttSub mqttCloser
// Playlist fetched from the backend (protected by playlistMu).
playlistMu sync.RWMutex
playlist []playerserver.PlaylistItem
@ -70,10 +75,18 @@ type App struct {
// arrives (after debouncing in the subscriber). pollPlaylist listens on
// this channel to trigger an immediate fetchPlaylist call.
mqttFetchC chan struct{}
// screenshotFn is kept so that applyMQTTConfig can pass it to the new subscriber.
screenshotFn func()
}
// mqttCloser is implemented by mqttsubscriber.Subscriber.
type mqttCloser interface {
Close()
}
type statusSender interface {
Send(ctx context.Context, snapshot statusreporter.Snapshot) error
Send(ctx context.Context, snapshot statusreporter.Snapshot) (statusreporter.MQTTConfig, error)
}
type mqttSender interface {
@ -202,6 +215,12 @@ func (a *App) Run(ctx context.Context) error {
// Screenshot-Instanz immer anlegen (für periodische und On-Demand-Screenshots).
ss := screenshot.New(a.Config.ScreenID, a.Config.ServerBaseURL, a.Config.ScreenshotEvery, a.logger)
// Keep screenshot callback so applyMQTTConfig can hand it to new subscribers.
a.screenshotFn = func() {
a.logger.Printf("event=mqtt_screenshot_request screen_id=%s", a.Config.ScreenID)
go ss.TakeAndSendOnce(ctx)
}
// Subscribe to playlist-changed and screenshot-request MQTT notifications (optional; fallback = polling).
sub := mqttsubscriber.New(
a.Config.MQTTBroker,
@ -216,16 +235,15 @@ func (a *App) Run(ctx context.Context) error {
}
a.logger.Printf("event=mqtt_playlist_notification screen_id=%s", a.Config.ScreenID)
},
func() {
a.logger.Printf("event=mqtt_screenshot_request screen_id=%s", a.Config.ScreenID)
go ss.TakeAndSendOnce(ctx)
},
a.screenshotFn,
)
a.mqttMu.Lock()
a.mqttSub = sub
a.mqttMu.Unlock()
if sub != nil {
a.logger.Printf("event=mqtt_subscriber_enabled broker=%s screen_id=%s topic=%s screenshot_topic=%s",
a.Config.MQTTBroker, a.Config.ScreenID, mqttsubscriber.Topic(a.Config.ScreenID),
mqttsubscriber.ScreenshotRequestTopic(a.Config.ScreenID))
defer sub.Close()
}
// Start polling the backend for playlist updates (60 s fallback + MQTT trigger).
@ -256,9 +274,14 @@ func (a *App) Run(ctx context.Context) error {
a.mu.Lock()
a.status = StatusStopped
a.mu.Unlock()
a.mqttMu.Lock()
if a.mqttPub != nil {
a.mqttPub.Close()
}
if a.mqttSub != nil {
a.mqttSub.Close()
}
a.mqttMu.Unlock()
a.logger.Printf("event=agent_stopped screen_id=%s", a.Config.ScreenID)
return nil
case <-ticker.C:
@ -411,7 +434,7 @@ func (a *App) reportStatus(ctx context.Context) {
payloadConnectivity = ConnectivityOnline
}
err := a.reporter.Send(ctx, statusreporter.Snapshot{
mqttCfg, err := a.reporter.Send(ctx, statusreporter.Snapshot{
Status: string(snapshot.Status),
ServerConnectivity: string(payloadConnectivity),
ScreenID: snapshot.ScreenID,
@ -437,4 +460,59 @@ func (a *App) reportStatus(ctx context.Context) {
a.consecutiveReportFailures = 0
a.serverConnectivity = ConnectivityOnline
a.mu.Unlock()
// Apply MQTT config from server response if broker was provided.
if mqttCfg.Broker != "" {
a.applyMQTTConfig(mqttCfg.Broker, mqttCfg.Username, mqttCfg.Password)
}
}
// applyMQTTConfig checks whether the MQTT configuration has changed and, if so,
// gracefully stops the existing MQTT clients and starts new ones.
// It is safe for concurrent use — all MQTT client mutations are protected by mqttMu.
func (a *App) applyMQTTConfig(broker, username, password string) {
a.mqttMu.Lock()
defer a.mqttMu.Unlock()
// Nothing to do when the config is unchanged.
if a.Config.MQTTBroker == broker &&
a.Config.MQTTUsername == username &&
a.Config.MQTTPassword == password {
return
}
a.logger.Printf("event=mqtt_config_changed screen_id=%s old_broker=%s new_broker=%s",
a.Config.ScreenID, a.Config.MQTTBroker, broker)
// Stop existing clients.
if a.mqttPub != nil {
a.mqttPub.Close()
a.mqttPub = nil
}
if a.mqttSub != nil {
a.mqttSub.Close()
a.mqttSub = nil
}
// Update stored config.
a.Config.MQTTBroker = broker
a.Config.MQTTUsername = username
a.Config.MQTTPassword = password
// Start new clients.
a.mqttPub = mqttheartbeat.New(broker, a.Config.ScreenID, username, password)
a.logger.Printf("event=mqtt_heartbeat_restarted screen_id=%s broker=%s", a.Config.ScreenID, broker)
playlistChangedFn := func() {
select {
case a.mqttFetchC <- struct{}{}:
default:
}
a.logger.Printf("event=mqtt_playlist_notification screen_id=%s", a.Config.ScreenID)
}
sub := mqttsubscriber.New(broker, a.Config.ScreenID, username, password, playlistChangedFn, a.screenshotFn)
a.mqttSub = sub
if sub != nil {
a.logger.Printf("event=mqtt_subscriber_restarted screen_id=%s broker=%s", a.Config.ScreenID, broker)
}
}

View file

@ -19,15 +19,15 @@ type recordingReporter struct {
snapshots []statusreporter.Snapshot
}
func (r *recordingReporter) Send(_ context.Context, snapshot statusreporter.Snapshot) error {
func (r *recordingReporter) Send(_ context.Context, snapshot statusreporter.Snapshot) (statusreporter.MQTTConfig, error) {
r.callCount++
r.snapshots = append(r.snapshots, snapshot)
if len(r.errs) > 0 {
err := r.errs[0]
r.errs = r.errs[1:]
return err
return statusreporter.MQTTConfig{}, err
}
return r.err
return statusreporter.MQTTConfig{}, r.err
}
func TestAppRunUpdatesHealthAndLogsStructuredEvents(t *testing.T) {

View file

@ -33,6 +33,27 @@ type statusPayload struct {
LastHeartbeatAt string `json:"last_heartbeat_at,omitempty"`
}
// MQTTConfig holds the MQTT broker configuration returned by the server in the
// status-report response. All fields are empty when the server did not send
// a mqtt object (e.g. MQTT is disabled on the server side).
type MQTTConfig struct {
Broker string
Username string
Password string
}
// serverResponse is the JSON body returned by POST /api/v1/player/status.
type serverResponse struct {
Status string `json:"status"`
MQTT *serverMQTTBlock `json:"mqtt,omitempty"`
}
type serverMQTTBlock struct {
Broker string `json:"broker"`
Username string `json:"username"`
Password string `json:"password"`
}
type Reporter struct {
baseURL string
client *http.Client
@ -55,34 +76,47 @@ func New(baseURL string, client *http.Client, now func() time.Time) *Reporter {
}
}
func (r *Reporter) Send(ctx context.Context, snapshot Snapshot) error {
// Send reports the snapshot to the server and returns the MQTT configuration
// provided by the server in the response body. If the server does not include
// a mqtt object the returned MQTTConfig will have an empty Broker field.
func (r *Reporter) Send(ctx context.Context, snapshot Snapshot) (MQTTConfig, error) {
payload := buildPayload(snapshot, r.now())
body, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("marshal status payload: %w", err)
return MQTTConfig{}, fmt.Errorf("marshal status payload: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.baseURL+"/api/v1/player/status", bytes.NewReader(body))
if err != nil {
return fmt.Errorf("build status request: %w", err)
return MQTTConfig{}, fmt.Errorf("build status request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := r.client.Do(req)
if err != nil {
return fmt.Errorf("send status request: %w", err)
return MQTTConfig{}, fmt.Errorf("send status request: %w", err)
}
statusCode := resp.StatusCode
statusText := resp.Status
if err := resp.Body.Close(); err != nil {
return fmt.Errorf("close status response body: %w", err)
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return MQTTConfig{}, fmt.Errorf("unexpected status response: %s", resp.Status)
}
if statusCode != http.StatusOK {
return fmt.Errorf("unexpected status response: %s", statusText)
var sr serverResponse
if err := json.NewDecoder(resp.Body).Decode(&sr); err != nil {
// Non-fatal: decoding failure just means no MQTT config update.
return MQTTConfig{}, nil
}
return nil
if sr.MQTT != nil {
return MQTTConfig{
Broker: sr.MQTT.Broker,
Username: sr.MQTT.Username,
Password: sr.MQTT.Password,
}, nil
}
return MQTTConfig{}, nil
}
func buildPayload(snapshot Snapshot, now time.Time) statusPayload {

View file

@ -70,7 +70,7 @@ func TestReporterSendStatus(t *testing.T) {
return time.Date(2026, 3, 22, 16, 0, 0, 0, time.UTC)
})
err := reporter.Send(context.Background(), Snapshot{
_, err := reporter.Send(context.Background(), Snapshot{
Status: "running",
ServerConnectivity: "online",
ScreenID: "info01-dev",

View file

@ -49,7 +49,21 @@ type playerStatusRequest struct {
LastHeartbeatAt string `json:"last_heartbeat_at"`
}
func handlePlayerStatus(store playerStatusStore) http.HandlerFunc {
// playerStatusMQTTConfig is the MQTT configuration returned to agents in the
// status-report response. It is omitted entirely when Broker is empty.
type playerStatusMQTTConfig struct {
Broker string `json:"broker"`
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
}
// playerStatusResponse is the JSON body returned for POST /api/v1/player/status.
type playerStatusResponse struct {
Status string `json:"status"`
MQTT *playerStatusMQTTConfig `json:"mqtt,omitempty"`
}
func handlePlayerStatus(store playerStatusStore, mqttBroker, mqttUsername, mqttPassword string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var request playerStatusRequest
if err := decodeJSON(r, &request); err != nil {
@ -116,9 +130,15 @@ func handlePlayerStatus(store playerStatusStore) http.HandlerFunc {
LastHeartbeatAt: request.LastHeartbeatAt,
})
writeJSON(w, http.StatusOK, map[string]string{
"status": "accepted",
})
resp := playerStatusResponse{Status: "accepted"}
if mqttBroker != "" {
resp.MQTT = &playerStatusMQTTConfig{
Broker: mqttBroker,
Username: mqttUsername,
Password: mqttPassword,
}
}
writeJSON(w, http.StatusOK, resp)
}
}

View file

@ -26,7 +26,7 @@ func TestHandlePlayerStatusAccepted(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body))
w := httptest.NewRecorder()
handlePlayerStatus(store)(w, req)
handlePlayerStatus(store, "", "", "")(w, req)
if got, want := w.Code, http.StatusOK; got != want {
t.Fatalf("status = %d, want %d", got, want)
@ -66,7 +66,7 @@ func TestHandlePlayerStatusRejectsInvalidJSON(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewBufferString("{"))
w := httptest.NewRecorder()
handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req)
handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req)
if got, want := w.Code, http.StatusBadRequest; got != want {
t.Fatalf("status = %d, want %d", got, want)
@ -83,7 +83,7 @@ func TestHandlePlayerStatusRejectsMissingScreenID(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body))
w := httptest.NewRecorder()
handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req)
handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req)
if got, want := w.Code, http.StatusBadRequest; got != want {
t.Fatalf("status = %d, want %d", got, want)
@ -102,7 +102,7 @@ func TestHandlePlayerStatusStoresNormalizedScreenID(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body))
w := httptest.NewRecorder()
handlePlayerStatus(store)(w, req)
handlePlayerStatus(store, "", "", "")(w, req)
if got, want := w.Code, http.StatusOK; got != want {
t.Fatalf("status = %d, want %d", got, want)
@ -122,7 +122,7 @@ func TestHandlePlayerStatusRejectsMissingTimestamp(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body))
w := httptest.NewRecorder()
handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req)
handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req)
if got, want := w.Code, http.StatusBadRequest; got != want {
t.Fatalf("status = %d, want %d", got, want)
@ -138,7 +138,7 @@ func TestHandlePlayerStatusRejectsMissingStatus(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body))
w := httptest.NewRecorder()
handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req)
handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req)
if got, want := w.Code, http.StatusBadRequest; got != want {
t.Fatalf("status = %d, want %d", got, want)
@ -155,7 +155,7 @@ func TestHandlePlayerStatusRejectsUnknownStatus(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body))
w := httptest.NewRecorder()
handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req)
handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req)
if got, want := w.Code, http.StatusBadRequest; got != want {
t.Fatalf("status = %d, want %d", got, want)
@ -173,7 +173,7 @@ func TestHandlePlayerStatusRejectsUnknownServerConnectivity(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body))
w := httptest.NewRecorder()
handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req)
handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req)
if got, want := w.Code, http.StatusBadRequest; got != want {
t.Fatalf("status = %d, want %d", got, want)
@ -191,7 +191,7 @@ func TestHandlePlayerStatusRejectsNonPositiveHeartbeatInterval(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body))
w := httptest.NewRecorder()
handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req)
handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req)
if got, want := w.Code, http.StatusBadRequest; got != want {
t.Fatalf("status = %d, want %d", got, want)
@ -208,7 +208,7 @@ func TestHandlePlayerStatusRejectsMalformedTimestamps(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body))
w := httptest.NewRecorder()
handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req)
handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req)
if got, want := w.Code, http.StatusBadRequest; got != want {
t.Fatalf("status = %d, want %d", got, want)
@ -226,7 +226,7 @@ func TestHandlePlayerStatusRejectsMalformedStartedAt(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body))
w := httptest.NewRecorder()
handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req)
handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req)
if got, want := w.Code, http.StatusBadRequest; got != want {
t.Fatalf("status = %d, want %d", got, want)
@ -244,7 +244,7 @@ func TestHandlePlayerStatusRejectsMalformedLastHeartbeatAt(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body))
w := httptest.NewRecorder()
handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req)
handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req)
if got, want := w.Code, http.StatusBadRequest; got != want {
t.Fatalf("status = %d, want %d", got, want)

View file

@ -67,7 +67,7 @@ func NewRouter(deps RouterDeps) http.Handler {
mux.HandleFunc("GET /api/v1/meta", handleMeta)
// ── Player status (existing) ──────────────────────────────────────────
mux.HandleFunc("POST /api/v1/player/status", handlePlayerStatus(deps.StatusStore))
mux.HandleFunc("POST /api/v1/player/status", handlePlayerStatus(deps.StatusStore, deps.Config.MQTTBroker, deps.Config.MQTTUsername, deps.Config.MQTTPassword))
mux.HandleFunc("POST /api/v1/player/screenshot", handlePlayerScreenshot(deps.ScreenshotStore))
mux.HandleFunc("GET /api/v1/screens/status", handleListLatestPlayerStatuses(deps.StatusStore))
mux.HandleFunc("GET /api/v1/screens/{screenId}/status", handleGetLatestPlayerStatus(deps.StatusStore))