diff --git a/docs/API-ENDPOINTS.md b/docs/API-ENDPOINTS.md index ec4017b..1e4d7b4 100644 --- a/docs/API-ENDPOINTS.md +++ b/docs/API-ENDPOINTS.md @@ -89,6 +89,24 @@ Der Player-Agent sendet seinen aktuellen Status an den Server. { "status": "accepted" } ``` +Wenn auf dem Server ein MQTT-Broker konfiguriert ist (`MORZ_INFOBOARD_MQTT_BROKER`), enthält die Response zusätzlich ein `mqtt`-Objekt mit den Verbindungsdaten. Der Agent soll diese Konfiguration übernehmen und seine MQTT-Verbindung bei Bedarf neu aufbauen. + +```json +{ + "status": "accepted", + "mqtt": { + "broker": "tcp://mqtt.example.com:1883", + "username": "agent", + "password": "secret" + } +} +``` + +- `mqtt` — nur vorhanden, wenn ein Broker konfiguriert ist (omitempty); fehlt das Feld, bleibt die bestehende MQTT-Konfiguration des Agents unverändert +- `mqtt.broker` — MQTT-Broker-URL (immer gesetzt, wenn `mqtt` vorhanden) +- `mqtt.username` — Benutzername (nur wenn konfiguriert, omitempty) +- `mqtt.password` — Passwort (nur wenn konfiguriert, omitempty) + --- ## Screen Management (JSON API) @@ -1118,6 +1136,10 @@ Ruft den zuletzt hochgeladenen Screenshot eines Screens ab. ## Änderungshistorie +- **2026-03-24 (Update):** MQTT-Konfiguration in POST /api/v1/player/status Response dokumentiert (Doris / Doku-Review) + - Response enthält jetzt optionales `mqtt`-Objekt mit `broker`, `username`, `password` (alle omitempty wenn leer) + - Feld wird nur gesendet wenn `MORZ_INFOBOARD_MQTT_BROKER` konfiguriert ist + - Agent übernimmt die Konfiguration und reconnectet MQTT bei Änderung - **2026-03-24 (Update):** Screenshot-Endpoints implementiert und dokumentiert (Doris / Doku-Review) - `POST /api/v1/player/screenshot` — war als "In Vorbereitung" markiert, ist jetzt vollständig implementiert; Abschnitt komplett neu verfasst - `GET /api/v1/screens/{screenId}/screenshot` — neuer Endpoint, `authOnly`, liefert Raw-Image aus In-Memory-Store diff --git a/player/agent/internal/app/app.go b/player/agent/internal/app/app.go index 28be01c..a952597 100644 --- a/player/agent/internal/app/app.go +++ b/player/agent/internal/app/app.go @@ -62,6 +62,11 @@ type App struct { startedAt time.Time lastHeartbeatAt time.Time + // mqttMu guards mqttPub and mqttSub as well as the MQTT fields in Config + // (Config.MQTTBroker, Config.MQTTUsername, Config.MQTTPassword). + mqttMu sync.Mutex + mqttSub mqttCloser + // Playlist fetched from the backend (protected by playlistMu). playlistMu sync.RWMutex playlist []playerserver.PlaylistItem @@ -70,10 +75,18 @@ type App struct { // arrives (after debouncing in the subscriber). pollPlaylist listens on // this channel to trigger an immediate fetchPlaylist call. mqttFetchC chan struct{} + + // screenshotFn is kept so that applyMQTTConfig can pass it to the new subscriber. + screenshotFn func() +} + +// mqttCloser is implemented by mqttsubscriber.Subscriber. +type mqttCloser interface { + Close() } type statusSender interface { - Send(ctx context.Context, snapshot statusreporter.Snapshot) error + Send(ctx context.Context, snapshot statusreporter.Snapshot) (statusreporter.MQTTConfig, error) } type mqttSender interface { @@ -202,6 +215,12 @@ func (a *App) Run(ctx context.Context) error { // Screenshot-Instanz immer anlegen (für periodische und On-Demand-Screenshots). ss := screenshot.New(a.Config.ScreenID, a.Config.ServerBaseURL, a.Config.ScreenshotEvery, a.logger) + // Keep screenshot callback so applyMQTTConfig can hand it to new subscribers. + a.screenshotFn = func() { + a.logger.Printf("event=mqtt_screenshot_request screen_id=%s", a.Config.ScreenID) + go ss.TakeAndSendOnce(ctx) + } + // Subscribe to playlist-changed and screenshot-request MQTT notifications (optional; fallback = polling). sub := mqttsubscriber.New( a.Config.MQTTBroker, @@ -216,16 +235,15 @@ func (a *App) Run(ctx context.Context) error { } a.logger.Printf("event=mqtt_playlist_notification screen_id=%s", a.Config.ScreenID) }, - func() { - a.logger.Printf("event=mqtt_screenshot_request screen_id=%s", a.Config.ScreenID) - go ss.TakeAndSendOnce(ctx) - }, + a.screenshotFn, ) + a.mqttMu.Lock() + a.mqttSub = sub + a.mqttMu.Unlock() if sub != nil { a.logger.Printf("event=mqtt_subscriber_enabled broker=%s screen_id=%s topic=%s screenshot_topic=%s", a.Config.MQTTBroker, a.Config.ScreenID, mqttsubscriber.Topic(a.Config.ScreenID), mqttsubscriber.ScreenshotRequestTopic(a.Config.ScreenID)) - defer sub.Close() } // Start polling the backend for playlist updates (60 s fallback + MQTT trigger). @@ -256,9 +274,14 @@ func (a *App) Run(ctx context.Context) error { a.mu.Lock() a.status = StatusStopped a.mu.Unlock() + a.mqttMu.Lock() if a.mqttPub != nil { a.mqttPub.Close() } + if a.mqttSub != nil { + a.mqttSub.Close() + } + a.mqttMu.Unlock() a.logger.Printf("event=agent_stopped screen_id=%s", a.Config.ScreenID) return nil case <-ticker.C: @@ -411,7 +434,7 @@ func (a *App) reportStatus(ctx context.Context) { payloadConnectivity = ConnectivityOnline } - err := a.reporter.Send(ctx, statusreporter.Snapshot{ + mqttCfg, err := a.reporter.Send(ctx, statusreporter.Snapshot{ Status: string(snapshot.Status), ServerConnectivity: string(payloadConnectivity), ScreenID: snapshot.ScreenID, @@ -437,4 +460,59 @@ func (a *App) reportStatus(ctx context.Context) { a.consecutiveReportFailures = 0 a.serverConnectivity = ConnectivityOnline a.mu.Unlock() + + // Apply MQTT config from server response if broker was provided. + if mqttCfg.Broker != "" { + a.applyMQTTConfig(mqttCfg.Broker, mqttCfg.Username, mqttCfg.Password) + } +} + +// applyMQTTConfig checks whether the MQTT configuration has changed and, if so, +// gracefully stops the existing MQTT clients and starts new ones. +// It is safe for concurrent use — all MQTT client mutations are protected by mqttMu. +func (a *App) applyMQTTConfig(broker, username, password string) { + a.mqttMu.Lock() + defer a.mqttMu.Unlock() + + // Nothing to do when the config is unchanged. + if a.Config.MQTTBroker == broker && + a.Config.MQTTUsername == username && + a.Config.MQTTPassword == password { + return + } + + a.logger.Printf("event=mqtt_config_changed screen_id=%s old_broker=%s new_broker=%s", + a.Config.ScreenID, a.Config.MQTTBroker, broker) + + // Stop existing clients. + if a.mqttPub != nil { + a.mqttPub.Close() + a.mqttPub = nil + } + if a.mqttSub != nil { + a.mqttSub.Close() + a.mqttSub = nil + } + + // Update stored config. + a.Config.MQTTBroker = broker + a.Config.MQTTUsername = username + a.Config.MQTTPassword = password + + // Start new clients. + a.mqttPub = mqttheartbeat.New(broker, a.Config.ScreenID, username, password) + a.logger.Printf("event=mqtt_heartbeat_restarted screen_id=%s broker=%s", a.Config.ScreenID, broker) + + playlistChangedFn := func() { + select { + case a.mqttFetchC <- struct{}{}: + default: + } + a.logger.Printf("event=mqtt_playlist_notification screen_id=%s", a.Config.ScreenID) + } + sub := mqttsubscriber.New(broker, a.Config.ScreenID, username, password, playlistChangedFn, a.screenshotFn) + a.mqttSub = sub + if sub != nil { + a.logger.Printf("event=mqtt_subscriber_restarted screen_id=%s broker=%s", a.Config.ScreenID, broker) + } } diff --git a/player/agent/internal/app/app_test.go b/player/agent/internal/app/app_test.go index c463ba8..06d8145 100644 --- a/player/agent/internal/app/app_test.go +++ b/player/agent/internal/app/app_test.go @@ -19,15 +19,15 @@ type recordingReporter struct { snapshots []statusreporter.Snapshot } -func (r *recordingReporter) Send(_ context.Context, snapshot statusreporter.Snapshot) error { +func (r *recordingReporter) Send(_ context.Context, snapshot statusreporter.Snapshot) (statusreporter.MQTTConfig, error) { r.callCount++ r.snapshots = append(r.snapshots, snapshot) if len(r.errs) > 0 { err := r.errs[0] r.errs = r.errs[1:] - return err + return statusreporter.MQTTConfig{}, err } - return r.err + return statusreporter.MQTTConfig{}, r.err } func TestAppRunUpdatesHealthAndLogsStructuredEvents(t *testing.T) { diff --git a/player/agent/internal/statusreporter/reporter.go b/player/agent/internal/statusreporter/reporter.go index d5edecb..f06b0d8 100644 --- a/player/agent/internal/statusreporter/reporter.go +++ b/player/agent/internal/statusreporter/reporter.go @@ -33,6 +33,27 @@ type statusPayload struct { LastHeartbeatAt string `json:"last_heartbeat_at,omitempty"` } +// MQTTConfig holds the MQTT broker configuration returned by the server in the +// status-report response. All fields are empty when the server did not send +// a mqtt object (e.g. MQTT is disabled on the server side). +type MQTTConfig struct { + Broker string + Username string + Password string +} + +// serverResponse is the JSON body returned by POST /api/v1/player/status. +type serverResponse struct { + Status string `json:"status"` + MQTT *serverMQTTBlock `json:"mqtt,omitempty"` +} + +type serverMQTTBlock struct { + Broker string `json:"broker"` + Username string `json:"username"` + Password string `json:"password"` +} + type Reporter struct { baseURL string client *http.Client @@ -55,34 +76,47 @@ func New(baseURL string, client *http.Client, now func() time.Time) *Reporter { } } -func (r *Reporter) Send(ctx context.Context, snapshot Snapshot) error { +// Send reports the snapshot to the server and returns the MQTT configuration +// provided by the server in the response body. If the server does not include +// a mqtt object the returned MQTTConfig will have an empty Broker field. +func (r *Reporter) Send(ctx context.Context, snapshot Snapshot) (MQTTConfig, error) { payload := buildPayload(snapshot, r.now()) body, err := json.Marshal(payload) if err != nil { - return fmt.Errorf("marshal status payload: %w", err) + return MQTTConfig{}, fmt.Errorf("marshal status payload: %w", err) } req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.baseURL+"/api/v1/player/status", bytes.NewReader(body)) if err != nil { - return fmt.Errorf("build status request: %w", err) + return MQTTConfig{}, fmt.Errorf("build status request: %w", err) } req.Header.Set("Content-Type", "application/json") resp, err := r.client.Do(req) if err != nil { - return fmt.Errorf("send status request: %w", err) + return MQTTConfig{}, fmt.Errorf("send status request: %w", err) } - statusCode := resp.StatusCode - statusText := resp.Status - if err := resp.Body.Close(); err != nil { - return fmt.Errorf("close status response body: %w", err) + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return MQTTConfig{}, fmt.Errorf("unexpected status response: %s", resp.Status) } - if statusCode != http.StatusOK { - return fmt.Errorf("unexpected status response: %s", statusText) + var sr serverResponse + if err := json.NewDecoder(resp.Body).Decode(&sr); err != nil { + // Non-fatal: decoding failure just means no MQTT config update. + return MQTTConfig{}, nil } - return nil + if sr.MQTT != nil { + return MQTTConfig{ + Broker: sr.MQTT.Broker, + Username: sr.MQTT.Username, + Password: sr.MQTT.Password, + }, nil + } + + return MQTTConfig{}, nil } func buildPayload(snapshot Snapshot, now time.Time) statusPayload { diff --git a/player/agent/internal/statusreporter/reporter_test.go b/player/agent/internal/statusreporter/reporter_test.go index 7478ccd..7d45a59 100644 --- a/player/agent/internal/statusreporter/reporter_test.go +++ b/player/agent/internal/statusreporter/reporter_test.go @@ -70,7 +70,7 @@ func TestReporterSendStatus(t *testing.T) { return time.Date(2026, 3, 22, 16, 0, 0, 0, time.UTC) }) - err := reporter.Send(context.Background(), Snapshot{ + _, err := reporter.Send(context.Background(), Snapshot{ Status: "running", ServerConnectivity: "online", ScreenID: "info01-dev", diff --git a/server/backend/internal/httpapi/playerstatus.go b/server/backend/internal/httpapi/playerstatus.go index a13455a..2428d2c 100644 --- a/server/backend/internal/httpapi/playerstatus.go +++ b/server/backend/internal/httpapi/playerstatus.go @@ -49,7 +49,21 @@ type playerStatusRequest struct { LastHeartbeatAt string `json:"last_heartbeat_at"` } -func handlePlayerStatus(store playerStatusStore) http.HandlerFunc { +// playerStatusMQTTConfig is the MQTT configuration returned to agents in the +// status-report response. It is omitted entirely when Broker is empty. +type playerStatusMQTTConfig struct { + Broker string `json:"broker"` + Username string `json:"username,omitempty"` + Password string `json:"password,omitempty"` +} + +// playerStatusResponse is the JSON body returned for POST /api/v1/player/status. +type playerStatusResponse struct { + Status string `json:"status"` + MQTT *playerStatusMQTTConfig `json:"mqtt,omitempty"` +} + +func handlePlayerStatus(store playerStatusStore, mqttBroker, mqttUsername, mqttPassword string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { var request playerStatusRequest if err := decodeJSON(r, &request); err != nil { @@ -116,9 +130,15 @@ func handlePlayerStatus(store playerStatusStore) http.HandlerFunc { LastHeartbeatAt: request.LastHeartbeatAt, }) - writeJSON(w, http.StatusOK, map[string]string{ - "status": "accepted", - }) + resp := playerStatusResponse{Status: "accepted"} + if mqttBroker != "" { + resp.MQTT = &playerStatusMQTTConfig{ + Broker: mqttBroker, + Username: mqttUsername, + Password: mqttPassword, + } + } + writeJSON(w, http.StatusOK, resp) } } diff --git a/server/backend/internal/httpapi/playerstatus_test.go b/server/backend/internal/httpapi/playerstatus_test.go index c4e8aab..0c4206f 100644 --- a/server/backend/internal/httpapi/playerstatus_test.go +++ b/server/backend/internal/httpapi/playerstatus_test.go @@ -26,7 +26,7 @@ func TestHandlePlayerStatusAccepted(t *testing.T) { req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body)) w := httptest.NewRecorder() - handlePlayerStatus(store)(w, req) + handlePlayerStatus(store, "", "", "")(w, req) if got, want := w.Code, http.StatusOK; got != want { t.Fatalf("status = %d, want %d", got, want) @@ -66,7 +66,7 @@ func TestHandlePlayerStatusRejectsInvalidJSON(t *testing.T) { req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewBufferString("{")) w := httptest.NewRecorder() - handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req) + handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req) if got, want := w.Code, http.StatusBadRequest; got != want { t.Fatalf("status = %d, want %d", got, want) @@ -83,7 +83,7 @@ func TestHandlePlayerStatusRejectsMissingScreenID(t *testing.T) { req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body)) w := httptest.NewRecorder() - handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req) + handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req) if got, want := w.Code, http.StatusBadRequest; got != want { t.Fatalf("status = %d, want %d", got, want) @@ -102,7 +102,7 @@ func TestHandlePlayerStatusStoresNormalizedScreenID(t *testing.T) { req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body)) w := httptest.NewRecorder() - handlePlayerStatus(store)(w, req) + handlePlayerStatus(store, "", "", "")(w, req) if got, want := w.Code, http.StatusOK; got != want { t.Fatalf("status = %d, want %d", got, want) @@ -122,7 +122,7 @@ func TestHandlePlayerStatusRejectsMissingTimestamp(t *testing.T) { req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body)) w := httptest.NewRecorder() - handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req) + handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req) if got, want := w.Code, http.StatusBadRequest; got != want { t.Fatalf("status = %d, want %d", got, want) @@ -138,7 +138,7 @@ func TestHandlePlayerStatusRejectsMissingStatus(t *testing.T) { req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body)) w := httptest.NewRecorder() - handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req) + handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req) if got, want := w.Code, http.StatusBadRequest; got != want { t.Fatalf("status = %d, want %d", got, want) @@ -155,7 +155,7 @@ func TestHandlePlayerStatusRejectsUnknownStatus(t *testing.T) { req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body)) w := httptest.NewRecorder() - handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req) + handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req) if got, want := w.Code, http.StatusBadRequest; got != want { t.Fatalf("status = %d, want %d", got, want) @@ -173,7 +173,7 @@ func TestHandlePlayerStatusRejectsUnknownServerConnectivity(t *testing.T) { req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body)) w := httptest.NewRecorder() - handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req) + handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req) if got, want := w.Code, http.StatusBadRequest; got != want { t.Fatalf("status = %d, want %d", got, want) @@ -191,7 +191,7 @@ func TestHandlePlayerStatusRejectsNonPositiveHeartbeatInterval(t *testing.T) { req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body)) w := httptest.NewRecorder() - handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req) + handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req) if got, want := w.Code, http.StatusBadRequest; got != want { t.Fatalf("status = %d, want %d", got, want) @@ -208,7 +208,7 @@ func TestHandlePlayerStatusRejectsMalformedTimestamps(t *testing.T) { req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body)) w := httptest.NewRecorder() - handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req) + handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req) if got, want := w.Code, http.StatusBadRequest; got != want { t.Fatalf("status = %d, want %d", got, want) @@ -226,7 +226,7 @@ func TestHandlePlayerStatusRejectsMalformedStartedAt(t *testing.T) { req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body)) w := httptest.NewRecorder() - handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req) + handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req) if got, want := w.Code, http.StatusBadRequest; got != want { t.Fatalf("status = %d, want %d", got, want) @@ -244,7 +244,7 @@ func TestHandlePlayerStatusRejectsMalformedLastHeartbeatAt(t *testing.T) { req := httptest.NewRequest(http.MethodPost, "/api/v1/player/status", bytes.NewReader(body)) w := httptest.NewRecorder() - handlePlayerStatus(newInMemoryPlayerStatusStore())(w, req) + handlePlayerStatus(newInMemoryPlayerStatusStore(), "", "", "")(w, req) if got, want := w.Code, http.StatusBadRequest; got != want { t.Fatalf("status = %d, want %d", got, want) diff --git a/server/backend/internal/httpapi/router.go b/server/backend/internal/httpapi/router.go index 1bdc342..3a33b78 100644 --- a/server/backend/internal/httpapi/router.go +++ b/server/backend/internal/httpapi/router.go @@ -67,7 +67,7 @@ func NewRouter(deps RouterDeps) http.Handler { mux.HandleFunc("GET /api/v1/meta", handleMeta) // ── Player status (existing) ────────────────────────────────────────── - mux.HandleFunc("POST /api/v1/player/status", handlePlayerStatus(deps.StatusStore)) + mux.HandleFunc("POST /api/v1/player/status", handlePlayerStatus(deps.StatusStore, deps.Config.MQTTBroker, deps.Config.MQTTUsername, deps.Config.MQTTPassword)) mux.HandleFunc("POST /api/v1/player/screenshot", handlePlayerScreenshot(deps.ScreenshotStore)) mux.HandleFunc("GET /api/v1/screens/status", handleListLatestPlayerStatuses(deps.StatusStore)) mux.HandleFunc("GET /api/v1/screens/{screenId}/status", handleGetLatestPlayerStatus(deps.StatusStore))