package app import ( "bytes" "context" "encoding/json" "fmt" "log" "net/http" "os" "strings" "sync" "time" "git.az-it.net/az/morz-infoboard/player/agent/internal/config" "git.az-it.net/az/morz-infoboard/player/agent/internal/mqttheartbeat" "git.az-it.net/az/morz-infoboard/player/agent/internal/mqttsubscriber" "git.az-it.net/az/morz-infoboard/player/agent/internal/playerserver" "git.az-it.net/az/morz-infoboard/player/agent/internal/statusreporter" ) type Status string type Connectivity string const ( StatusStarting Status = "starting" StatusRunning Status = "running" StatusStopped Status = "stopped" ConnectivityUnknown Connectivity = "unknown" ConnectivityOnline Connectivity = "online" ConnectivityDegraded Connectivity = "degraded" ConnectivityOffline Connectivity = "offline" ) const offlineFailureThreshold = 3 type HealthSnapshot struct { Status Status ServerConnectivity Connectivity ScreenID string ServerBaseURL string MQTTBroker string HeartbeatEvery int StartedAt time.Time LastHeartbeatAt time.Time } type App struct { Config config.Config logger *log.Logger now func() time.Time reporter statusSender mqttPub mqttSender mu sync.RWMutex status Status serverConnectivity Connectivity consecutiveReportFailures int startedAt time.Time lastHeartbeatAt time.Time // Playlist fetched from the backend (protected by playlistMu). playlistMu sync.RWMutex playlist []playerserver.PlaylistItem // mqttFetchC receives a signal whenever a playlist-changed MQTT message // arrives (after debouncing in the subscriber). pollPlaylist listens on // this channel to trigger an immediate fetchPlaylist call. mqttFetchC chan struct{} } type statusSender interface { Send(ctx context.Context, snapshot statusreporter.Snapshot) error } type mqttSender interface { SendHeartbeat(status, connectivity string, ts time.Time) error Close() } func New() (*App, error) { cfg, err := config.Load() if err != nil { return nil, err } logger := log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC) var mqttPub mqttSender if cfg.MQTTBroker != "" { mqttPub = mqttheartbeat.New(cfg.MQTTBroker, cfg.ScreenID, cfg.MQTTUsername, cfg.MQTTPassword) logger.Printf("event=mqtt_enabled broker=%s", cfg.MQTTBroker) } else { logger.Printf("event=mqtt_disabled reason=no_broker_configured") } return newApp(cfg, logger, time.Now, statusreporter.New(cfg.ServerBaseURL, nil, time.Now), mqttPub), nil } func newApp(cfg config.Config, logger *log.Logger, now func() time.Time, reporter statusSender, mqttPub mqttSender) *App { if logger == nil { logger = log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC) } if now == nil { now = time.Now } return &App{ Config: cfg, logger: logger, now: now, reporter: reporter, mqttPub: mqttPub, status: StatusStarting, serverConnectivity: ConnectivityUnknown, mqttFetchC: make(chan struct{}, 1), } } func (a *App) Snapshot() HealthSnapshot { a.mu.RLock() defer a.mu.RUnlock() return HealthSnapshot{ Status: a.status, ServerConnectivity: a.serverConnectivity, ScreenID: a.Config.ScreenID, ServerBaseURL: a.Config.ServerBaseURL, MQTTBroker: a.Config.MQTTBroker, HeartbeatEvery: a.Config.HeartbeatEvery, StartedAt: a.startedAt, LastHeartbeatAt: a.lastHeartbeatAt, } } func (a *App) Run(ctx context.Context) error { if a.Config.ScreenID == "" { return fmt.Errorf("screen id is required") } select { case <-ctx.Done(): a.mu.Lock() a.status = StatusStopped a.mu.Unlock() return nil default: } a.mu.Lock() a.startedAt = a.now() a.mu.Unlock() a.logger.Printf( "event=agent_configured screen_id=%s screen_name=%q orientation=%s server_url=%s mqtt_broker=%s heartbeat_every_seconds=%d player_addr=%s", a.Config.ScreenID, a.Config.ScreenName, a.Config.ScreenOrientation, a.Config.ServerBaseURL, a.Config.MQTTBroker, a.Config.HeartbeatEvery, a.Config.PlayerListenAddr, ) // Start the player HTTP server (serves Chromium UI). ps := playerserver.New(a.Config.PlayerListenAddr, func() playerserver.NowPlaying { snap := a.Snapshot() a.playlistMu.RLock() items := a.playlist a.playlistMu.RUnlock() np := playerserver.NowPlaying{ Status: string(snap.Status), Connectivity: string(snap.ServerConnectivity), } if len(items) > 0 { np.Playlist = items } else if a.Config.PlayerContentURL != "" { // Fallback: single static URL when no playlist is available yet. np.Playlist = []playerserver.PlaylistItem{{ Src: a.Config.PlayerContentURL, Type: "web", Title: "", DurationSeconds: 30, }} } return np }) go func() { if err := ps.Run(ctx); err != nil { a.logger.Printf("event=player_server_error error=%v", err) } }() // Self-register this screen in the backend (best-effort, non-blocking). go a.registerScreen(ctx) // Subscribe to playlist-changed MQTT notifications (optional; fallback = polling). sub := mqttsubscriber.New( a.Config.MQTTBroker, a.Config.ScreenID, a.Config.MQTTUsername, a.Config.MQTTPassword, func() { // Debounced callback: send a non-blocking signal to the fetch channel. select { case a.mqttFetchC <- struct{}{}: default: // already a pending signal — no need to queue another } a.logger.Printf("event=mqtt_playlist_notification screen_id=%s", a.Config.ScreenID) }, ) if sub != nil { a.logger.Printf("event=mqtt_subscriber_enabled broker=%s screen_id=%s topic=%s", a.Config.MQTTBroker, a.Config.ScreenID, mqttsubscriber.Topic(a.Config.ScreenID)) defer sub.Close() } // Start polling the backend for playlist updates (60 s fallback + MQTT trigger). go a.pollPlaylist(ctx) a.emitHeartbeat() a.mu.Lock() a.status = StatusRunning a.mu.Unlock() ticker := time.NewTicker(time.Duration(a.Config.HeartbeatEvery) * time.Second) defer ticker.Stop() reportTicker := time.NewTicker(time.Duration(a.Config.StatusReportEvery) * time.Second) defer reportTicker.Stop() a.reportStatus(ctx) for { select { case <-ctx.Done(): a.mu.Lock() a.status = StatusStopped a.mu.Unlock() if a.mqttPub != nil { a.mqttPub.Close() } a.logger.Printf("event=agent_stopped screen_id=%s", a.Config.ScreenID) return nil case <-ticker.C: a.emitHeartbeat() case <-reportTicker.C: a.reportStatus(ctx) } } } // registerScreen upserts this screen in the backend so it appears in the admin UI. // Called once at startup in a goroutine — retries every 30s until successful. func (a *App) registerScreen(ctx context.Context) { body, _ := json.Marshal(map[string]string{ "slug": a.Config.ScreenID, "name": a.Config.ScreenName, "orientation": a.Config.ScreenOrientation, }) for attempt := 1; ; attempt++ { req, err := http.NewRequestWithContext(ctx, http.MethodPost, a.Config.ServerBaseURL+"/api/v1/screens/register", bytes.NewReader(body), ) if err != nil { return } req.Header.Set("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) if err == nil { resp.Body.Close() if resp.StatusCode == http.StatusOK { a.logger.Printf("event=screen_registered screen_id=%s attempt=%d", a.Config.ScreenID, attempt) return } a.logger.Printf("event=screen_register_failed screen_id=%s status=%d attempt=%d", a.Config.ScreenID, resp.StatusCode, attempt) } else { a.logger.Printf("event=screen_register_failed screen_id=%s error=%v attempt=%d", a.Config.ScreenID, err, attempt) } select { case <-ctx.Done(): return case <-time.After(30 * time.Second): } } } // pollPlaylist fetches the active playlist from the backend. // It fetches immediately on startup, then waits for either: // - an MQTT playlist-changed notification (fast path, debounced by subscriber) // - the 60-second fallback ticker (in case MQTT is unavailable) func (a *App) pollPlaylist(ctx context.Context) { // Fetch immediately on startup. a.fetchPlaylist(ctx) ticker := time.NewTicker(60 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-a.mqttFetchC: a.logger.Printf("event=playlist_triggered_by_mqtt screen_id=%s", a.Config.ScreenID) a.fetchPlaylist(ctx) case <-ticker.C: a.fetchPlaylist(ctx) } } } // remotePlaylistResponse is the JSON shape of GET /api/v1/screens/{slug}/playlist. type remotePlaylistResponse struct { Items []playerserver.PlaylistItem `json:"items"` } func (a *App) fetchPlaylist(ctx context.Context) { url := a.Config.ServerBaseURL + "/api/v1/screens/" + a.Config.ScreenID + "/playlist" req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return } resp, err := http.DefaultClient.Do(req) if err != nil { a.logger.Printf("event=playlist_fetch_failed screen_id=%s error=%v", a.Config.ScreenID, err) return } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { a.logger.Printf("event=playlist_fetch_failed screen_id=%s status=%d", a.Config.ScreenID, resp.StatusCode) return } var pr remotePlaylistResponse if err := json.NewDecoder(resp.Body).Decode(&pr); err != nil { a.logger.Printf("event=playlist_decode_failed screen_id=%s error=%v", a.Config.ScreenID, err) return } for i := range pr.Items { if strings.HasPrefix(pr.Items[i].Src, "/") { pr.Items[i].Src = a.Config.ServerBaseURL + pr.Items[i].Src } } a.playlistMu.Lock() a.playlist = pr.Items a.playlistMu.Unlock() a.logger.Printf("event=playlist_fetched screen_id=%s items=%d", a.Config.ScreenID, len(pr.Items)) } func (a *App) emitHeartbeat() { now := a.now() a.mu.Lock() a.lastHeartbeatAt = now status := a.status connectivity := a.serverConnectivity a.mu.Unlock() if a.mqttPub != nil { if err := a.mqttPub.SendHeartbeat(string(status), string(connectivity), now); err != nil { a.logger.Printf("event=mqtt_heartbeat_failed screen_id=%s error=%v", a.Config.ScreenID, err) } } } func (a *App) reportStatus(ctx context.Context) { if a.reporter == nil { return } snapshot := a.Snapshot() payloadConnectivity := snapshot.ServerConnectivity if payloadConnectivity == ConnectivityUnknown || payloadConnectivity == ConnectivityOnline || payloadConnectivity == ConnectivityDegraded || payloadConnectivity == ConnectivityOffline { payloadConnectivity = ConnectivityOnline } err := a.reporter.Send(ctx, statusreporter.Snapshot{ Status: string(snapshot.Status), ServerConnectivity: string(payloadConnectivity), ScreenID: snapshot.ScreenID, ServerBaseURL: snapshot.ServerBaseURL, MQTTBroker: snapshot.MQTTBroker, HeartbeatEverySeconds: snapshot.HeartbeatEvery, StartedAt: snapshot.StartedAt, LastHeartbeatAt: snapshot.LastHeartbeatAt, }) if err != nil { a.mu.Lock() a.consecutiveReportFailures++ a.serverConnectivity = ConnectivityDegraded if a.consecutiveReportFailures >= offlineFailureThreshold { a.serverConnectivity = ConnectivityOffline } a.mu.Unlock() a.logger.Printf("event=status_report_failed screen_id=%s error=%v", a.Config.ScreenID, err) return } a.mu.Lock() a.consecutiveReportFailures = 0 a.serverConnectivity = ConnectivityOnline a.mu.Unlock() }