Fuehre Health-Modell und strukturierte Agent-Logs ein
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
parent
a976312e21
commit
3b01594638
3 changed files with 210 additions and 7 deletions
|
|
@ -1,8 +1,11 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"git.az-it.net/az/morz-infoboard/player/agent/internal/app"
|
||||
)
|
||||
|
|
@ -15,9 +18,12 @@ func main() {
|
|||
logger.Fatalf("init agent: %v", err)
|
||||
}
|
||||
|
||||
logger.Printf("starting agent for screen %s", application.Config.ScreenID)
|
||||
logger.Printf("event=agent_starting screen_id=%s", application.Config.ScreenID)
|
||||
|
||||
if err := application.Run(); err != nil {
|
||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
||||
defer stop()
|
||||
|
||||
if err := application.Run(ctx); err != nil {
|
||||
logger.Fatalf("run agent: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,17 +1,43 @@
|
|||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.az-it.net/az/morz-infoboard/player/agent/internal/config"
|
||||
)
|
||||
|
||||
type Status string
|
||||
|
||||
const (
|
||||
StatusStarting Status = "starting"
|
||||
StatusRunning Status = "running"
|
||||
StatusStopped Status = "stopped"
|
||||
)
|
||||
|
||||
type HealthSnapshot struct {
|
||||
Status Status
|
||||
ScreenID string
|
||||
ServerBaseURL string
|
||||
MQTTBroker string
|
||||
HeartbeatEvery int
|
||||
StartedAt time.Time
|
||||
LastHeartbeatAt time.Time
|
||||
}
|
||||
|
||||
type App struct {
|
||||
Config config.Config
|
||||
logger *log.Logger
|
||||
now func() time.Time
|
||||
|
||||
mu sync.RWMutex
|
||||
status Status
|
||||
startedAt time.Time
|
||||
lastHeartbeatAt time.Time
|
||||
}
|
||||
|
||||
func New() (*App, error) {
|
||||
|
|
@ -22,21 +48,83 @@ func New() (*App, error) {
|
|||
|
||||
logger := log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC)
|
||||
|
||||
return &App{Config: cfg, logger: logger}, nil
|
||||
return newApp(cfg, logger, time.Now), nil
|
||||
}
|
||||
|
||||
func (a *App) Run() error {
|
||||
func newApp(cfg config.Config, logger *log.Logger, now func() time.Time) *App {
|
||||
if logger == nil {
|
||||
logger = log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC)
|
||||
}
|
||||
|
||||
if now == nil {
|
||||
now = time.Now
|
||||
}
|
||||
|
||||
return &App{
|
||||
Config: cfg,
|
||||
logger: logger,
|
||||
now: now,
|
||||
status: StatusStarting,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *App) Snapshot() HealthSnapshot {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
|
||||
return HealthSnapshot{
|
||||
Status: a.status,
|
||||
ScreenID: a.Config.ScreenID,
|
||||
ServerBaseURL: a.Config.ServerBaseURL,
|
||||
MQTTBroker: a.Config.MQTTBroker,
|
||||
HeartbeatEvery: a.Config.HeartbeatEvery,
|
||||
StartedAt: a.startedAt,
|
||||
LastHeartbeatAt: a.lastHeartbeatAt,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *App) Run(ctx context.Context) error {
|
||||
if a.Config.ScreenID == "" {
|
||||
return fmt.Errorf("screen id is required")
|
||||
}
|
||||
|
||||
a.logger.Printf("configured server=%s mqtt=%s heartbeat=%ds", a.Config.ServerBaseURL, a.Config.MQTTBroker, a.Config.HeartbeatEvery)
|
||||
a.mu.Lock()
|
||||
a.status = StatusRunning
|
||||
a.startedAt = a.now()
|
||||
a.mu.Unlock()
|
||||
|
||||
a.logger.Printf(
|
||||
"event=agent_configured screen_id=%s server_url=%s mqtt_broker=%s heartbeat_every_seconds=%d",
|
||||
a.Config.ScreenID,
|
||||
a.Config.ServerBaseURL,
|
||||
a.Config.MQTTBroker,
|
||||
a.Config.HeartbeatEvery,
|
||||
)
|
||||
a.emitHeartbeat()
|
||||
|
||||
ticker := time.NewTicker(time.Duration(a.Config.HeartbeatEvery) * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
a.logger.Printf("heartbeat tick screen=%s", a.Config.ScreenID)
|
||||
<-ticker.C
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
a.mu.Lock()
|
||||
a.status = StatusStopped
|
||||
a.mu.Unlock()
|
||||
a.logger.Printf("event=agent_stopped screen_id=%s", a.Config.ScreenID)
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
a.emitHeartbeat()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *App) emitHeartbeat() {
|
||||
now := a.now()
|
||||
|
||||
a.mu.Lock()
|
||||
a.lastHeartbeatAt = now
|
||||
a.mu.Unlock()
|
||||
|
||||
a.logger.Printf("event=heartbeat_tick screen_id=%s", a.Config.ScreenID)
|
||||
}
|
||||
|
|
|
|||
109
player/agent/internal/app/app_test.go
Normal file
109
player/agent/internal/app/app_test.go
Normal file
|
|
@ -0,0 +1,109 @@
|
|||
package app
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"log"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.az-it.net/az/morz-infoboard/player/agent/internal/config"
|
||||
)
|
||||
|
||||
func TestAppRunUpdatesHealthAndLogsStructuredEvents(t *testing.T) {
|
||||
var logBuffer bytes.Buffer
|
||||
logger := log.New(&logBuffer, "", 0)
|
||||
|
||||
application := newApp(config.Config{
|
||||
ScreenID: "info01-dev",
|
||||
ServerBaseURL: "http://127.0.0.1:8080",
|
||||
MQTTBroker: "tcp://127.0.0.1:1883",
|
||||
HeartbeatEvery: 1,
|
||||
}, logger, time.Now)
|
||||
|
||||
if got, want := application.Snapshot().Status, StatusStarting; got != want {
|
||||
t.Fatalf("initial status = %q, want %q", got, want)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- application.Run(ctx)
|
||||
}()
|
||||
|
||||
deadline := time.Now().Add(2 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
snapshot := application.Snapshot()
|
||||
if snapshot.Status == StatusRunning && !snapshot.LastHeartbeatAt.IsZero() {
|
||||
break
|
||||
}
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
snapshot := application.Snapshot()
|
||||
if got, want := snapshot.Status, StatusRunning; got != want {
|
||||
t.Fatalf("running status = %q, want %q", got, want)
|
||||
}
|
||||
|
||||
if snapshot.LastHeartbeatAt.IsZero() {
|
||||
t.Fatal("LastHeartbeatAt = zero, want heartbeat timestamp")
|
||||
}
|
||||
|
||||
cancel()
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
if err != nil {
|
||||
t.Fatalf("Run() error = %v", err)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Run() did not return after cancel")
|
||||
}
|
||||
|
||||
if got, want := application.Snapshot().Status, StatusStopped; got != want {
|
||||
t.Fatalf("final status = %q, want %q", got, want)
|
||||
}
|
||||
|
||||
logs := logBuffer.String()
|
||||
for _, needle := range []string{
|
||||
"event=agent_configured",
|
||||
"screen_id=info01-dev",
|
||||
"event=heartbeat_tick",
|
||||
"event=agent_stopped",
|
||||
} {
|
||||
if !strings.Contains(logs, needle) {
|
||||
t.Fatalf("logs missing %q: %s", needle, logs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAppSnapshotIncludesConfiguredTargets(t *testing.T) {
|
||||
application := newApp(config.Config{
|
||||
ScreenID: "screen-77",
|
||||
ServerBaseURL: "https://backend.example",
|
||||
MQTTBroker: "tcp://mqtt.example:1883",
|
||||
HeartbeatEvery: 15,
|
||||
}, log.New(&bytes.Buffer{}, "", 0), time.Now)
|
||||
|
||||
snapshot := application.Snapshot()
|
||||
|
||||
if got, want := snapshot.ScreenID, "screen-77"; got != want {
|
||||
t.Fatalf("ScreenID = %q, want %q", got, want)
|
||||
}
|
||||
|
||||
if got, want := snapshot.ServerBaseURL, "https://backend.example"; got != want {
|
||||
t.Fatalf("ServerBaseURL = %q, want %q", got, want)
|
||||
}
|
||||
|
||||
if got, want := snapshot.MQTTBroker, "tcp://mqtt.example:1883"; got != want {
|
||||
t.Fatalf("MQTTBroker = %q, want %q", got, want)
|
||||
}
|
||||
|
||||
if got, want := snapshot.HeartbeatEvery, 15; got != want {
|
||||
t.Fatalf("HeartbeatEvery = %d, want %d", got, want)
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue