- neues Paket mqttheartbeat: Publisher mit paho, topic signage/screen/<id>/heartbeat,
payload {screen_id, ts, status, server_connectivity}, auto-reconnect bei Ausfall
- MORZ_INFOBOARD_MQTT_BROKER leer (Standard) -> MQTT komplett uebersprungen
- app.emitHeartbeat() publiziert bei jedem Tick per MQTT wenn Broker konfiguriert,
loggt Fehler und laeuft weiter (kein Stop bei MQTT-Ausfall)
- mqtt.Close() bei context.Done()
- MQTTBroker-Default von tcp://127.0.0.1:1883 auf "" geaendert
- erste externe Dep: github.com/eclipse/paho.mqtt.golang v1.5.1
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
373 lines
11 KiB
Go
373 lines
11 KiB
Go
package app
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"log"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"git.az-it.net/az/morz-infoboard/player/agent/internal/config"
|
|
"git.az-it.net/az/morz-infoboard/player/agent/internal/statusreporter"
|
|
)
|
|
|
|
type recordingReporter struct {
|
|
callCount int
|
|
err error
|
|
errs []error
|
|
snapshots []statusreporter.Snapshot
|
|
}
|
|
|
|
func (r *recordingReporter) Send(_ context.Context, snapshot statusreporter.Snapshot) error {
|
|
r.callCount++
|
|
r.snapshots = append(r.snapshots, snapshot)
|
|
if len(r.errs) > 0 {
|
|
err := r.errs[0]
|
|
r.errs = r.errs[1:]
|
|
return err
|
|
}
|
|
return r.err
|
|
}
|
|
|
|
func TestAppRunUpdatesHealthAndLogsStructuredEvents(t *testing.T) {
|
|
var logBuffer bytes.Buffer
|
|
logger := log.New(&logBuffer, "", 0)
|
|
|
|
application := newApp(config.Config{
|
|
ScreenID: "info01-dev",
|
|
ServerBaseURL: "http://127.0.0.1:8080",
|
|
MQTTBroker: "tcp://127.0.0.1:1883",
|
|
HeartbeatEvery: 1,
|
|
StatusReportEvery: 1,
|
|
}, logger, time.Now, &recordingReporter{}, nil)
|
|
|
|
if got, want := application.Snapshot().Status, StatusStarting; got != want {
|
|
t.Fatalf("initial status = %q, want %q", got, want)
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
errCh := make(chan error, 1)
|
|
go func() {
|
|
errCh <- application.Run(ctx)
|
|
}()
|
|
|
|
deadline := time.Now().Add(2 * time.Second)
|
|
for time.Now().Before(deadline) {
|
|
snapshot := application.Snapshot()
|
|
if snapshot.Status == StatusRunning && !snapshot.LastHeartbeatAt.IsZero() {
|
|
break
|
|
}
|
|
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
|
|
snapshot := application.Snapshot()
|
|
if got, want := snapshot.Status, StatusRunning; got != want {
|
|
t.Fatalf("running status = %q, want %q", got, want)
|
|
}
|
|
|
|
if snapshot.LastHeartbeatAt.IsZero() {
|
|
t.Fatal("LastHeartbeatAt = zero, want heartbeat timestamp")
|
|
}
|
|
|
|
cancel()
|
|
|
|
select {
|
|
case err := <-errCh:
|
|
if err != nil {
|
|
t.Fatalf("Run() error = %v", err)
|
|
}
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("Run() did not return after cancel")
|
|
}
|
|
|
|
if got, want := application.Snapshot().Status, StatusStopped; got != want {
|
|
t.Fatalf("final status = %q, want %q", got, want)
|
|
}
|
|
|
|
logs := logBuffer.String()
|
|
for _, needle := range []string{
|
|
"event=agent_configured",
|
|
"screen_id=info01-dev",
|
|
"event=heartbeat_tick",
|
|
"event=agent_stopped",
|
|
} {
|
|
if !strings.Contains(logs, needle) {
|
|
t.Fatalf("logs missing %q: %s", needle, logs)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestAppSnapshotIncludesConfiguredTargets(t *testing.T) {
|
|
application := newApp(config.Config{
|
|
ScreenID: "screen-77",
|
|
ServerBaseURL: "https://backend.example",
|
|
MQTTBroker: "tcp://mqtt.example:1883",
|
|
HeartbeatEvery: 15,
|
|
StatusReportEvery: 60,
|
|
}, log.New(&bytes.Buffer{}, "", 0), time.Now, &recordingReporter{}, nil)
|
|
|
|
snapshot := application.Snapshot()
|
|
|
|
if got, want := snapshot.ScreenID, "screen-77"; got != want {
|
|
t.Fatalf("ScreenID = %q, want %q", got, want)
|
|
}
|
|
|
|
if got, want := snapshot.ServerBaseURL, "https://backend.example"; got != want {
|
|
t.Fatalf("ServerBaseURL = %q, want %q", got, want)
|
|
}
|
|
|
|
if got, want := snapshot.MQTTBroker, "tcp://mqtt.example:1883"; got != want {
|
|
t.Fatalf("MQTTBroker = %q, want %q", got, want)
|
|
}
|
|
|
|
if got, want := snapshot.HeartbeatEvery, 15; got != want {
|
|
t.Fatalf("HeartbeatEvery = %d, want %d", got, want)
|
|
}
|
|
|
|
if got, want := snapshot.ServerConnectivity, ConnectivityUnknown; got != want {
|
|
t.Fatalf("ServerConnectivity = %q, want %q", got, want)
|
|
}
|
|
}
|
|
|
|
func TestAppRunWithCanceledContextDoesNotLogConfiguredOrHeartbeat(t *testing.T) {
|
|
var logBuffer bytes.Buffer
|
|
application := newApp(config.Config{
|
|
ScreenID: "screen-canceled",
|
|
ServerBaseURL: "http://127.0.0.1:8080",
|
|
MQTTBroker: "tcp://127.0.0.1:1883",
|
|
HeartbeatEvery: 5,
|
|
StatusReportEvery: 60,
|
|
}, log.New(&logBuffer, "", 0), time.Now, &recordingReporter{}, nil)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cancel()
|
|
|
|
if err := application.Run(ctx); err != nil {
|
|
t.Fatalf("Run() error = %v", err)
|
|
}
|
|
|
|
if got, want := application.Snapshot().Status, StatusStopped; got != want {
|
|
t.Fatalf("final status = %q, want %q", got, want)
|
|
}
|
|
|
|
logs := logBuffer.String()
|
|
for _, needle := range []string{"event=agent_configured", "event=heartbeat_tick"} {
|
|
if strings.Contains(logs, needle) {
|
|
t.Fatalf("logs unexpectedly contain %q: %s", needle, logs)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestAppRunReportsStatusWithoutStoppingOnReporterError(t *testing.T) {
|
|
var logBuffer bytes.Buffer
|
|
reporter := &recordingReporter{err: context.DeadlineExceeded}
|
|
application := newApp(config.Config{
|
|
ScreenID: "screen-reporter",
|
|
ServerBaseURL: "http://127.0.0.1:8080",
|
|
MQTTBroker: "tcp://127.0.0.1:1883",
|
|
HeartbeatEvery: 1,
|
|
StatusReportEvery: 1,
|
|
}, log.New(&logBuffer, "", 0), time.Now, reporter, nil)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
errCh := make(chan error, 1)
|
|
go func() {
|
|
errCh <- application.Run(ctx)
|
|
}()
|
|
|
|
deadline := time.Now().Add(2500 * time.Millisecond)
|
|
for time.Now().Before(deadline) {
|
|
if reporter.callCount > 0 {
|
|
break
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
|
|
if reporter.callCount == 0 {
|
|
cancel()
|
|
t.Fatal("reporter was not called")
|
|
}
|
|
|
|
cancel()
|
|
|
|
select {
|
|
case err := <-errCh:
|
|
if err != nil {
|
|
t.Fatalf("Run() error = %v", err)
|
|
}
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("Run() did not return after cancel")
|
|
}
|
|
|
|
logs := logBuffer.String()
|
|
if !strings.Contains(logs, "event=status_report_failed") {
|
|
t.Fatalf("logs missing status_report_failed event: %s", logs)
|
|
}
|
|
|
|
if got, want := application.Snapshot().ServerConnectivity, ConnectivityDegraded; got != want {
|
|
t.Fatalf("ServerConnectivity = %q, want %q", got, want)
|
|
}
|
|
}
|
|
|
|
func TestAppRunMarksServerConnectivityOnlineAfterSuccessfulReport(t *testing.T) {
|
|
reporter := &recordingReporter{}
|
|
application := newApp(config.Config{
|
|
ScreenID: "screen-online",
|
|
ServerBaseURL: "http://127.0.0.1:8080",
|
|
MQTTBroker: "tcp://127.0.0.1:1883",
|
|
HeartbeatEvery: 1,
|
|
StatusReportEvery: 1,
|
|
}, log.New(&bytes.Buffer{}, "", 0), time.Now, reporter, nil)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
errCh := make(chan error, 1)
|
|
go func() {
|
|
errCh <- application.Run(ctx)
|
|
}()
|
|
|
|
deadline := time.Now().Add(2 * time.Second)
|
|
for time.Now().Before(deadline) {
|
|
if application.Snapshot().ServerConnectivity == ConnectivityOnline {
|
|
break
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
|
|
if got, want := application.Snapshot().ServerConnectivity, ConnectivityOnline; got != want {
|
|
cancel()
|
|
t.Fatalf("ServerConnectivity = %q, want %q", got, want)
|
|
}
|
|
|
|
if reporter.callCount == 0 {
|
|
cancel()
|
|
t.Fatal("reporter was not called")
|
|
}
|
|
|
|
if got, want := reporter.snapshots[0].ServerConnectivity, string(ConnectivityOnline); got != want {
|
|
cancel()
|
|
t.Fatalf("first reported connectivity = %q, want %q", got, want)
|
|
}
|
|
|
|
cancel()
|
|
<-errCh
|
|
}
|
|
|
|
func TestReportStatusMarksServerConnectivityOfflineAfterRepeatedFailures(t *testing.T) {
|
|
reporter := &recordingReporter{err: context.DeadlineExceeded}
|
|
application := newApp(config.Config{
|
|
ScreenID: "screen-offline",
|
|
ServerBaseURL: "http://127.0.0.1:8080",
|
|
MQTTBroker: "tcp://127.0.0.1:1883",
|
|
HeartbeatEvery: 30,
|
|
StatusReportEvery: 30,
|
|
}, log.New(&bytes.Buffer{}, "", 0), time.Now, reporter, nil)
|
|
|
|
application.reportStatus(context.Background())
|
|
if got, want := application.Snapshot().ServerConnectivity, ConnectivityDegraded; got != want {
|
|
t.Fatalf("after first failure ServerConnectivity = %q, want %q", got, want)
|
|
}
|
|
|
|
application.reportStatus(context.Background())
|
|
application.reportStatus(context.Background())
|
|
|
|
if got, want := application.Snapshot().ServerConnectivity, ConnectivityOffline; got != want {
|
|
t.Fatalf("after repeated failures ServerConnectivity = %q, want %q", got, want)
|
|
}
|
|
}
|
|
|
|
func TestReportStatusRecoversFromOfflineToOnline(t *testing.T) {
|
|
reporter := &recordingReporter{errs: []error{context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, nil}}
|
|
application := newApp(config.Config{
|
|
ScreenID: "screen-recover",
|
|
ServerBaseURL: "http://127.0.0.1:8080",
|
|
MQTTBroker: "tcp://127.0.0.1:1883",
|
|
HeartbeatEvery: 30,
|
|
StatusReportEvery: 30,
|
|
}, log.New(&bytes.Buffer{}, "", 0), time.Now, reporter, nil)
|
|
|
|
application.reportStatus(context.Background())
|
|
application.reportStatus(context.Background())
|
|
application.reportStatus(context.Background())
|
|
if got, want := application.Snapshot().ServerConnectivity, ConnectivityOffline; got != want {
|
|
t.Fatalf("offline state = %q, want %q", got, want)
|
|
}
|
|
|
|
application.reportStatus(context.Background())
|
|
if got, want := application.Snapshot().ServerConnectivity, ConnectivityOnline; got != want {
|
|
t.Fatalf("recovered state = %q, want %q", got, want)
|
|
}
|
|
|
|
if got, want := reporter.snapshots[len(reporter.snapshots)-1].ServerConnectivity, string(ConnectivityOnline); got != want {
|
|
t.Fatalf("recovery payload connectivity = %q, want %q", got, want)
|
|
}
|
|
}
|
|
|
|
type recordingMQTTSender struct {
|
|
calls []mqttHeartbeatCall
|
|
err error
|
|
}
|
|
|
|
type mqttHeartbeatCall struct {
|
|
status string
|
|
connectivity string
|
|
}
|
|
|
|
func (r *recordingMQTTSender) SendHeartbeat(status, connectivity string, _ time.Time) error {
|
|
r.calls = append(r.calls, mqttHeartbeatCall{status: status, connectivity: connectivity})
|
|
return r.err
|
|
}
|
|
|
|
func (r *recordingMQTTSender) Close() {}
|
|
|
|
func TestEmitHeartbeatCallsMQTTPublisher(t *testing.T) {
|
|
mqttSend := &recordingMQTTSender{}
|
|
application := newApp(config.Config{
|
|
ScreenID: "mqtt-screen",
|
|
HeartbeatEvery: 30,
|
|
StatusReportEvery: 60,
|
|
}, log.New(&bytes.Buffer{}, "", 0), time.Now, &recordingReporter{}, mqttSend)
|
|
|
|
application.emitHeartbeat()
|
|
|
|
if got, want := len(mqttSend.calls), 1; got != want {
|
|
t.Fatalf("MQTT call count = %d, want %d", got, want)
|
|
}
|
|
if got, want := mqttSend.calls[0].status, string(StatusStarting); got != want {
|
|
t.Fatalf("heartbeat status = %q, want %q", got, want)
|
|
}
|
|
}
|
|
|
|
func TestEmitHeartbeatSkipsWhenNoMQTTPublisher(t *testing.T) {
|
|
var logBuffer bytes.Buffer
|
|
application := newApp(config.Config{
|
|
ScreenID: "no-mqtt-screen",
|
|
HeartbeatEvery: 30,
|
|
StatusReportEvery: 60,
|
|
}, log.New(&logBuffer, "", 0), time.Now, &recordingReporter{}, nil)
|
|
|
|
application.emitHeartbeat() // must not panic
|
|
|
|
if strings.Contains(logBuffer.String(), "event=mqtt_heartbeat") {
|
|
t.Fatalf("unexpected MQTT heartbeat log entry when no publisher configured: %s", logBuffer.String())
|
|
}
|
|
}
|
|
|
|
func TestEmitHeartbeatLogsMQTTFailure(t *testing.T) {
|
|
var logBuffer bytes.Buffer
|
|
mqttSend := &recordingMQTTSender{err: context.DeadlineExceeded}
|
|
application := newApp(config.Config{
|
|
ScreenID: "mqtt-fail-screen",
|
|
HeartbeatEvery: 30,
|
|
StatusReportEvery: 60,
|
|
}, log.New(&logBuffer, "", 0), time.Now, &recordingReporter{}, mqttSend)
|
|
|
|
application.emitHeartbeat()
|
|
|
|
if !strings.Contains(logBuffer.String(), "event=mqtt_heartbeat_failed") {
|
|
t.Fatalf("logs missing mqtt_heartbeat_failed: %s", logBuffer.String())
|
|
}
|
|
}
|