diff --git a/player/agent/cmd/agent/main.go b/player/agent/cmd/agent/main.go index 9e11cd4..4c3e2a4 100644 --- a/player/agent/cmd/agent/main.go +++ b/player/agent/cmd/agent/main.go @@ -1,8 +1,11 @@ package main import ( + "context" "log" "os" + "os/signal" + "syscall" "git.az-it.net/az/morz-infoboard/player/agent/internal/app" ) @@ -15,9 +18,12 @@ func main() { logger.Fatalf("init agent: %v", err) } - logger.Printf("starting agent for screen %s", application.Config.ScreenID) + logger.Printf("event=agent_starting screen_id=%s", application.Config.ScreenID) - if err := application.Run(); err != nil { + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + if err := application.Run(ctx); err != nil { logger.Fatalf("run agent: %v", err) } } diff --git a/player/agent/internal/app/app.go b/player/agent/internal/app/app.go index 8506daf..5b783d2 100644 --- a/player/agent/internal/app/app.go +++ b/player/agent/internal/app/app.go @@ -1,17 +1,43 @@ package app import ( + "context" "fmt" "log" "os" + "sync" "time" "git.az-it.net/az/morz-infoboard/player/agent/internal/config" ) +type Status string + +const ( + StatusStarting Status = "starting" + StatusRunning Status = "running" + StatusStopped Status = "stopped" +) + +type HealthSnapshot struct { + Status Status + ScreenID string + ServerBaseURL string + MQTTBroker string + HeartbeatEvery int + StartedAt time.Time + LastHeartbeatAt time.Time +} + type App struct { Config config.Config logger *log.Logger + now func() time.Time + + mu sync.RWMutex + status Status + startedAt time.Time + lastHeartbeatAt time.Time } func New() (*App, error) { @@ -22,21 +48,83 @@ func New() (*App, error) { logger := log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC) - return &App{Config: cfg, logger: logger}, nil + return newApp(cfg, logger, time.Now), nil } -func (a *App) Run() error { +func newApp(cfg config.Config, logger *log.Logger, now func() time.Time) *App { + if logger == nil { + logger = log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC) + } + + if now == nil { + now = time.Now + } + + return &App{ + Config: cfg, + logger: logger, + now: now, + status: StatusStarting, + } +} + +func (a *App) Snapshot() HealthSnapshot { + a.mu.RLock() + defer a.mu.RUnlock() + + return HealthSnapshot{ + Status: a.status, + ScreenID: a.Config.ScreenID, + ServerBaseURL: a.Config.ServerBaseURL, + MQTTBroker: a.Config.MQTTBroker, + HeartbeatEvery: a.Config.HeartbeatEvery, + StartedAt: a.startedAt, + LastHeartbeatAt: a.lastHeartbeatAt, + } +} + +func (a *App) Run(ctx context.Context) error { if a.Config.ScreenID == "" { return fmt.Errorf("screen id is required") } - a.logger.Printf("configured server=%s mqtt=%s heartbeat=%ds", a.Config.ServerBaseURL, a.Config.MQTTBroker, a.Config.HeartbeatEvery) + a.mu.Lock() + a.status = StatusRunning + a.startedAt = a.now() + a.mu.Unlock() + + a.logger.Printf( + "event=agent_configured screen_id=%s server_url=%s mqtt_broker=%s heartbeat_every_seconds=%d", + a.Config.ScreenID, + a.Config.ServerBaseURL, + a.Config.MQTTBroker, + a.Config.HeartbeatEvery, + ) + a.emitHeartbeat() ticker := time.NewTicker(time.Duration(a.Config.HeartbeatEvery) * time.Second) defer ticker.Stop() for { - a.logger.Printf("heartbeat tick screen=%s", a.Config.ScreenID) - <-ticker.C + select { + case <-ctx.Done(): + a.mu.Lock() + a.status = StatusStopped + a.mu.Unlock() + a.logger.Printf("event=agent_stopped screen_id=%s", a.Config.ScreenID) + return nil + case <-ticker.C: + a.emitHeartbeat() + } } } + +func (a *App) emitHeartbeat() { + now := a.now() + + a.mu.Lock() + a.lastHeartbeatAt = now + a.mu.Unlock() + + a.logger.Printf("event=heartbeat_tick screen_id=%s", a.Config.ScreenID) +} diff --git a/player/agent/internal/app/app_test.go b/player/agent/internal/app/app_test.go new file mode 100644 index 0000000..d7a83b3 --- /dev/null +++ b/player/agent/internal/app/app_test.go @@ -0,0 +1,109 @@ +package app + +import ( + "bytes" + "context" + "log" + "strings" + "testing" + "time" + + "git.az-it.net/az/morz-infoboard/player/agent/internal/config" +) + +func TestAppRunUpdatesHealthAndLogsStructuredEvents(t *testing.T) { + var logBuffer bytes.Buffer + logger := log.New(&logBuffer, "", 0) + + application := newApp(config.Config{ + ScreenID: "info01-dev", + ServerBaseURL: "http://127.0.0.1:8080", + MQTTBroker: "tcp://127.0.0.1:1883", + HeartbeatEvery: 1, + }, logger, time.Now) + + if got, want := application.Snapshot().Status, StatusStarting; got != want { + t.Fatalf("initial status = %q, want %q", got, want) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + errCh := make(chan error, 1) + go func() { + errCh <- application.Run(ctx) + }() + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + snapshot := application.Snapshot() + if snapshot.Status == StatusRunning && !snapshot.LastHeartbeatAt.IsZero() { + break + } + + time.Sleep(10 * time.Millisecond) + } + + snapshot := application.Snapshot() + if got, want := snapshot.Status, StatusRunning; got != want { + t.Fatalf("running status = %q, want %q", got, want) + } + + if snapshot.LastHeartbeatAt.IsZero() { + t.Fatal("LastHeartbeatAt = zero, want heartbeat timestamp") + } + + cancel() + + select { + case err := <-errCh: + if err != nil { + t.Fatalf("Run() error = %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("Run() did not return after cancel") + } + + if got, want := application.Snapshot().Status, StatusStopped; got != want { + t.Fatalf("final status = %q, want %q", got, want) + } + + logs := logBuffer.String() + for _, needle := range []string{ + "event=agent_configured", + "screen_id=info01-dev", + "event=heartbeat_tick", + "event=agent_stopped", + } { + if !strings.Contains(logs, needle) { + t.Fatalf("logs missing %q: %s", needle, logs) + } + } +} + +func TestAppSnapshotIncludesConfiguredTargets(t *testing.T) { + application := newApp(config.Config{ + ScreenID: "screen-77", + ServerBaseURL: "https://backend.example", + MQTTBroker: "tcp://mqtt.example:1883", + HeartbeatEvery: 15, + }, log.New(&bytes.Buffer{}, "", 0), time.Now) + + snapshot := application.Snapshot() + + if got, want := snapshot.ScreenID, "screen-77"; got != want { + t.Fatalf("ScreenID = %q, want %q", got, want) + } + + if got, want := snapshot.ServerBaseURL, "https://backend.example"; got != want { + t.Fatalf("ServerBaseURL = %q, want %q", got, want) + } + + if got, want := snapshot.MQTTBroker, "tcp://mqtt.example:1883"; got != want { + t.Fatalf("MQTTBroker = %q, want %q", got, want) + } + + if got, want := snapshot.HeartbeatEvery, 15; got != want { + t.Fatalf("HeartbeatEvery = %d, want %d", got, want) + } +}