morz-infoboard/player/agent/internal/app/app.go
Jesko Anschütz 6bc4d3d2f8 Fix: Protokoll-relative URLs, PDF-Fragment-Merge, Startup-Token-Cache, Test-Nil-Deref
- URL-Normalisierung überspringt jetzt //protocol-relative URLs
- PDF-Viewer-Parameter werden mit bestehenden Fragments gemerged statt blind angehängt
- /api/startup-token setzt Cache-Control: no-store (Server + Client)
- Tote Goroutine mit ignoriertem net.Listen-Error aus TestAssetsServed entfernt

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-23 15:21:26 +01:00

420 lines
11 KiB
Go

package app
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"strings"
	"sync"
	"time"

	"git.az-it.net/az/morz-infoboard/player/agent/internal/config"
	"git.az-it.net/az/morz-infoboard/player/agent/internal/mqttheartbeat"
	"git.az-it.net/az/morz-infoboard/player/agent/internal/mqttsubscriber"
	"git.az-it.net/az/morz-infoboard/player/agent/internal/playerserver"
	"git.az-it.net/az/morz-infoboard/player/agent/internal/statusreporter"
)
// Status describes the agent's lifecycle phase as carried in heartbeats
// and status reports.
type Status string

// Connectivity describes how reachable the backend server currently is
// from this agent's point of view.
type Connectivity string

const (
	StatusStarting Status = "starting"
	StatusRunning  Status = "running"
	StatusStopped  Status = "stopped"

	ConnectivityUnknown  Connectivity = "unknown"
	ConnectivityOnline   Connectivity = "online"
	ConnectivityDegraded Connectivity = "degraded"
	ConnectivityOffline  Connectivity = "offline"
)

// offlineFailureThreshold is the number of consecutive failed status
// reports after which connectivity is downgraded from degraded to offline.
const offlineFailureThreshold = 3
// HealthSnapshot is a point-in-time copy of the agent's health state,
// safe to read without holding any App lock.
type HealthSnapshot struct {
	Status             Status
	ServerConnectivity Connectivity
	ScreenID           string
	ServerBaseURL      string
	MQTTBroker         string
	HeartbeatEvery     int // heartbeat interval in seconds (from config)
	StartedAt          time.Time
	LastHeartbeatAt    time.Time
}
// App is the long-running player agent: it serves the local player UI,
// keeps the backend informed via heartbeats and status reports, and
// mirrors the screen's playlist from the server.
type App struct {
	Config config.Config
	logger *log.Logger
	// now is the clock source; injectable for tests (defaults to time.Now).
	now      func() time.Time
	reporter statusSender
	mqttPub  mqttSender

	// mu guards the mutable health fields below.
	mu                        sync.RWMutex
	status                    Status
	serverConnectivity        Connectivity
	consecutiveReportFailures int
	startedAt                 time.Time
	lastHeartbeatAt           time.Time

	// Playlist fetched from the backend (protected by playlistMu).
	playlistMu sync.RWMutex
	playlist   []playerserver.PlaylistItem

	// mqttFetchC receives a signal whenever a playlist-changed MQTT message
	// arrives (after debouncing in the subscriber). pollPlaylist listens on
	// this channel to trigger an immediate fetchPlaylist call.
	mqttFetchC chan struct{}
}
// statusSender abstracts the HTTP status reporter so it can be faked in
// tests.
type statusSender interface {
	Send(ctx context.Context, snapshot statusreporter.Snapshot) error
}

