Server gibt bei POST /api/v1/player/status jetzt mqtt-Block zurück (broker, username, password) wenn MORZ_INFOBOARD_MQTT_BROKER gesetzt ist. Agents parsen die Response und verbinden sich bei Config-Änderung automatisch neu (applyMQTTConfig mit Reconnect-Logik, thread-safe via Mutex). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
518 lines
15 KiB
Go
518 lines
15 KiB
Go
package app
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.az-it.net/az/morz-infoboard/player/agent/internal/config"
|
|
"git.az-it.net/az/morz-infoboard/player/agent/internal/mqttheartbeat"
|
|
"git.az-it.net/az/morz-infoboard/player/agent/internal/mqttsubscriber"
|
|
"git.az-it.net/az/morz-infoboard/player/agent/internal/playerserver"
|
|
"git.az-it.net/az/morz-infoboard/player/agent/internal/screenshot"
|
|
"git.az-it.net/az/morz-infoboard/player/agent/internal/statusreporter"
|
|
)
|
|
|
|
// Status describes the agent's lifecycle state ("starting", "running",
// "stopped") as exposed via Snapshot and reported to the backend.
type Status string

// Connectivity describes how well the agent can currently reach the
// backend server, derived from status-report successes and failures.
type Connectivity string
|
|
|
|
const (
	// Agent lifecycle states.
	StatusStarting Status = "starting"
	StatusRunning  Status = "running"
	StatusStopped  Status = "stopped"

	// Server connectivity levels, from best to worst knowledge.
	ConnectivityUnknown  Connectivity = "unknown"
	ConnectivityOnline   Connectivity = "online"
	ConnectivityDegraded Connectivity = "degraded"
	ConnectivityOffline  Connectivity = "offline"
)

