From d0137179e5598f8712c94c3251920401a18b178e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jesko=20Ansch=C3=BCtz?= Date: Sun, 22 Mar 2026 20:54:12 +0100 Subject: [PATCH] Fuege MQTT-Heartbeat zum Agent hinzu (kein Broker konfiguriert = skip) - neues Paket mqttheartbeat: Publisher mit paho, topic signage/screen//heartbeat, payload {screen_id, ts, status, server_connectivity}, auto-reconnect bei Ausfall - MORZ_INFOBOARD_MQTT_BROKER leer (Standard) -> MQTT komplett uebersprungen - app.emitHeartbeat() publiziert bei jedem Tick per MQTT wenn Broker konfiguriert, loggt Fehler und laeuft weiter (kein Stop bei MQTT-Ausfall) - mqtt.Close() bei context.Done() - MQTTBroker-Default von tcp://127.0.0.1:1883 auf "" geaendert - erste externe Dep: github.com/eclipse/paho.mqtt.golang v1.5.1 Co-Authored-By: Claude Sonnet 4.6 --- player/agent/go.mod | 7 ++ player/agent/go.sum | 8 ++ player/agent/internal/app/app.go | 33 +++++++- player/agent/internal/app/app_test.go | 80 +++++++++++++++++-- player/agent/internal/config/config.go | 2 +- .../agent/internal/mqttheartbeat/heartbeat.go | 78 ++++++++++++++++++ .../internal/mqttheartbeat/heartbeat_test.go | 60 ++++++++++++++ 7 files changed, 258 insertions(+), 10 deletions(-) create mode 100644 player/agent/go.sum create mode 100644 player/agent/internal/mqttheartbeat/heartbeat.go create mode 100644 player/agent/internal/mqttheartbeat/heartbeat_test.go diff --git a/player/agent/go.mod b/player/agent/go.mod index b958f71..d84e6c1 100644 --- a/player/agent/go.mod +++ b/player/agent/go.mod @@ -1,3 +1,10 @@ module git.az-it.net/az/morz-infoboard/player/agent go 1.24.0 + +require ( + github.com/eclipse/paho.mqtt.golang v1.5.1 // indirect + github.com/gorilla/websocket v1.5.3 // indirect + golang.org/x/net v0.44.0 // indirect + golang.org/x/sync v0.17.0 // indirect +) diff --git a/player/agent/go.sum b/player/agent/go.sum new file mode 100644 index 0000000..9abe94c --- /dev/null +++ b/player/agent/go.sum @@ -0,0 +1,8 @@ +github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE= +github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= +golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= diff --git a/player/agent/internal/app/app.go b/player/agent/internal/app/app.go index 38659d4..828dcb1 100644 --- a/player/agent/internal/app/app.go +++ b/player/agent/internal/app/app.go @@ -9,6 +9,7 @@ import ( "time" "git.az-it.net/az/morz-infoboard/player/agent/internal/config" + "git.az-it.net/az/morz-infoboard/player/agent/internal/mqttheartbeat" "git.az-it.net/az/morz-infoboard/player/agent/internal/statusreporter" ) @@ -45,6 +46,7 @@ type App struct { logger *log.Logger now func() time.Time reporter statusSender + mqttPub mqttSender mu sync.RWMutex status Status @@ -58,6 +60,11 @@ type statusSender interface { Send(ctx context.Context, snapshot statusreporter.Snapshot) error } +type mqttSender interface { + SendHeartbeat(status, connectivity string, ts time.Time) error + Close() +} + func New() (*App, error) { cfg, err := config.Load() if err != nil { @@ -66,10 +73,18 @@ func New() (*App, error) { logger := log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC) - return newApp(cfg, logger, time.Now, statusreporter.New(cfg.ServerBaseURL, nil, time.Now)), nil + var mqttPub mqttSender + if cfg.MQTTBroker != "" { + mqttPub = mqttheartbeat.New(cfg.MQTTBroker, cfg.ScreenID) + logger.Printf("event=mqtt_enabled broker=%s", cfg.MQTTBroker) + } else { + logger.Printf("event=mqtt_disabled reason=no_broker_configured") + } + + return newApp(cfg, logger, time.Now, statusreporter.New(cfg.ServerBaseURL, nil, time.Now), mqttPub), nil } -func newApp(cfg config.Config, logger *log.Logger, now func() time.Time, reporter statusSender) *App { +func newApp(cfg config.Config, logger *log.Logger, now func() time.Time, reporter statusSender, mqttPub mqttSender) *App { if logger == nil { logger = log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC) } @@ -83,6 +98,7 @@ func newApp(cfg config.Config, logger *log.Logger, now func() time.Time, reporte logger: logger, now: now, reporter: reporter, + mqttPub: mqttPub, status: StatusStarting, serverConnectivity: ConnectivityUnknown, } @@ -147,6 +163,9 @@ func (a *App) Run(ctx context.Context) error { a.mu.Lock() a.status = StatusStopped a.mu.Unlock() + if a.mqttPub != nil { + a.mqttPub.Close() + } a.logger.Printf("event=agent_stopped screen_id=%s", a.Config.ScreenID) return nil case <-ticker.C: @@ -162,9 +181,19 @@ func (a *App) emitHeartbeat() { a.mu.Lock() a.lastHeartbeatAt = now + status := a.status + connectivity := a.serverConnectivity a.mu.Unlock() a.logger.Printf("event=heartbeat_tick screen_id=%s", a.Config.ScreenID) + + if a.mqttPub != nil { + if err := a.mqttPub.SendHeartbeat(string(status), string(connectivity), now); err != nil { + a.logger.Printf("event=mqtt_heartbeat_failed screen_id=%s error=%v", a.Config.ScreenID, err) + } else { + a.logger.Printf("event=mqtt_heartbeat_sent screen_id=%s", a.Config.ScreenID) + } + } } func (a *App) reportStatus(ctx context.Context) { diff --git a/player/agent/internal/app/app_test.go b/player/agent/internal/app/app_test.go index 1dd3ae0..cec16c2 100644 --- a/player/agent/internal/app/app_test.go +++ b/player/agent/internal/app/app_test.go @@ -40,7 +40,7 @@ func TestAppRunUpdatesHealthAndLogsStructuredEvents(t *testing.T) { MQTTBroker: "tcp://127.0.0.1:1883", HeartbeatEvery: 1, StatusReportEvery: 1, - }, logger, time.Now, &recordingReporter{}) + }, logger, time.Now, &recordingReporter{}, nil) if got, want := application.Snapshot().Status, StatusStarting; got != want { t.Fatalf("initial status = %q, want %q", got, want) @@ -108,7 +108,7 @@ func TestAppSnapshotIncludesConfiguredTargets(t *testing.T) { MQTTBroker: "tcp://mqtt.example:1883", HeartbeatEvery: 15, StatusReportEvery: 60, - }, log.New(&bytes.Buffer{}, "", 0), time.Now, &recordingReporter{}) + }, log.New(&bytes.Buffer{}, "", 0), time.Now, &recordingReporter{}, nil) snapshot := application.Snapshot() @@ -141,7 +141,7 @@ func TestAppRunWithCanceledContextDoesNotLogConfiguredOrHeartbeat(t *testing.T) MQTTBroker: "tcp://127.0.0.1:1883", HeartbeatEvery: 5, StatusReportEvery: 60, - }, log.New(&logBuffer, "", 0), time.Now, &recordingReporter{}) + }, log.New(&logBuffer, "", 0), time.Now, &recordingReporter{}, nil) ctx, cancel := context.WithCancel(context.Background()) cancel() @@ -171,7 +171,7 @@ func TestAppRunReportsStatusWithoutStoppingOnReporterError(t *testing.T) { MQTTBroker: "tcp://127.0.0.1:1883", HeartbeatEvery: 1, StatusReportEvery: 1, - }, log.New(&logBuffer, "", 0), time.Now, reporter) + }, log.New(&logBuffer, "", 0), time.Now, reporter, nil) ctx, cancel := context.WithCancel(context.Background()) errCh := make(chan error, 1) @@ -221,7 +221,7 @@ func TestAppRunMarksServerConnectivityOnlineAfterSuccessfulReport(t *testing.T) MQTTBroker: "tcp://127.0.0.1:1883", HeartbeatEvery: 1, StatusReportEvery: 1, - }, log.New(&bytes.Buffer{}, "", 0), time.Now, reporter) + }, log.New(&bytes.Buffer{}, "", 0), time.Now, reporter, nil) ctx, cancel := context.WithCancel(context.Background()) errCh := make(chan error, 1) @@ -264,7 +264,7 @@ func TestReportStatusMarksServerConnectivityOfflineAfterRepeatedFailures(t *test MQTTBroker: "tcp://127.0.0.1:1883", HeartbeatEvery: 30, StatusReportEvery: 30, - }, log.New(&bytes.Buffer{}, "", 0), time.Now, reporter) + }, log.New(&bytes.Buffer{}, "", 0), time.Now, reporter, nil) application.reportStatus(context.Background()) if got, want := application.Snapshot().ServerConnectivity, ConnectivityDegraded; got != want { @@ -287,7 +287,7 @@ func TestReportStatusRecoversFromOfflineToOnline(t *testing.T) { MQTTBroker: "tcp://127.0.0.1:1883", HeartbeatEvery: 30, StatusReportEvery: 30, - }, log.New(&bytes.Buffer{}, "", 0), time.Now, reporter) + }, log.New(&bytes.Buffer{}, "", 0), time.Now, reporter, nil) application.reportStatus(context.Background()) application.reportStatus(context.Background()) @@ -305,3 +305,69 @@ func TestReportStatusRecoversFromOfflineToOnline(t *testing.T) { t.Fatalf("recovery payload connectivity = %q, want %q", got, want) } } + +type recordingMQTTSender struct { + calls []mqttHeartbeatCall + err error +} + +type mqttHeartbeatCall struct { + status string + connectivity string +} + +func (r *recordingMQTTSender) SendHeartbeat(status, connectivity string, _ time.Time) error { + r.calls = append(r.calls, mqttHeartbeatCall{status: status, connectivity: connectivity}) + return r.err +} + +func (r *recordingMQTTSender) Close() {} + +func TestEmitHeartbeatCallsMQTTPublisher(t *testing.T) { + mqttSend := &recordingMQTTSender{} + application := newApp(config.Config{ + ScreenID: "mqtt-screen", + HeartbeatEvery: 30, + StatusReportEvery: 60, + }, log.New(&bytes.Buffer{}, "", 0), time.Now, &recordingReporter{}, mqttSend) + + application.emitHeartbeat() + + if got, want := len(mqttSend.calls), 1; got != want { + t.Fatalf("MQTT call count = %d, want %d", got, want) + } + if got, want := mqttSend.calls[0].status, string(StatusStarting); got != want { + t.Fatalf("heartbeat status = %q, want %q", got, want) + } +} + +func TestEmitHeartbeatSkipsWhenNoMQTTPublisher(t *testing.T) { + var logBuffer bytes.Buffer + application := newApp(config.Config{ + ScreenID: "no-mqtt-screen", + HeartbeatEvery: 30, + StatusReportEvery: 60, + }, log.New(&logBuffer, "", 0), time.Now, &recordingReporter{}, nil) + + application.emitHeartbeat() // must not panic + + if strings.Contains(logBuffer.String(), "event=mqtt_heartbeat") { + t.Fatalf("unexpected MQTT heartbeat log entry when no publisher configured: %s", logBuffer.String()) + } +} + +func TestEmitHeartbeatLogsMQTTFailure(t *testing.T) { + var logBuffer bytes.Buffer + mqttSend := &recordingMQTTSender{err: context.DeadlineExceeded} + application := newApp(config.Config{ + ScreenID: "mqtt-fail-screen", + HeartbeatEvery: 30, + StatusReportEvery: 60, + }, log.New(&logBuffer, "", 0), time.Now, &recordingReporter{}, mqttSend) + + application.emitHeartbeat() + + if !strings.Contains(logBuffer.String(), "event=mqtt_heartbeat_failed") { + t.Fatalf("logs missing mqtt_heartbeat_failed: %s", logBuffer.String()) + } +} diff --git a/player/agent/internal/config/config.go b/player/agent/internal/config/config.go index b6382f4..bb45d3e 100644 --- a/player/agent/internal/config/config.go +++ b/player/agent/internal/config/config.go @@ -47,7 +47,7 @@ func defaultConfig() Config { return Config{ ScreenID: "unset-screen", ServerBaseURL: "http://127.0.0.1:8080", - MQTTBroker: "tcp://127.0.0.1:1883", + MQTTBroker: "", HeartbeatEvery: 30, StatusReportEvery: 60, } diff --git a/player/agent/internal/mqttheartbeat/heartbeat.go b/player/agent/internal/mqttheartbeat/heartbeat.go new file mode 100644 index 0000000..65a16f8 --- /dev/null +++ b/player/agent/internal/mqttheartbeat/heartbeat.go @@ -0,0 +1,78 @@ +package mqttheartbeat + +import ( + "encoding/json" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +// payload is the JSON structure published to the heartbeat topic. +type payload struct { + ScreenID string `json:"screen_id"` + Timestamp string `json:"ts"` + Status string `json:"status"` + ServerConnectivity string `json:"server_connectivity"` +} + +// Publisher publishes MQTT heartbeats for a single screen. +// It connects with auto-reconnect enabled, so transient broker outages are +// handled transparently; individual Publish calls fail and should be logged +// by the caller. +type Publisher struct { + client mqtt.Client + screenID string +} + +// New creates a Publisher and initiates a non-blocking connection to broker. +// paho retries the connection in the background if the broker is unreachable +// at startup, so New always succeeds. Publish calls will return errors until +// the connection is established. +func New(broker, screenID string) *Publisher { + opts := mqtt.NewClientOptions(). + AddBroker(broker). + SetClientID("morz-agent-" + screenID). + SetCleanSession(true). + SetAutoReconnect(true). + SetConnectRetry(true). + SetConnectRetryInterval(10 * time.Second) + + client := mqtt.NewClient(opts) + client.Connect() // non-blocking; paho retries in background + return &Publisher{client: client, screenID: screenID} +} + +// Topic returns the MQTT topic for the screen's heartbeat. +func Topic(screenID string) string { + return "signage/screen/" + screenID + "/heartbeat" +} + +// BuildPayload builds the JSON heartbeat payload. Exported for testing. +func BuildPayload(screenID, status, connectivity string, ts time.Time) ([]byte, error) { + return json.Marshal(payload{ + ScreenID: screenID, + Timestamp: ts.UTC().Format(time.RFC3339), + Status: status, + ServerConnectivity: connectivity, + }) +} + +// SendHeartbeat publishes a heartbeat to the screen's topic. +// QoS 0, not retained. Returns an error if the broker is unreachable or +// the publish token fails; callers should log and continue. +func (p *Publisher) SendHeartbeat(status, connectivity string, ts time.Time) error { + data, err := BuildPayload(p.screenID, status, connectivity, ts) + if err != nil { + return err + } + + topic := Topic(p.screenID) + token := p.client.Publish(topic, 0, false, data) + token.WaitTimeout(3 * time.Second) + return token.Error() +} + +// Close disconnects from the broker gracefully. +func (p *Publisher) Close() { + p.client.Disconnect(250) +} diff --git a/player/agent/internal/mqttheartbeat/heartbeat_test.go b/player/agent/internal/mqttheartbeat/heartbeat_test.go new file mode 100644 index 0000000..b53bd2f --- /dev/null +++ b/player/agent/internal/mqttheartbeat/heartbeat_test.go @@ -0,0 +1,60 @@ +package mqttheartbeat + +import ( + "encoding/json" + "strings" + "testing" + "time" +) + +func TestBuildPayloadContainsAllFields(t *testing.T) { + ts := time.Date(2026, 3, 22, 16, 9, 30, 0, time.UTC) + + data, err := BuildPayload("info01-dev", "running", "online", ts) + if err != nil { + t.Fatalf("BuildPayload() error = %v", err) + } + + var p struct { + ScreenID string `json:"screen_id"` + Timestamp string `json:"ts"` + Status string `json:"status"` + ServerConnectivity string `json:"server_connectivity"` + } + if err := json.Unmarshal(data, &p); err != nil { + t.Fatalf("Unmarshal() error = %v", err) + } + + if got, want := p.ScreenID, "info01-dev"; got != want { + t.Errorf("screen_id = %q, want %q", got, want) + } + if got, want := p.Timestamp, "2026-03-22T16:09:30Z"; got != want { + t.Errorf("ts = %q, want %q", got, want) + } + if got, want := p.Status, "running"; got != want { + t.Errorf("status = %q, want %q", got, want) + } + if got, want := p.ServerConnectivity, "online"; got != want { + t.Errorf("server_connectivity = %q, want %q", got, want) + } +} + +func TestBuildPayloadTimestampIsUTC(t *testing.T) { + loc := time.FixedZone("CET", 1*60*60) + ts := time.Date(2026, 3, 22, 17, 9, 30, 0, loc) // same moment, different zone + + data, err := BuildPayload("screen-x", "running", "online", ts) + if err != nil { + t.Fatalf("BuildPayload() error = %v", err) + } + + if !strings.Contains(string(data), "2026-03-22T16:09:30Z") { + t.Errorf("payload %s: expected UTC timestamp 2026-03-22T16:09:30Z", data) + } +} + +func TestTopicFormat(t *testing.T) { + if got, want := Topic("info01-dev"), "signage/screen/info01-dev/heartbeat"; got != want { + t.Fatalf("Topic() = %q, want %q", got, want) + } +}