diff --git a/player/agent/internal/app/app.go b/player/agent/internal/app/app.go index a7f439d..316f364 100644 --- a/player/agent/internal/app/app.go +++ b/player/agent/internal/app/app.go @@ -9,6 +9,7 @@ import ( "time" "git.az-it.net/az/morz-infoboard/player/agent/internal/config" + "git.az-it.net/az/morz-infoboard/player/agent/internal/statusreporter" ) type Status string @@ -30,9 +31,10 @@ type HealthSnapshot struct { } type App struct { - Config config.Config - logger *log.Logger - now func() time.Time + Config config.Config + logger *log.Logger + now func() time.Time + reporter statusSender mu sync.RWMutex status Status @@ -40,6 +42,10 @@ type App struct { lastHeartbeatAt time.Time } +type statusSender interface { + Send(ctx context.Context, snapshot statusreporter.Snapshot) error +} + func New() (*App, error) { cfg, err := config.Load() if err != nil { @@ -48,10 +54,10 @@ func New() (*App, error) { logger := log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC) - return newApp(cfg, logger, time.Now), nil + return newApp(cfg, logger, time.Now, statusreporter.New(cfg.ServerBaseURL, nil, time.Now)), nil } -func newApp(cfg config.Config, logger *log.Logger, now func() time.Time) *App { +func newApp(cfg config.Config, logger *log.Logger, now func() time.Time, reporter statusSender) *App { if logger == nil { logger = log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC) } @@ -61,10 +67,11 @@ func newApp(cfg config.Config, logger *log.Logger, now func() time.Time) *App { } return &App{ - Config: cfg, - logger: logger, - now: now, - status: StatusStarting, + Config: cfg, + logger: logger, + now: now, + reporter: reporter, + status: StatusStarting, } } @@ -116,6 +123,10 @@ func (a *App) Run(ctx context.Context) error { ticker := time.NewTicker(time.Duration(a.Config.HeartbeatEvery) * time.Second) defer ticker.Stop() + reportTicker := time.NewTicker(time.Duration(a.Config.StatusReportEvery) * time.Second) + defer reportTicker.Stop() + a.reportStatus(ctx) + for { select { case <-ctx.Done(): @@ -126,6 +137,8 @@ func (a *App) Run(ctx context.Context) error { return nil case <-ticker.C: a.emitHeartbeat() + case <-reportTicker.C: + a.reportStatus(ctx) } } } @@ -139,3 +152,27 @@ func (a *App) emitHeartbeat() { a.logger.Printf("event=heartbeat_tick screen_id=%s", a.Config.ScreenID) } + +func (a *App) reportStatus(ctx context.Context) { + if a.reporter == nil { + return + } + + snapshot := a.Snapshot() + + err := a.reporter.Send(ctx, statusreporter.Snapshot{ + Status: string(snapshot.Status), + ScreenID: snapshot.ScreenID, + ServerBaseURL: snapshot.ServerBaseURL, + MQTTBroker: snapshot.MQTTBroker, + HeartbeatEverySeconds: snapshot.HeartbeatEvery, + StartedAt: snapshot.StartedAt, + LastHeartbeatAt: snapshot.LastHeartbeatAt, + }) + if err != nil { + a.logger.Printf("event=status_report_failed screen_id=%s error=%v", a.Config.ScreenID, err) + return + } + + a.logger.Printf("event=status_report_sent screen_id=%s", a.Config.ScreenID) +} diff --git a/player/agent/internal/app/app_test.go b/player/agent/internal/app/app_test.go index 874ae05..6fbf8b5 100644 --- a/player/agent/internal/app/app_test.go +++ b/player/agent/internal/app/app_test.go @@ -9,18 +9,30 @@ import ( "time" "git.az-it.net/az/morz-infoboard/player/agent/internal/config" + "git.az-it.net/az/morz-infoboard/player/agent/internal/statusreporter" ) +type recordingReporter struct { + callCount int + err error +} + +func (r *recordingReporter) Send(ctx context.Context, snapshot statusreporter.Snapshot) error { + r.callCount++ + return r.err +} + func TestAppRunUpdatesHealthAndLogsStructuredEvents(t *testing.T) { var logBuffer bytes.Buffer logger := log.New(&logBuffer, "", 0) application := newApp(config.Config{ - ScreenID: "info01-dev", - ServerBaseURL: "http://127.0.0.1:8080", - MQTTBroker: "tcp://127.0.0.1:1883", - HeartbeatEvery: 1, - }, logger, time.Now) + ScreenID: "info01-dev", + ServerBaseURL: "http://127.0.0.1:8080", + MQTTBroker: "tcp://127.0.0.1:1883", + HeartbeatEvery: 1, + StatusReportEvery: 1, + }, logger, time.Now, &recordingReporter{}) if got, want := application.Snapshot().Status, StatusStarting; got != want { t.Fatalf("initial status = %q, want %q", got, want) @@ -83,11 +95,12 @@ func TestAppRunUpdatesHealthAndLogsStructuredEvents(t *testing.T) { func TestAppSnapshotIncludesConfiguredTargets(t *testing.T) { application := newApp(config.Config{ - ScreenID: "screen-77", - ServerBaseURL: "https://backend.example", - MQTTBroker: "tcp://mqtt.example:1883", - HeartbeatEvery: 15, - }, log.New(&bytes.Buffer{}, "", 0), time.Now) + ScreenID: "screen-77", + ServerBaseURL: "https://backend.example", + MQTTBroker: "tcp://mqtt.example:1883", + HeartbeatEvery: 15, + StatusReportEvery: 60, + }, log.New(&bytes.Buffer{}, "", 0), time.Now, &recordingReporter{}) snapshot := application.Snapshot() @@ -111,11 +124,12 @@ func TestAppSnapshotIncludesConfiguredTargets(t *testing.T) { func TestAppRunWithCanceledContextDoesNotLogConfiguredOrHeartbeat(t *testing.T) { var logBuffer bytes.Buffer application := newApp(config.Config{ - ScreenID: "screen-canceled", - ServerBaseURL: "http://127.0.0.1:8080", - MQTTBroker: "tcp://127.0.0.1:1883", - HeartbeatEvery: 5, - }, log.New(&logBuffer, "", 0), time.Now) + ScreenID: "screen-canceled", + ServerBaseURL: "http://127.0.0.1:8080", + MQTTBroker: "tcp://127.0.0.1:1883", + HeartbeatEvery: 5, + StatusReportEvery: 60, + }, log.New(&logBuffer, "", 0), time.Now, &recordingReporter{}) ctx, cancel := context.WithCancel(context.Background()) cancel() @@ -135,3 +149,50 @@ func TestAppRunWithCanceledContextDoesNotLogConfiguredOrHeartbeat(t *testing.T) } } } + +func TestAppRunReportsStatusWithoutStoppingOnReporterError(t *testing.T) { + var logBuffer bytes.Buffer + reporter := &recordingReporter{err: context.DeadlineExceeded} + application := newApp(config.Config{ + ScreenID: "screen-reporter", + ServerBaseURL: "http://127.0.0.1:8080", + MQTTBroker: "tcp://127.0.0.1:1883", + HeartbeatEvery: 1, + StatusReportEvery: 1, + }, log.New(&logBuffer, "", 0), time.Now, reporter) + + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error, 1) + go func() { + errCh <- application.Run(ctx) + }() + + deadline := time.Now().Add(2500 * time.Millisecond) + for time.Now().Before(deadline) { + if reporter.callCount > 0 { + break + } + time.Sleep(10 * time.Millisecond) + } + + if reporter.callCount == 0 { + cancel() + t.Fatal("reporter was not called") + } + + cancel() + + select { + case err := <-errCh: + if err != nil { + t.Fatalf("Run() error = %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("Run() did not return after cancel") + } + + logs := logBuffer.String() + if !strings.Contains(logs, "event=status_report_failed") { + t.Fatalf("logs missing status_report_failed event: %s", logs) + } +} diff --git a/player/agent/internal/config/config.go b/player/agent/internal/config/config.go index dd05499..b6382f4 100644 --- a/player/agent/internal/config/config.go +++ b/player/agent/internal/config/config.go @@ -7,10 +7,11 @@ import ( ) type Config struct { - ScreenID string `json:"screen_id"` - ServerBaseURL string `json:"server_base_url"` - MQTTBroker string `json:"mqtt_broker"` - HeartbeatEvery int `json:"heartbeat_every_seconds"` + ScreenID string `json:"screen_id"` + ServerBaseURL string `json:"server_base_url"` + MQTTBroker string `json:"mqtt_broker"` + HeartbeatEvery int `json:"heartbeat_every_seconds"` + StatusReportEvery int `json:"status_report_every_seconds"` } const defaultConfigPath = "/etc/signage/config.json" @@ -35,16 +36,20 @@ func Load() (Config, error) { cfg.HeartbeatEvery = defaultConfig().HeartbeatEvery } + if cfg.StatusReportEvery <= 0 { + cfg.StatusReportEvery = defaultConfig().StatusReportEvery + } + return cfg, nil } - func defaultConfig() Config { return Config{ - ScreenID: "unset-screen", - ServerBaseURL: "http://127.0.0.1:8080", - MQTTBroker: "tcp://127.0.0.1:1883", - HeartbeatEvery: 30, + ScreenID: "unset-screen", + ServerBaseURL: "http://127.0.0.1:8080", + MQTTBroker: "tcp://127.0.0.1:1883", + HeartbeatEvery: 30, + StatusReportEvery: 60, } } @@ -61,6 +66,13 @@ func overrideFromEnv(cfg *Config) { cfg.ScreenID = getenv("MORZ_INFOBOARD_SCREEN_ID", cfg.ScreenID) cfg.ServerBaseURL = getenv("MORZ_INFOBOARD_SERVER_URL", cfg.ServerBaseURL) cfg.MQTTBroker = getenv("MORZ_INFOBOARD_MQTT_BROKER", cfg.MQTTBroker) + if value := getenv("MORZ_INFOBOARD_STATUS_REPORT_EVERY", ""); value != "" { + var parsed int + _, _ = fmt.Sscanf(value, "%d", &parsed) + if parsed > 0 { + cfg.StatusReportEvery = parsed + } + } } func getenv(key, fallback string) string { diff --git a/player/agent/internal/config/config_test.go b/player/agent/internal/config/config_test.go index 5663307..751d214 100644 --- a/player/agent/internal/config/config_test.go +++ b/player/agent/internal/config/config_test.go @@ -14,7 +14,8 @@ func TestLoadReadsFileAndEnvOverrides(t *testing.T) { "screen_id": "file-screen", "server_base_url": "http://file.example", "mqtt_broker": "tcp://file-broker:1883", - "heartbeat_every_seconds": 45 + "heartbeat_every_seconds": 45, + "status_report_every_seconds": 90 }`) if err := os.WriteFile(configPath, content, 0o644); err != nil { @@ -39,4 +40,21 @@ func TestLoadReadsFileAndEnvOverrides(t *testing.T) { if got, want := cfg.HeartbeatEvery, 45; got != want { t.Fatalf("HeartbeatEvery = %d, want %d", got, want) } + + if got, want := cfg.StatusReportEvery, 90; got != want { + t.Fatalf("StatusReportEvery = %d, want %d", got, want) + } +} + +func TestLoadFallsBackToDefaultStatusReportInterval(t *testing.T) { + t.Setenv("MORZ_INFOBOARD_CONFIG", filepath.Join(t.TempDir(), "missing.json")) + + cfg, err := Load() + if err != nil { + t.Fatalf("Load() error = %v", err) + } + + if got, want := cfg.StatusReportEvery, 60; got != want { + t.Fatalf("StatusReportEvery = %d, want %d", got, want) + } }