// mqttSender abstracts the MQTT heartbeat publisher; it is nil when no
// broker is configured.
type mqttSender interface {
	SendHeartbeat(status, connectivity string, ts time.Time) error
	Close()
}
// New loads configuration from the environment and assembles a fully
// wired App: logger, status reporter, and (when a broker is configured)
// the MQTT heartbeat publisher.
func New() (*App, error) {
	cfg, err := config.Load()
	if err != nil {
		return nil, err
	}
	logger := log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC)
	var pub mqttSender
	switch {
	case cfg.MQTTBroker == "":
		logger.Printf("event=mqtt_disabled reason=no_broker_configured")
	default:
		pub = mqttheartbeat.New(cfg.MQTTBroker, cfg.ScreenID, cfg.MQTTUsername, cfg.MQTTPassword)
		logger.Printf("event=mqtt_enabled broker=%s", cfg.MQTTBroker)
	}
	return newApp(cfg, logger, time.Now, statusreporter.New(cfg.ServerBaseURL, nil, time.Now), pub), nil
}
// newApp assembles an App from its dependencies, substituting sane
// defaults for a nil logger and a nil clock. Used by New and by tests.
func newApp(cfg config.Config, logger *log.Logger, now func() time.Time, reporter statusSender, mqttPub mqttSender) *App {
	app := &App{
		Config:             cfg,
		logger:             logger,
		now:                now,
		reporter:           reporter,
		mqttPub:            mqttPub,
		status:             StatusStarting,
		serverConnectivity: ConnectivityUnknown,
		mqttFetchC:         make(chan struct{}, 1),
	}
	if app.logger == nil {
		app.logger = log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC)
	}
	if app.now == nil {
		app.now = time.Now
	}
	return app
}
// Snapshot returns a point-in-time copy of the agent's health state.
// Safe for concurrent use; the returned value does not alias App state.
func (a *App) Snapshot() HealthSnapshot {
	a.mu.RLock()
	snap := HealthSnapshot{
		Status:             a.status,
		ServerConnectivity: a.serverConnectivity,
		ScreenID:           a.Config.ScreenID,
		ServerBaseURL:      a.Config.ServerBaseURL,
		MQTTBroker:         a.Config.MQTTBroker,
		HeartbeatEvery:     a.Config.HeartbeatEvery,
		StartedAt:          a.startedAt,
		LastHeartbeatAt:    a.lastHeartbeatAt,
	}
	a.mu.RUnlock()
	return snap
}
// Run is the agent's main loop. It validates configuration, starts the
// player HTTP server, the screen self-registration goroutine, the MQTT
// playlist subscriber, and the playlist poller, then ticks heartbeats
// and status reports until ctx is cancelled. It returns nil on normal
// shutdown; the only error it returns itself is a missing screen id.
func (a *App) Run(ctx context.Context) error {
	if a.Config.ScreenID == "" {
		return fmt.Errorf("screen id is required")
	}
	// Bail out early (marking ourselves stopped) if the context was
	// already cancelled before startup work began.
	select {
	case <-ctx.Done():
		a.mu.Lock()
		a.status = StatusStopped
		a.mu.Unlock()
		return nil
	default:
	}
	a.mu.Lock()
	a.startedAt = a.now()
	a.mu.Unlock()
	a.logger.Printf(
		"event=agent_configured screen_id=%s screen_name=%q orientation=%s server_url=%s mqtt_broker=%s heartbeat_every_seconds=%d player_addr=%s",
		a.Config.ScreenID,
		a.Config.ScreenName,
		a.Config.ScreenOrientation,
		a.Config.ServerBaseURL,
		a.Config.MQTTBroker,
		a.Config.HeartbeatEvery,
		a.Config.PlayerListenAddr,
	)
	// Start the player HTTP server (serves Chromium UI). The callback
	// builds the NowPlaying payload on demand from the latest snapshot
	// and playlist.
	ps := playerserver.New(a.Config.PlayerListenAddr, func() playerserver.NowPlaying {
		snap := a.Snapshot()
		a.playlistMu.RLock()
		items := a.playlist
		a.playlistMu.RUnlock()
		np := playerserver.NowPlaying{
			Status:       string(snap.Status),
			Connectivity: string(snap.ServerConnectivity),
		}
		if len(items) > 0 {
			np.Playlist = items
		} else if a.Config.PlayerContentURL != "" {
			// Fallback: single static URL when no playlist is available yet.
			np.Playlist = []playerserver.PlaylistItem{{
				Src:             a.Config.PlayerContentURL,
				Type:            "web",
				Title:           "",
				DurationSeconds: 30,
			}}
		}
		return np
	})
	go func() {
		if err := ps.Run(ctx); err != nil {
			a.logger.Printf("event=player_server_error error=%v", err)
		}
	}()
	// Self-register this screen in the backend (best-effort, non-blocking).
	go a.registerScreen(ctx)
	// Subscribe to playlist-changed MQTT notifications (optional; fallback = polling).
	sub := mqttsubscriber.New(
		a.Config.MQTTBroker,
		a.Config.ScreenID,
		a.Config.MQTTUsername,
		a.Config.MQTTPassword,
		func() {
			// Debounced callback: send a non-blocking signal to the fetch channel.
			select {
			case a.mqttFetchC <- struct{}{}:
			default: // already a pending signal — no need to queue another
			}
			a.logger.Printf("event=mqtt_playlist_notification screen_id=%s", a.Config.ScreenID)
		},
	)
	if sub != nil {
		a.logger.Printf("event=mqtt_subscriber_enabled broker=%s screen_id=%s topic=%s",
			a.Config.MQTTBroker, a.Config.ScreenID, mqttsubscriber.Topic(a.Config.ScreenID))
		defer sub.Close()
	}
	// Start polling the backend for playlist updates (60 s fallback + MQTT trigger).
	go a.pollPlaylist(ctx)
	// First heartbeat goes out immediately so monitoring sees the agent
	// as soon as it starts, not one full interval later.
	a.emitHeartbeat()
	a.mu.Lock()
	a.status = StatusRunning
	a.mu.Unlock()
	ticker := time.NewTicker(time.Duration(a.Config.HeartbeatEvery) * time.Second)
	defer ticker.Stop()
	reportTicker := time.NewTicker(time.Duration(a.Config.StatusReportEvery) * time.Second)
	defer reportTicker.Stop()
	// Initial status report, then the tick-driven loop until cancellation.
	a.reportStatus(ctx)
	for {
		select {
		case <-ctx.Done():
			a.mu.Lock()
			a.status = StatusStopped
			a.mu.Unlock()
			if a.mqttPub != nil {
				a.mqttPub.Close()
			}
			a.logger.Printf("event=agent_stopped screen_id=%s", a.Config.ScreenID)
			return nil
		case <-ticker.C:
			a.emitHeartbeat()
		case <-reportTicker.C:
			a.reportStatus(ctx)
		}
	}
}
// registerScreen upserts this screen in the backend so it appears in the admin UI.
// Called once at startup in a goroutine — retries every 30s until successful
// (or until ctx is cancelled).
func (a *App) registerScreen(ctx context.Context) {
	// Marshalling a map[string]string cannot fail; the error is ignored
	// deliberately.
	body, _ := json.Marshal(map[string]string{
		"slug":        a.Config.ScreenID,
		"name":        a.Config.ScreenName,
		"orientation": a.Config.ScreenOrientation,
	})
	// Use a client with a timeout so a hung backend cannot stall one
	// attempt forever (ctx only cancels on agent shutdown). The default
	// transport is shared, so connection pooling still applies.
	client := &http.Client{Timeout: 15 * time.Second}
	for attempt := 1; ; attempt++ {
		req, err := http.NewRequestWithContext(ctx,
			http.MethodPost,
			a.Config.ServerBaseURL+"/api/v1/screens/register",
			bytes.NewReader(body),
		)
		if err != nil {
			// Only happens on a malformed URL — retrying cannot help.
			return
		}
		req.Header.Set("Content-Type", "application/json")
		resp, err := client.Do(req)
		if err == nil {
			// Drain the body before closing so the connection can be reused.
			io.Copy(io.Discard, resp.Body)
			resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				a.logger.Printf("event=screen_registered screen_id=%s attempt=%d", a.Config.ScreenID, attempt)
				return
			}
			a.logger.Printf("event=screen_register_failed screen_id=%s status=%d attempt=%d",
				a.Config.ScreenID, resp.StatusCode, attempt)
		} else {
			a.logger.Printf("event=screen_register_failed screen_id=%s error=%v attempt=%d",
				a.Config.ScreenID, err, attempt)
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(30 * time.Second):
		}
	}
}
// pollPlaylist keeps the local playlist in sync with the backend.
// It fetches once immediately on startup, then refetches whenever either:
//   - an MQTT playlist-changed notification arrives (fast path, debounced
//     by the subscriber), or
//   - the 60-second fallback ticker fires (covers MQTT being unavailable).
func (a *App) pollPlaylist(ctx context.Context) {
	a.fetchPlaylist(ctx)
	fallback := time.NewTicker(60 * time.Second)
	defer fallback.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-a.mqttFetchC:
			a.logger.Printf("event=playlist_triggered_by_mqtt screen_id=%s", a.Config.ScreenID)
		case <-fallback.C:
		}
		a.fetchPlaylist(ctx)
	}
}
// remotePlaylistResponse is the JSON shape of GET /api/v1/screens/{slug}/playlist.
type remotePlaylistResponse struct {
	Items []playerserver.PlaylistItem `json:"items"`
}
// fetchPlaylist retrieves the active playlist for this screen from the
// backend and, on success, swaps it into a.playlist. Failures are only
// logged and leave the previously fetched playlist in place.
func (a *App) fetchPlaylist(ctx context.Context) {
	endpoint := a.Config.ServerBaseURL + "/api/v1/screens/" + a.Config.ScreenID + "/playlist"
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return
	}
	// A request timeout keeps a hung backend from blocking the poll loop;
	// the default transport is shared, so connection pooling still applies.
	client := &http.Client{Timeout: 15 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		a.logger.Printf("event=playlist_fetch_failed screen_id=%s error=%v", a.Config.ScreenID, err)
		return
	}
	defer func() {
		// Drain any unread remainder so the connection can be reused.
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}()
	if resp.StatusCode != http.StatusOK {
		a.logger.Printf("event=playlist_fetch_failed screen_id=%s status=%d", a.Config.ScreenID, resp.StatusCode)
		return
	}
	var pr remotePlaylistResponse
	if err := json.NewDecoder(resp.Body).Decode(&pr); err != nil {
		a.logger.Printf("event=playlist_decode_failed screen_id=%s error=%v", a.Config.ScreenID, err)
		return
	}
	for i := range pr.Items {
		src := pr.Items[i].Src
		// Only prefix genuinely relative paths (single leading /), not
		// protocol-relative URLs (//cdn.example.com/...) and not absolute
		// URLs (http://, https://).
		if strings.HasPrefix(src, "/") && !strings.HasPrefix(src, "//") {
			pr.Items[i].Src = a.Config.ServerBaseURL + src
		}
	}
	a.playlistMu.Lock()
	a.playlist = pr.Items
	a.playlistMu.Unlock()
	a.logger.Printf("event=playlist_fetched screen_id=%s items=%d", a.Config.ScreenID, len(pr.Items))
}
// emitHeartbeat stamps lastHeartbeatAt and, when an MQTT publisher is
// configured, publishes the current status/connectivity as a heartbeat.
func (a *App) emitHeartbeat() {
	ts := a.now()
	a.mu.Lock()
	a.lastHeartbeatAt = ts
	st, conn := a.status, a.serverConnectivity
	a.mu.Unlock()
	if a.mqttPub == nil {
		return
	}
	if err := a.mqttPub.SendHeartbeat(string(st), string(conn), ts); err != nil {
		a.logger.Printf("event=mqtt_heartbeat_failed screen_id=%s error=%v", a.Config.ScreenID, err)
	}
}
// reportStatus pushes a health snapshot to the backend via the status
// reporter and drives the connectivity state machine:
//   - success → online, failure counter reset
//   - failure → degraded
//   - offlineFailureThreshold consecutive failures → offline
func (a *App) reportStatus(ctx context.Context) {
	if a.reporter == nil {
		return
	}
	snapshot := a.Snapshot()
	payloadConnectivity := snapshot.ServerConnectivity
	// Before the first report completes, connectivity is still "unknown";
	// report it optimistically as "online" since we are about to talk to
	// the server. (The previous condition OR-ed every Connectivity value
	// and was therefore always true, forcing "online" unconditionally and
	// masking degraded/offline in the reported payload.)
	if payloadConnectivity == ConnectivityUnknown {
		payloadConnectivity = ConnectivityOnline
	}
	err := a.reporter.Send(ctx, statusreporter.Snapshot{
		Status:                string(snapshot.Status),
		ServerConnectivity:    string(payloadConnectivity),
		ScreenID:              snapshot.ScreenID,
		ServerBaseURL:         snapshot.ServerBaseURL,
		MQTTBroker:            snapshot.MQTTBroker,
		HeartbeatEverySeconds: snapshot.HeartbeatEvery,
		StartedAt:             snapshot.StartedAt,
		LastHeartbeatAt:       snapshot.LastHeartbeatAt,
	})
	if err != nil {
		a.mu.Lock()
		a.consecutiveReportFailures++
		a.serverConnectivity = ConnectivityDegraded
		if a.consecutiveReportFailures >= offlineFailureThreshold {
			a.serverConnectivity = ConnectivityOffline
		}
		a.mu.Unlock()
		a.logger.Printf("event=status_report_failed screen_id=%s error=%v", a.Config.ScreenID, err)
		return
	}
	a.mu.Lock()
	a.consecutiveReportFailures = 0
	a.serverConnectivity = ConnectivityOnline
	a.mu.Unlock()
}