// offlineFailureThreshold is the number of consecutive status-report
// failures after which connectivity is downgraded from "degraded" to
// "offline" (see reportStatus).
const offlineFailureThreshold = 3
|
|
|
|
// HealthSnapshot is an immutable copy of the agent's health state as
// returned by Snapshot. Time fields are zero until the corresponding
// event (startup, first heartbeat) has occurred.
type HealthSnapshot struct {
	Status             Status
	ServerConnectivity Connectivity
	ScreenID           string
	ServerBaseURL      string
	MQTTBroker         string
	HeartbeatEvery     int // heartbeat interval in seconds
	StartedAt          time.Time
	LastHeartbeatAt    time.Time
}
|
|
|
|
// App is the signage agent. It registers its screen with the backend,
// serves content to the local player, publishes MQTT heartbeats, and keeps
// the playlist in sync via polling plus MQTT change notifications.
type App struct {
	Config config.Config

	logger   *log.Logger
	now      func() time.Time // injectable clock for tests
	reporter statusSender     // posts status reports to the backend; may be nil
	mqttPub  mqttSender       // heartbeat publisher; nil when MQTT is disabled

	// mu guards the mutable health state below.
	mu                        sync.RWMutex
	status                    Status
	serverConnectivity        Connectivity
	consecutiveReportFailures int
	startedAt                 time.Time
	lastHeartbeatAt           time.Time

	// mqttMu guards mqttPub and mqttSub as well as the MQTT fields in Config
	// (Config.MQTTBroker, Config.MQTTUsername, Config.MQTTPassword).
	mqttMu  sync.Mutex
	mqttSub mqttCloser

	// Playlist fetched from the backend (protected by playlistMu).
	playlistMu sync.RWMutex
	playlist   []playerserver.PlaylistItem

	// mqttFetchC receives a signal whenever a playlist-changed MQTT message
	// arrives (after debouncing in the subscriber). pollPlaylist listens on
	// this channel to trigger an immediate fetchPlaylist call.
	mqttFetchC chan struct{}

	// screenshotFn is kept so that applyMQTTConfig can pass it to the new subscriber.
	screenshotFn func()
}
|
|
|
|
// mqttCloser is the shutdown surface of an MQTT subscriber; it is
// implemented by mqttsubscriber.Subscriber.
type mqttCloser interface {
	Close()
}
|
|
|
|
// statusSender posts a status snapshot to the backend and returns the MQTT
// configuration the server handed back (empty when none was provided).
// Implemented by statusreporter.
type statusSender interface {
	Send(ctx context.Context, snapshot statusreporter.Snapshot) (statusreporter.MQTTConfig, error)
}
|
|
|
|
// mqttSender publishes heartbeat messages over MQTT and can be shut down.
// Implemented by mqttheartbeat.
type mqttSender interface {
	SendHeartbeat(status, connectivity string, ts time.Time) error
	Close()
}
|
|
|
|
func New() (*App, error) {
|
|
cfg, err := config.Load()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
logger := log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC)
|
|
|
|
var mqttPub mqttSender
|
|
if cfg.MQTTBroker != "" {
|
|
mqttPub = mqttheartbeat.New(cfg.MQTTBroker, cfg.ScreenID, cfg.MQTTUsername, cfg.MQTTPassword)
|
|
logger.Printf("event=mqtt_enabled broker=%s", cfg.MQTTBroker)
|
|
} else {
|
|
logger.Printf("event=mqtt_disabled reason=no_broker_configured")
|
|
}
|
|
|
|
return newApp(cfg, logger, time.Now, statusreporter.New(cfg.ServerBaseURL, nil, time.Now), mqttPub), nil
|
|
}
|
|
|
|
func newApp(cfg config.Config, logger *log.Logger, now func() time.Time, reporter statusSender, mqttPub mqttSender) *App {
|
|
if logger == nil {
|
|
logger = log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC)
|
|
}
|
|
|
|
if now == nil {
|
|
now = time.Now
|
|
}
|
|
|
|
return &App{
|
|
Config: cfg,
|
|
logger: logger,
|
|
now: now,
|
|
reporter: reporter,
|
|
mqttPub: mqttPub,
|
|
status: StatusStarting,
|
|
serverConnectivity: ConnectivityUnknown,
|
|
mqttFetchC: make(chan struct{}, 1),
|
|
}
|
|
}
|
|
|
|
func (a *App) Snapshot() HealthSnapshot {
|
|
a.mu.RLock()
|
|
defer a.mu.RUnlock()
|
|
|
|
return HealthSnapshot{
|
|
Status: a.status,
|
|
ServerConnectivity: a.serverConnectivity,
|
|
ScreenID: a.Config.ScreenID,
|
|
ServerBaseURL: a.Config.ServerBaseURL,
|
|
MQTTBroker: a.Config.MQTTBroker,
|
|
HeartbeatEvery: a.Config.HeartbeatEvery,
|
|
StartedAt: a.startedAt,
|
|
LastHeartbeatAt: a.lastHeartbeatAt,
|
|
}
|
|
}
|
|
|
|
// Run is the agent's main loop. It starts the player HTTP server, screen
// self-registration, the MQTT subscriber, playlist polling and (optionally)
// periodic screenshots, then ticks heartbeats and status reports until ctx
// is cancelled. Returns nil on normal shutdown; an error only when the
// configuration is unusable.
func (a *App) Run(ctx context.Context) error {
	if a.Config.ScreenID == "" {
		return fmt.Errorf("screen id is required")
	}

	// Bail out immediately if the context was cancelled before startup.
	select {
	case <-ctx.Done():
		a.mu.Lock()
		a.status = StatusStopped
		a.mu.Unlock()
		return nil
	default:
	}

	a.mu.Lock()
	a.startedAt = a.now()
	a.mu.Unlock()

	a.logger.Printf(
		"event=agent_configured screen_id=%s screen_name=%q orientation=%s server_url=%s mqtt_broker=%s heartbeat_every_seconds=%d player_addr=%s",
		a.Config.ScreenID,
		a.Config.ScreenName,
		a.Config.ScreenOrientation,
		a.Config.ServerBaseURL,
		a.Config.MQTTBroker,
		a.Config.HeartbeatEvery,
		a.Config.PlayerListenAddr,
	)

	// Start the player HTTP server (serves Chromium UI). The callback builds
	// a NowPlaying view on every request from the latest snapshot + playlist.
	ps := playerserver.New(a.Config.PlayerListenAddr, func() playerserver.NowPlaying {
		snap := a.Snapshot()
		a.playlistMu.RLock()
		items := a.playlist
		a.playlistMu.RUnlock()

		np := playerserver.NowPlaying{
			Status:       string(snap.Status),
			Connectivity: string(snap.ServerConnectivity),
		}
		if len(items) > 0 {
			np.Playlist = items
		} else if a.Config.PlayerContentURL != "" {
			// Fallback: single static URL when no playlist is available yet.
			np.Playlist = []playerserver.PlaylistItem{{
				Src:             a.Config.PlayerContentURL,
				Type:            "web",
				Title:           "",
				DurationSeconds: 30,
			}}
		}
		return np
	})
	go func() {
		if err := ps.Run(ctx); err != nil {
			a.logger.Printf("event=player_server_error error=%v", err)
		}
	}()

	// Self-register this screen in the backend (best-effort, non-blocking).
	go a.registerScreen(ctx)

	// Always create the screenshot instance (used for both periodic and
	// on-demand screenshots).
	ss := screenshot.New(a.Config.ScreenID, a.Config.ServerBaseURL, a.Config.ScreenshotEvery, a.logger)

	// Keep screenshot callback so applyMQTTConfig can hand it to new subscribers.
	a.screenshotFn = func() {
		a.logger.Printf("event=mqtt_screenshot_request screen_id=%s", a.Config.ScreenID)
		go ss.TakeAndSendOnce(ctx)
	}

	// Subscribe to playlist-changed and screenshot-request MQTT notifications (optional; fallback = polling).
	sub := mqttsubscriber.New(
		a.Config.MQTTBroker,
		a.Config.ScreenID,
		a.Config.MQTTUsername,
		a.Config.MQTTPassword,
		func() {
			// Debounced callback: send a non-blocking signal to the fetch channel.
			select {
			case a.mqttFetchC <- struct{}{}:
			default: // already a pending signal — no need to queue another
			}
			a.logger.Printf("event=mqtt_playlist_notification screen_id=%s", a.Config.ScreenID)
		},
		a.screenshotFn,
	)
	// mqttMu guards mqttSub; applyMQTTConfig may replace it concurrently.
	a.mqttMu.Lock()
	a.mqttSub = sub
	a.mqttMu.Unlock()
	if sub != nil {
		a.logger.Printf("event=mqtt_subscriber_enabled broker=%s screen_id=%s topic=%s screenshot_topic=%s",
			a.Config.MQTTBroker, a.Config.ScreenID, mqttsubscriber.Topic(a.Config.ScreenID),
			mqttsubscriber.ScreenshotRequestTopic(a.Config.ScreenID))
	}

	// Start polling the backend for playlist updates (60 s fallback + MQTT trigger).
	go a.pollPlaylist(ctx)

	// Phase 6: periodic screenshot generation, when configured.
	if a.Config.ScreenshotEvery > 0 {
		go ss.Run(ctx)
		a.logger.Printf("event=screenshot_enabled screen_id=%s interval_seconds=%d",
			a.Config.ScreenID, a.Config.ScreenshotEvery)
	}

	// First heartbeat before flipping to "running" so lastHeartbeatAt is set.
	a.emitHeartbeat()
	a.mu.Lock()
	a.status = StatusRunning
	a.mu.Unlock()

	ticker := time.NewTicker(time.Duration(a.Config.HeartbeatEvery) * time.Second)
	defer ticker.Stop()

	reportTicker := time.NewTicker(time.Duration(a.Config.StatusReportEvery) * time.Second)
	defer reportTicker.Stop()
	// Report once immediately so the backend learns about this agent without
	// waiting a full report interval (this also delivers any MQTT config).
	a.reportStatus(ctx)

	for {
		select {
		case <-ctx.Done():
			a.mu.Lock()
			a.status = StatusStopped
			a.mu.Unlock()
			// Close MQTT clients under mqttMu (it guards mqttPub/mqttSub).
			a.mqttMu.Lock()
			if a.mqttPub != nil {
				a.mqttPub.Close()
			}
			if a.mqttSub != nil {
				a.mqttSub.Close()
			}
			a.mqttMu.Unlock()
			a.logger.Printf("event=agent_stopped screen_id=%s", a.Config.ScreenID)
			return nil
		case <-ticker.C:
			a.emitHeartbeat()
		case <-reportTicker.C:
			a.reportStatus(ctx)
		}
	}
}
|
|
|
|
// registerScreen upserts this screen in the backend so it appears in the admin UI.
|
|
// Called once at startup in a goroutine — retries every 30s until successful.
|
|
func (a *App) registerScreen(ctx context.Context) {
|
|
body, _ := json.Marshal(map[string]string{
|
|
"slug": a.Config.ScreenID,
|
|
"name": a.Config.ScreenName,
|
|
"orientation": a.Config.ScreenOrientation,
|
|
})
|
|
|
|
for attempt := 1; ; attempt++ {
|
|
req, err := http.NewRequestWithContext(ctx,
|
|
http.MethodPost,
|
|
a.Config.ServerBaseURL+"/api/v1/screens/register",
|
|
bytes.NewReader(body),
|
|
)
|
|
if err != nil {
|
|
return
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
// K6: Register-Secret mitsenden, wenn konfiguriert.
|
|
if a.Config.RegisterSecret != "" {
|
|
req.Header.Set("X-Register-Secret", a.Config.RegisterSecret)
|
|
}
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err == nil {
|
|
resp.Body.Close()
|
|
if resp.StatusCode == http.StatusOK {
|
|
a.logger.Printf("event=screen_registered screen_id=%s attempt=%d", a.Config.ScreenID, attempt)
|
|
return
|
|
}
|
|
a.logger.Printf("event=screen_register_failed screen_id=%s status=%d attempt=%d",
|
|
a.Config.ScreenID, resp.StatusCode, attempt)
|
|
} else {
|
|
a.logger.Printf("event=screen_register_failed screen_id=%s error=%v attempt=%d",
|
|
a.Config.ScreenID, err, attempt)
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(30 * time.Second):
|
|
}
|
|
}
|
|
}
|
|
|
|
// pollPlaylist fetches the active playlist from the backend.
|
|
// It fetches immediately on startup, then waits for either:
|
|
// - an MQTT playlist-changed notification (fast path, debounced by subscriber)
|
|
// - the 60-second fallback ticker (in case MQTT is unavailable)
|
|
func (a *App) pollPlaylist(ctx context.Context) {
|
|
// Fetch immediately on startup.
|
|
a.fetchPlaylist(ctx)
|
|
|
|
ticker := time.NewTicker(60 * time.Second)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-a.mqttFetchC:
|
|
a.logger.Printf("event=playlist_triggered_by_mqtt screen_id=%s", a.Config.ScreenID)
|
|
a.fetchPlaylist(ctx)
|
|
case <-ticker.C:
|
|
a.fetchPlaylist(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
// remotePlaylistResponse is the JSON shape of GET /api/v1/screens/{slug}/playlist.
type remotePlaylistResponse struct {
	// Items is the ordered list of playlist entries for the screen.
	Items []playerserver.PlaylistItem `json:"items"`
}
|
|
|
|
func (a *App) fetchPlaylist(ctx context.Context) {
|
|
url := a.Config.ServerBaseURL + "/api/v1/screens/" + a.Config.ScreenID + "/playlist"
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
a.logger.Printf("event=playlist_fetch_failed screen_id=%s error=%v", a.Config.ScreenID, err)
|
|
return
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
a.logger.Printf("event=playlist_fetch_failed screen_id=%s status=%d", a.Config.ScreenID, resp.StatusCode)
|
|
return
|
|
}
|
|
|
|
var pr remotePlaylistResponse
|
|
if err := json.NewDecoder(resp.Body).Decode(&pr); err != nil {
|
|
a.logger.Printf("event=playlist_decode_failed screen_id=%s error=%v", a.Config.ScreenID, err)
|
|
return
|
|
}
|
|
|
|
for i := range pr.Items {
|
|
src := pr.Items[i].Src
|
|
// Nur echte relative Pfade prefixen (einzelnes /), nicht protokoll-relative
|
|
// URLs (//cdn.example.com/...) und keine absoluten URLs (http://, https://).
|
|
if strings.HasPrefix(src, "/") && !strings.HasPrefix(src, "//") {
|
|
pr.Items[i].Src = a.Config.ServerBaseURL + src
|
|
}
|
|
}
|
|
|
|
a.playlistMu.Lock()
|
|
a.playlist = pr.Items
|
|
a.playlistMu.Unlock()
|
|
|
|
a.logger.Printf("event=playlist_fetched screen_id=%s items=%d", a.Config.ScreenID, len(pr.Items))
|
|
}
|
|
|
|
func (a *App) emitHeartbeat() {
|
|
now := a.now()
|
|
|
|
a.mu.Lock()
|
|
a.lastHeartbeatAt = now
|
|
status := a.status
|
|
connectivity := a.serverConnectivity
|
|
a.mu.Unlock()
|
|
|
|
if a.mqttPub != nil {
|
|
if err := a.mqttPub.SendHeartbeat(string(status), string(connectivity), now); err != nil {
|
|
a.logger.Printf("event=mqtt_heartbeat_failed screen_id=%s error=%v", a.Config.ScreenID, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *App) reportStatus(ctx context.Context) {
|
|
if a.reporter == nil {
|
|
return
|
|
}
|
|
|
|
snapshot := a.Snapshot()
|
|
payloadConnectivity := snapshot.ServerConnectivity
|
|
if payloadConnectivity == ConnectivityUnknown || payloadConnectivity == ConnectivityOnline || payloadConnectivity == ConnectivityDegraded || payloadConnectivity == ConnectivityOffline {
|
|
payloadConnectivity = ConnectivityOnline
|
|
}
|
|
|
|
mqttCfg, err := a.reporter.Send(ctx, statusreporter.Snapshot{
|
|
Status: string(snapshot.Status),
|
|
ServerConnectivity: string(payloadConnectivity),
|
|
ScreenID: snapshot.ScreenID,
|
|
ServerBaseURL: snapshot.ServerBaseURL,
|
|
MQTTBroker: snapshot.MQTTBroker,
|
|
HeartbeatEverySeconds: snapshot.HeartbeatEvery,
|
|
StartedAt: snapshot.StartedAt,
|
|
LastHeartbeatAt: snapshot.LastHeartbeatAt,
|
|
})
|
|
if err != nil {
|
|
a.mu.Lock()
|
|
a.consecutiveReportFailures++
|
|
a.serverConnectivity = ConnectivityDegraded
|
|
if a.consecutiveReportFailures >= offlineFailureThreshold {
|
|
a.serverConnectivity = ConnectivityOffline
|
|
}
|
|
a.mu.Unlock()
|
|
a.logger.Printf("event=status_report_failed screen_id=%s error=%v", a.Config.ScreenID, err)
|
|
return
|
|
}
|
|
|
|
a.mu.Lock()
|
|
a.consecutiveReportFailures = 0
|
|
a.serverConnectivity = ConnectivityOnline
|
|
a.mu.Unlock()
|
|
|
|
// Apply MQTT config from server response if broker was provided.
|
|
if mqttCfg.Broker != "" {
|
|
a.applyMQTTConfig(mqttCfg.Broker, mqttCfg.Username, mqttCfg.Password)
|
|
}
|
|
}
|
|
|
|
// applyMQTTConfig checks whether the MQTT configuration has changed and, if so,
|
|
// gracefully stops the existing MQTT clients and starts new ones.
|
|
// It is safe for concurrent use — all MQTT client mutations are protected by mqttMu.
|
|
func (a *App) applyMQTTConfig(broker, username, password string) {
|
|
a.mqttMu.Lock()
|
|
defer a.mqttMu.Unlock()
|
|
|
|
// Nothing to do when the config is unchanged.
|
|
if a.Config.MQTTBroker == broker &&
|
|
a.Config.MQTTUsername == username &&
|
|
a.Config.MQTTPassword == password {
|
|
return
|
|
}
|
|
|
|
a.logger.Printf("event=mqtt_config_changed screen_id=%s old_broker=%s new_broker=%s",
|
|
a.Config.ScreenID, a.Config.MQTTBroker, broker)
|
|
|
|
// Stop existing clients.
|
|
if a.mqttPub != nil {
|
|
a.mqttPub.Close()
|
|
a.mqttPub = nil
|
|
}
|
|
if a.mqttSub != nil {
|
|
a.mqttSub.Close()
|
|
a.mqttSub = nil
|
|
}
|
|
|
|
// Update stored config.
|
|
a.Config.MQTTBroker = broker
|
|
a.Config.MQTTUsername = username
|
|
a.Config.MQTTPassword = password
|
|
|
|
// Start new clients.
|
|
a.mqttPub = mqttheartbeat.New(broker, a.Config.ScreenID, username, password)
|
|
a.logger.Printf("event=mqtt_heartbeat_restarted screen_id=%s broker=%s", a.Config.ScreenID, broker)
|
|
|
|
playlistChangedFn := func() {
|
|
select {
|
|
case a.mqttFetchC <- struct{}{}:
|
|
default:
|
|
}
|
|
a.logger.Printf("event=mqtt_playlist_notification screen_id=%s", a.Config.ScreenID)
|
|
}
|
|
sub := mqttsubscriber.New(broker, a.Config.ScreenID, username, password, playlistChangedFn, a.screenshotFn)
|
|
a.mqttSub = sub
|
|
if sub != nil {
|
|
a.logger.Printf("event=mqtt_subscriber_restarted screen_id=%s broker=%s", a.Config.ScreenID, broker)
|
|
}
|
|
}
|