package app import ( "bytes" "context" "encoding/json" "fmt" "log" "net/http" "os" "strings" "sync" "time" "git.az-it.net/az/morz-infoboard/player/agent/internal/config" "git.az-it.net/az/morz-infoboard/player/agent/internal/displaycontroller" "git.az-it.net/az/morz-infoboard/player/agent/internal/mqttheartbeat" "git.az-it.net/az/morz-infoboard/player/agent/internal/mqttsubscriber" "git.az-it.net/az/morz-infoboard/player/agent/internal/playerserver" "git.az-it.net/az/morz-infoboard/player/agent/internal/screenshot" "git.az-it.net/az/morz-infoboard/player/agent/internal/statusreporter" ) type Status string type Connectivity string const ( StatusStarting Status = "starting" StatusRunning Status = "running" StatusStopped Status = "stopped" ConnectivityUnknown Connectivity = "unknown" ConnectivityOnline Connectivity = "online" ConnectivityDegraded Connectivity = "degraded" ConnectivityOffline Connectivity = "offline" ) const offlineFailureThreshold = 3 type HealthSnapshot struct { Status Status ServerConnectivity Connectivity ScreenID string ServerBaseURL string MQTTBroker string HeartbeatEvery int StartedAt time.Time LastHeartbeatAt time.Time } type App struct { Config config.Config logger *log.Logger now func() time.Time reporter statusSender mqttPub mqttSender mu sync.RWMutex status Status serverConnectivity Connectivity consecutiveReportFailures int startedAt time.Time lastHeartbeatAt time.Time // mqttMu guards mqttPub and mqttSub as well as the MQTT fields in Config // (Config.MQTTBroker, Config.MQTTUsername, Config.MQTTPassword). mqttMu sync.Mutex mqttSub mqttCloser // Playlist fetched from the backend (protected by playlistMu). playlistMu sync.RWMutex playlist []playerserver.PlaylistItem // mqttFetchC receives a signal whenever a playlist-changed MQTT message // arrives (after debouncing in the subscriber). pollPlaylist listens on // this channel to trigger an immediate fetchPlaylist call. mqttFetchC chan struct{} // screenshotFn is kept so that applyMQTTConfig can pass it to the new subscriber. screenshotFn func() displayCtrl *displaycontroller.Controller } // mqttCloser is implemented by mqttsubscriber.Subscriber. type mqttCloser interface { Close() } type statusSender interface { Send(ctx context.Context, snapshot statusreporter.Snapshot) (statusreporter.MQTTConfig, error) } type mqttSender interface { SendHeartbeat(status, connectivity string, ts time.Time) error SendDisplayState(screenSlug, state string) error Close() } func New() (*App, error) { cfg, err := config.Load() if err != nil { return nil, err } logger := log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC) var mqttPub mqttSender if cfg.MQTTBroker != "" { mqttPub = mqttheartbeat.New(cfg.MQTTBroker, cfg.ScreenID, cfg.MQTTUsername, cfg.MQTTPassword) logger.Printf("event=mqtt_enabled broker=%s", cfg.MQTTBroker) } else { logger.Printf("event=mqtt_disabled reason=no_broker_configured") } return newApp(cfg, logger, time.Now, statusreporter.New(cfg.ServerBaseURL, nil, time.Now), mqttPub), nil } func newApp(cfg config.Config, logger *log.Logger, now func() time.Time, reporter statusSender, mqttPub mqttSender) *App { if logger == nil { logger = log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC) } if now == nil { now = time.Now } return &App{ Config: cfg, logger: logger, now: now, reporter: reporter, mqttPub: mqttPub, status: StatusStarting, serverConnectivity: ConnectivityUnknown, mqttFetchC: make(chan struct{}, 1), } } func (a *App) Snapshot() HealthSnapshot { a.mu.RLock() defer a.mu.RUnlock() return HealthSnapshot{ Status: a.status, ServerConnectivity: a.serverConnectivity, ScreenID: a.Config.ScreenID, ServerBaseURL: a.Config.ServerBaseURL, MQTTBroker: a.Config.MQTTBroker, HeartbeatEvery: a.Config.HeartbeatEvery, StartedAt: a.startedAt, LastHeartbeatAt: a.lastHeartbeatAt, } } func (a *App) Run(ctx context.Context) error { if a.Config.ScreenID == "" { return fmt.Errorf("screen id is required") } select { case <-ctx.Done(): a.mu.Lock() a.status = StatusStopped a.mu.Unlock() return nil default: } a.mu.Lock() a.startedAt = a.now() a.mu.Unlock() a.logger.Printf( "event=agent_configured screen_id=%s screen_name=%q orientation=%s server_url=%s mqtt_broker=%s heartbeat_every_seconds=%d player_addr=%s", a.Config.ScreenID, a.Config.ScreenName, a.Config.ScreenOrientation, a.Config.ServerBaseURL, a.Config.MQTTBroker, a.Config.HeartbeatEvery, a.Config.PlayerListenAddr, ) // Start the player HTTP server (serves Chromium UI). ps := playerserver.New(a.Config.PlayerListenAddr, func() playerserver.NowPlaying { snap := a.Snapshot() a.playlistMu.RLock() items := a.playlist a.playlistMu.RUnlock() np := playerserver.NowPlaying{ Status: string(snap.Status), Connectivity: string(snap.ServerConnectivity), } if len(items) > 0 { np.Playlist = items } else if a.Config.PlayerContentURL != "" { // Fallback: single static URL when no playlist is available yet. np.Playlist = []playerserver.PlaylistItem{{ Src: a.Config.PlayerContentURL, Type: "web", Title: "", DurationSeconds: 30, }} } return np }) go func() { if err := ps.Run(ctx); err != nil { a.logger.Printf("event=player_server_error error=%v", err) } }() // Self-register this screen in the backend (best-effort, non-blocking). go a.registerScreen(ctx) // Screenshot-Instanz immer anlegen (für periodische und On-Demand-Screenshots). ss := screenshot.New(a.Config.ScreenID, a.Config.ServerBaseURL, a.Config.ScreenshotEvery, a.logger) // Keep screenshot callback so applyMQTTConfig can hand it to new subscribers. a.screenshotFn = func() { a.logger.Printf("event=mqtt_screenshot_request screen_id=%s", a.Config.ScreenID) go ss.TakeAndSendOnce(ctx) } xDisplay := os.Getenv("DISPLAY") if xDisplay == "" { xDisplay = ":0" } a.displayCtrl = displaycontroller.New(xDisplay, a.Config.ScreenID, func(slug, state string) { a.mqttMu.Lock() pub := a.mqttPub a.mqttMu.Unlock() if pub != nil { if err := pub.SendDisplayState(slug, state); err != nil { a.logger.Printf("event=display_state_publish_error err=%v", err) } } }) // Subscribe to playlist-changed and screenshot-request MQTT notifications (optional; fallback = polling). sub := mqttsubscriber.New( a.Config.MQTTBroker, a.Config.ScreenID, a.Config.MQTTUsername, a.Config.MQTTPassword, func() { // Debounced callback: send a non-blocking signal to the fetch channel. select { case a.mqttFetchC <- struct{}{}: default: // already a pending signal — no need to queue another } a.logger.Printf("event=mqtt_playlist_notification screen_id=%s", a.Config.ScreenID) }, a.screenshotFn, func(action string) { a.logger.Printf("event=display_command_received action=%s screen_id=%s", action, a.Config.ScreenID) a.displayCtrl.Execute(action) }, ) a.mqttMu.Lock() a.mqttSub = sub a.mqttMu.Unlock() if sub != nil { a.logger.Printf("event=mqtt_subscriber_enabled broker=%s screen_id=%s topic=%s screenshot_topic=%s", a.Config.MQTTBroker, a.Config.ScreenID, mqttsubscriber.Topic(a.Config.ScreenID), mqttsubscriber.ScreenshotRequestTopic(a.Config.ScreenID)) } // Start polling the backend for playlist updates (60 s fallback + MQTT trigger). go a.pollPlaylist(ctx) // Phase 6: Periodische Screenshot-Erzeugung, wenn konfiguriert. if a.Config.ScreenshotEvery > 0 { go ss.Run(ctx) a.logger.Printf("event=screenshot_enabled screen_id=%s interval_seconds=%d", a.Config.ScreenID, a.Config.ScreenshotEvery) } a.emitHeartbeat() a.mu.Lock() a.status = StatusRunning a.mu.Unlock() ticker := time.NewTicker(time.Duration(a.Config.HeartbeatEvery) * time.Second) defer ticker.Stop() reportTicker := time.NewTicker(time.Duration(a.Config.StatusReportEvery) * time.Second) defer reportTicker.Stop() a.reportStatus(ctx) for { select { case <-ctx.Done(): a.mu.Lock() a.status = StatusStopped a.mu.Unlock() a.mqttMu.Lock() if a.mqttPub != nil { a.mqttPub.Close() } if a.mqttSub != nil { a.mqttSub.Close() } a.mqttMu.Unlock() a.logger.Printf("event=agent_stopped screen_id=%s", a.Config.ScreenID) return nil case <-ticker.C: a.emitHeartbeat() case <-reportTicker.C: a.reportStatus(ctx) } } } // registerScreen upserts this screen in the backend so it appears in the admin UI. // Called once at startup in a goroutine — retries every 30s until successful. func (a *App) registerScreen(ctx context.Context) { body, _ := json.Marshal(map[string]string{ "slug": a.Config.ScreenID, "name": a.Config.ScreenName, "orientation": a.Config.ScreenOrientation, }) for attempt := 1; ; attempt++ { req, err := http.NewRequestWithContext(ctx, http.MethodPost, a.Config.ServerBaseURL+"/api/v1/screens/register", bytes.NewReader(body), ) if err != nil { return } req.Header.Set("Content-Type", "application/json") // K6: Register-Secret mitsenden, wenn konfiguriert. if a.Config.RegisterSecret != "" { req.Header.Set("X-Register-Secret", a.Config.RegisterSecret) } resp, err := http.DefaultClient.Do(req) if err == nil { resp.Body.Close() if resp.StatusCode == http.StatusOK { a.logger.Printf("event=screen_registered screen_id=%s attempt=%d", a.Config.ScreenID, attempt) return } a.logger.Printf("event=screen_register_failed screen_id=%s status=%d attempt=%d", a.Config.ScreenID, resp.StatusCode, attempt) } else { a.logger.Printf("event=screen_register_failed screen_id=%s error=%v attempt=%d", a.Config.ScreenID, err, attempt) } select { case <-ctx.Done(): return case <-time.After(30 * time.Second): } } } // pollPlaylist fetches the active playlist from the backend. // It fetches immediately on startup, then waits for either: // - an MQTT playlist-changed notification (fast path, debounced by subscriber) // - the 60-second fallback ticker (in case MQTT is unavailable) func (a *App) pollPlaylist(ctx context.Context) { // Fetch immediately on startup. a.fetchPlaylist(ctx) ticker := time.NewTicker(60 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-a.mqttFetchC: a.logger.Printf("event=playlist_triggered_by_mqtt screen_id=%s", a.Config.ScreenID) a.fetchPlaylist(ctx) case <-ticker.C: a.fetchPlaylist(ctx) } } } // remotePlaylistResponse is the JSON shape of GET /api/v1/screens/{slug}/playlist. type remotePlaylistResponse struct { Items []playerserver.PlaylistItem `json:"items"` } func (a *App) fetchPlaylist(ctx context.Context) { url := a.Config.ServerBaseURL + "/api/v1/screens/" + a.Config.ScreenID + "/playlist" req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return } resp, err := http.DefaultClient.Do(req) if err != nil { a.logger.Printf("event=playlist_fetch_failed screen_id=%s error=%v", a.Config.ScreenID, err) return } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { a.logger.Printf("event=playlist_fetch_failed screen_id=%s status=%d", a.Config.ScreenID, resp.StatusCode) return } var pr remotePlaylistResponse if err := json.NewDecoder(resp.Body).Decode(&pr); err != nil { a.logger.Printf("event=playlist_decode_failed screen_id=%s error=%v", a.Config.ScreenID, err) return } for i := range pr.Items { src := pr.Items[i].Src // Nur echte relative Pfade prefixen (einzelnes /), nicht protokoll-relative // URLs (//cdn.example.com/...) und keine absoluten URLs (http://, https://). if strings.HasPrefix(src, "/") && !strings.HasPrefix(src, "//") { pr.Items[i].Src = a.Config.ServerBaseURL + src } } a.playlistMu.Lock() a.playlist = pr.Items a.playlistMu.Unlock() a.logger.Printf("event=playlist_fetched screen_id=%s items=%d", a.Config.ScreenID, len(pr.Items)) } func (a *App) emitHeartbeat() { now := a.now() a.mu.Lock() a.lastHeartbeatAt = now status := a.status connectivity := a.serverConnectivity a.mu.Unlock() if a.mqttPub != nil { if err := a.mqttPub.SendHeartbeat(string(status), string(connectivity), now); err != nil { a.logger.Printf("event=mqtt_heartbeat_failed screen_id=%s error=%v", a.Config.ScreenID, err) } } } func (a *App) reportStatus(ctx context.Context) { if a.reporter == nil { return } snapshot := a.Snapshot() payloadConnectivity := snapshot.ServerConnectivity if payloadConnectivity == ConnectivityUnknown || payloadConnectivity == ConnectivityOnline || payloadConnectivity == ConnectivityDegraded || payloadConnectivity == ConnectivityOffline { payloadConnectivity = ConnectivityOnline } var displayState string if a.displayCtrl != nil { displayState = a.displayCtrl.State() } mqttCfg, err := a.reporter.Send(ctx, statusreporter.Snapshot{ Status: string(snapshot.Status), ServerConnectivity: string(payloadConnectivity), ScreenID: snapshot.ScreenID, ServerBaseURL: snapshot.ServerBaseURL, MQTTBroker: snapshot.MQTTBroker, HeartbeatEverySeconds: snapshot.HeartbeatEvery, StartedAt: snapshot.StartedAt, LastHeartbeatAt: snapshot.LastHeartbeatAt, DisplayState: displayState, }) if err != nil { a.mu.Lock() a.consecutiveReportFailures++ a.serverConnectivity = ConnectivityDegraded if a.consecutiveReportFailures >= offlineFailureThreshold { a.serverConnectivity = ConnectivityOffline } a.mu.Unlock() a.logger.Printf("event=status_report_failed screen_id=%s error=%v", a.Config.ScreenID, err) return } a.mu.Lock() a.consecutiveReportFailures = 0 a.serverConnectivity = ConnectivityOnline a.mu.Unlock() // Apply MQTT config from server response if broker was provided. if mqttCfg.Broker != "" { a.applyMQTTConfig(mqttCfg.Broker, mqttCfg.Username, mqttCfg.Password) } } // applyMQTTConfig checks whether the MQTT configuration has changed and, if so, // gracefully stops the existing MQTT clients and starts new ones. // It is safe for concurrent use — all MQTT client mutations are protected by mqttMu. func (a *App) applyMQTTConfig(broker, username, password string) { a.mqttMu.Lock() defer a.mqttMu.Unlock() // Nothing to do when the config is unchanged. if a.Config.MQTTBroker == broker && a.Config.MQTTUsername == username && a.Config.MQTTPassword == password { return } a.logger.Printf("event=mqtt_config_changed screen_id=%s old_broker=%s new_broker=%s", a.Config.ScreenID, a.Config.MQTTBroker, broker) // Stop existing clients. if a.mqttPub != nil { a.mqttPub.Close() a.mqttPub = nil } if a.mqttSub != nil { a.mqttSub.Close() a.mqttSub = nil } // Update stored config. a.Config.MQTTBroker = broker a.Config.MQTTUsername = username a.Config.MQTTPassword = password // Start new clients. a.mqttPub = mqttheartbeat.New(broker, a.Config.ScreenID, username, password) a.logger.Printf("event=mqtt_heartbeat_restarted screen_id=%s broker=%s", a.Config.ScreenID, broker) playlistChangedFn := func() { select { case a.mqttFetchC <- struct{}{}: default: } a.logger.Printf("event=mqtt_playlist_notification screen_id=%s", a.Config.ScreenID) } sub := mqttsubscriber.New(broker, a.Config.ScreenID, username, password, playlistChangedFn, a.screenshotFn, func(action string) { a.logger.Printf("event=display_command_received action=%s screen_id=%s", action, a.Config.ScreenID) a.displayCtrl.Execute(action) }, ) a.mqttSub = sub if sub != nil { a.logger.Printf("event=mqtt_subscriber_restarted screen_id=%s broker=%s", a.Config.ScreenID, broker) } }