Melde Agent-Status periodisch an das Backend
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
parent
0c7f0b5b13
commit
6623a313bb
4 changed files with 162 additions and 34 deletions
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.az-it.net/az/morz-infoboard/player/agent/internal/config"
|
"git.az-it.net/az/morz-infoboard/player/agent/internal/config"
|
||||||
|
"git.az-it.net/az/morz-infoboard/player/agent/internal/statusreporter"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Status string
|
type Status string
|
||||||
|
|
@ -33,6 +34,7 @@ type App struct {
|
||||||
Config config.Config
|
Config config.Config
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
now func() time.Time
|
now func() time.Time
|
||||||
|
reporter statusSender
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
status Status
|
status Status
|
||||||
|
|
@ -40,6 +42,10 @@ type App struct {
|
||||||
lastHeartbeatAt time.Time
|
lastHeartbeatAt time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type statusSender interface {
|
||||||
|
Send(ctx context.Context, snapshot statusreporter.Snapshot) error
|
||||||
|
}
|
||||||
|
|
||||||
func New() (*App, error) {
|
func New() (*App, error) {
|
||||||
cfg, err := config.Load()
|
cfg, err := config.Load()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -48,10 +54,10 @@ func New() (*App, error) {
|
||||||
|
|
||||||
logger := log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC)
|
logger := log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC)
|
||||||
|
|
||||||
return newApp(cfg, logger, time.Now), nil
|
return newApp(cfg, logger, time.Now, statusreporter.New(cfg.ServerBaseURL, nil, time.Now)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newApp(cfg config.Config, logger *log.Logger, now func() time.Time) *App {
|
func newApp(cfg config.Config, logger *log.Logger, now func() time.Time, reporter statusSender) *App {
|
||||||
if logger == nil {
|
if logger == nil {
|
||||||
logger = log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC)
|
logger = log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC)
|
||||||
}
|
}
|
||||||
|
|
@ -64,6 +70,7 @@ func newApp(cfg config.Config, logger *log.Logger, now func() time.Time) *App {
|
||||||
Config: cfg,
|
Config: cfg,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
now: now,
|
now: now,
|
||||||
|
reporter: reporter,
|
||||||
status: StatusStarting,
|
status: StatusStarting,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -116,6 +123,10 @@ func (a *App) Run(ctx context.Context) error {
|
||||||
ticker := time.NewTicker(time.Duration(a.Config.HeartbeatEvery) * time.Second)
|
ticker := time.NewTicker(time.Duration(a.Config.HeartbeatEvery) * time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
reportTicker := time.NewTicker(time.Duration(a.Config.StatusReportEvery) * time.Second)
|
||||||
|
defer reportTicker.Stop()
|
||||||
|
a.reportStatus(ctx)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|
@ -126,6 +137,8 @@ func (a *App) Run(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
a.emitHeartbeat()
|
a.emitHeartbeat()
|
||||||
|
case <-reportTicker.C:
|
||||||
|
a.reportStatus(ctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -139,3 +152,27 @@ func (a *App) emitHeartbeat() {
|
||||||
|
|
||||||
a.logger.Printf("event=heartbeat_tick screen_id=%s", a.Config.ScreenID)
|
a.logger.Printf("event=heartbeat_tick screen_id=%s", a.Config.ScreenID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *App) reportStatus(ctx context.Context) {
|
||||||
|
if a.reporter == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
snapshot := a.Snapshot()
|
||||||
|
|
||||||
|
err := a.reporter.Send(ctx, statusreporter.Snapshot{
|
||||||
|
Status: string(snapshot.Status),
|
||||||
|
ScreenID: snapshot.ScreenID,
|
||||||
|
ServerBaseURL: snapshot.ServerBaseURL,
|
||||||
|
MQTTBroker: snapshot.MQTTBroker,
|
||||||
|
HeartbeatEverySeconds: snapshot.HeartbeatEvery,
|
||||||
|
StartedAt: snapshot.StartedAt,
|
||||||
|
LastHeartbeatAt: snapshot.LastHeartbeatAt,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
a.logger.Printf("event=status_report_failed screen_id=%s error=%v", a.Config.ScreenID, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
a.logger.Printf("event=status_report_sent screen_id=%s", a.Config.ScreenID)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,8 +9,19 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.az-it.net/az/morz-infoboard/player/agent/internal/config"
|
"git.az-it.net/az/morz-infoboard/player/agent/internal/config"
|
||||||
|
"git.az-it.net/az/morz-infoboard/player/agent/internal/statusreporter"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type recordingReporter struct {
|
||||||
|
callCount int
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *recordingReporter) Send(ctx context.Context, snapshot statusreporter.Snapshot) error {
|
||||||
|
r.callCount++
|
||||||
|
return r.err
|
||||||
|
}
|
||||||
|
|
||||||
func TestAppRunUpdatesHealthAndLogsStructuredEvents(t *testing.T) {
|
func TestAppRunUpdatesHealthAndLogsStructuredEvents(t *testing.T) {
|
||||||
var logBuffer bytes.Buffer
|
var logBuffer bytes.Buffer
|
||||||
logger := log.New(&logBuffer, "", 0)
|
logger := log.New(&logBuffer, "", 0)
|
||||||
|
|
@ -20,7 +31,8 @@ func TestAppRunUpdatesHealthAndLogsStructuredEvents(t *testing.T) {
|
||||||
ServerBaseURL: "http://127.0.0.1:8080",
|
ServerBaseURL: "http://127.0.0.1:8080",
|
||||||
MQTTBroker: "tcp://127.0.0.1:1883",
|
MQTTBroker: "tcp://127.0.0.1:1883",
|
||||||
HeartbeatEvery: 1,
|
HeartbeatEvery: 1,
|
||||||
}, logger, time.Now)
|
StatusReportEvery: 1,
|
||||||
|
}, logger, time.Now, &recordingReporter{})
|
||||||
|
|
||||||
if got, want := application.Snapshot().Status, StatusStarting; got != want {
|
if got, want := application.Snapshot().Status, StatusStarting; got != want {
|
||||||
t.Fatalf("initial status = %q, want %q", got, want)
|
t.Fatalf("initial status = %q, want %q", got, want)
|
||||||
|
|
@ -87,7 +99,8 @@ func TestAppSnapshotIncludesConfiguredTargets(t *testing.T) {
|
||||||
ServerBaseURL: "https://backend.example",
|
ServerBaseURL: "https://backend.example",
|
||||||
MQTTBroker: "tcp://mqtt.example:1883",
|
MQTTBroker: "tcp://mqtt.example:1883",
|
||||||
HeartbeatEvery: 15,
|
HeartbeatEvery: 15,
|
||||||
}, log.New(&bytes.Buffer{}, "", 0), time.Now)
|
StatusReportEvery: 60,
|
||||||
|
}, log.New(&bytes.Buffer{}, "", 0), time.Now, &recordingReporter{})
|
||||||
|
|
||||||
snapshot := application.Snapshot()
|
snapshot := application.Snapshot()
|
||||||
|
|
||||||
|
|
@ -115,7 +128,8 @@ func TestAppRunWithCanceledContextDoesNotLogConfiguredOrHeartbeat(t *testing.T)
|
||||||
ServerBaseURL: "http://127.0.0.1:8080",
|
ServerBaseURL: "http://127.0.0.1:8080",
|
||||||
MQTTBroker: "tcp://127.0.0.1:1883",
|
MQTTBroker: "tcp://127.0.0.1:1883",
|
||||||
HeartbeatEvery: 5,
|
HeartbeatEvery: 5,
|
||||||
}, log.New(&logBuffer, "", 0), time.Now)
|
StatusReportEvery: 60,
|
||||||
|
}, log.New(&logBuffer, "", 0), time.Now, &recordingReporter{})
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
cancel()
|
cancel()
|
||||||
|
|
@ -135,3 +149,50 @@ func TestAppRunWithCanceledContextDoesNotLogConfiguredOrHeartbeat(t *testing.T)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAppRunReportsStatusWithoutStoppingOnReporterError(t *testing.T) {
|
||||||
|
var logBuffer bytes.Buffer
|
||||||
|
reporter := &recordingReporter{err: context.DeadlineExceeded}
|
||||||
|
application := newApp(config.Config{
|
||||||
|
ScreenID: "screen-reporter",
|
||||||
|
ServerBaseURL: "http://127.0.0.1:8080",
|
||||||
|
MQTTBroker: "tcp://127.0.0.1:1883",
|
||||||
|
HeartbeatEvery: 1,
|
||||||
|
StatusReportEvery: 1,
|
||||||
|
}, log.New(&logBuffer, "", 0), time.Now, reporter)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
errCh := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
errCh <- application.Run(ctx)
|
||||||
|
}()
|
||||||
|
|
||||||
|
deadline := time.Now().Add(2500 * time.Millisecond)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
if reporter.callCount > 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
if reporter.callCount == 0 {
|
||||||
|
cancel()
|
||||||
|
t.Fatal("reporter was not called")
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Run() error = %v", err)
|
||||||
|
}
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("Run() did not return after cancel")
|
||||||
|
}
|
||||||
|
|
||||||
|
logs := logBuffer.String()
|
||||||
|
if !strings.Contains(logs, "event=status_report_failed") {
|
||||||
|
t.Fatalf("logs missing status_report_failed event: %s", logs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ type Config struct {
|
||||||
ServerBaseURL string `json:"server_base_url"`
|
ServerBaseURL string `json:"server_base_url"`
|
||||||
MQTTBroker string `json:"mqtt_broker"`
|
MQTTBroker string `json:"mqtt_broker"`
|
||||||
HeartbeatEvery int `json:"heartbeat_every_seconds"`
|
HeartbeatEvery int `json:"heartbeat_every_seconds"`
|
||||||
|
StatusReportEvery int `json:"status_report_every_seconds"`
|
||||||
}
|
}
|
||||||
|
|
||||||
const defaultConfigPath = "/etc/signage/config.json"
|
const defaultConfigPath = "/etc/signage/config.json"
|
||||||
|
|
@ -35,16 +36,20 @@ func Load() (Config, error) {
|
||||||
cfg.HeartbeatEvery = defaultConfig().HeartbeatEvery
|
cfg.HeartbeatEvery = defaultConfig().HeartbeatEvery
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if cfg.StatusReportEvery <= 0 {
|
||||||
|
cfg.StatusReportEvery = defaultConfig().StatusReportEvery
|
||||||
|
}
|
||||||
|
|
||||||
return cfg, nil
|
return cfg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func defaultConfig() Config {
|
func defaultConfig() Config {
|
||||||
return Config{
|
return Config{
|
||||||
ScreenID: "unset-screen",
|
ScreenID: "unset-screen",
|
||||||
ServerBaseURL: "http://127.0.0.1:8080",
|
ServerBaseURL: "http://127.0.0.1:8080",
|
||||||
MQTTBroker: "tcp://127.0.0.1:1883",
|
MQTTBroker: "tcp://127.0.0.1:1883",
|
||||||
HeartbeatEvery: 30,
|
HeartbeatEvery: 30,
|
||||||
|
StatusReportEvery: 60,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -61,6 +66,13 @@ func overrideFromEnv(cfg *Config) {
|
||||||
cfg.ScreenID = getenv("MORZ_INFOBOARD_SCREEN_ID", cfg.ScreenID)
|
cfg.ScreenID = getenv("MORZ_INFOBOARD_SCREEN_ID", cfg.ScreenID)
|
||||||
cfg.ServerBaseURL = getenv("MORZ_INFOBOARD_SERVER_URL", cfg.ServerBaseURL)
|
cfg.ServerBaseURL = getenv("MORZ_INFOBOARD_SERVER_URL", cfg.ServerBaseURL)
|
||||||
cfg.MQTTBroker = getenv("MORZ_INFOBOARD_MQTT_BROKER", cfg.MQTTBroker)
|
cfg.MQTTBroker = getenv("MORZ_INFOBOARD_MQTT_BROKER", cfg.MQTTBroker)
|
||||||
|
if value := getenv("MORZ_INFOBOARD_STATUS_REPORT_EVERY", ""); value != "" {
|
||||||
|
var parsed int
|
||||||
|
_, _ = fmt.Sscanf(value, "%d", &parsed)
|
||||||
|
if parsed > 0 {
|
||||||
|
cfg.StatusReportEvery = parsed
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getenv(key, fallback string) string {
|
func getenv(key, fallback string) string {
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,8 @@ func TestLoadReadsFileAndEnvOverrides(t *testing.T) {
|
||||||
"screen_id": "file-screen",
|
"screen_id": "file-screen",
|
||||||
"server_base_url": "http://file.example",
|
"server_base_url": "http://file.example",
|
||||||
"mqtt_broker": "tcp://file-broker:1883",
|
"mqtt_broker": "tcp://file-broker:1883",
|
||||||
"heartbeat_every_seconds": 45
|
"heartbeat_every_seconds": 45,
|
||||||
|
"status_report_every_seconds": 90
|
||||||
}`)
|
}`)
|
||||||
|
|
||||||
if err := os.WriteFile(configPath, content, 0o644); err != nil {
|
if err := os.WriteFile(configPath, content, 0o644); err != nil {
|
||||||
|
|
@ -39,4 +40,21 @@ func TestLoadReadsFileAndEnvOverrides(t *testing.T) {
|
||||||
if got, want := cfg.HeartbeatEvery, 45; got != want {
|
if got, want := cfg.HeartbeatEvery, 45; got != want {
|
||||||
t.Fatalf("HeartbeatEvery = %d, want %d", got, want)
|
t.Fatalf("HeartbeatEvery = %d, want %d", got, want)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if got, want := cfg.StatusReportEvery, 90; got != want {
|
||||||
|
t.Fatalf("StatusReportEvery = %d, want %d", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLoadFallsBackToDefaultStatusReportInterval(t *testing.T) {
|
||||||
|
t.Setenv("MORZ_INFOBOARD_CONFIG", filepath.Join(t.TempDir(), "missing.json"))
|
||||||
|
|
||||||
|
cfg, err := Load()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Load() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got, want := cfg.StatusReportEvery, 60; got != want {
|
||||||
|
t.Fatalf("StatusReportEvery = %d, want %d", got, want)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue