morz-infoboard/player/agent/internal/app/app_test.go
Jesko Anschütz 6084712800 feat(mqtt): MQTT-Config per Heartbeat-Response vom Server an Agents übertragen
Server gibt bei POST /api/v1/player/status jetzt mqtt-Block zurück (broker,
username, password) wenn MORZ_INFOBOARD_MQTT_BROKER gesetzt ist. Agents
parsen die Response und verbinden sich bei Config-Änderung automatisch neu
(applyMQTTConfig mit Reconnect-Logik, thread-safe via Mutex).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 15:03:15 +01:00

372 lines
11 KiB
Go

package app
import (
"bytes"
"context"
"log"
"strings"
"testing"
"time"
"git.az-it.net/az/morz-infoboard/player/agent/internal/config"
"git.az-it.net/az/morz-infoboard/player/agent/internal/statusreporter"
)
// recordingReporter is a test double for the status reporter. It counts Send
// calls, stores every snapshot it receives, and returns scripted errors.
// It performs no locking of its own.
type recordingReporter struct {
	// callCount is the number of Send invocations so far.
	callCount int

	// err is returned by Send whenever errs is empty.
	err error

	// errs holds per-call results; Send consumes them front to back and an
	// entry may be nil to script a successful call.
	errs []error

	// snapshots holds every snapshot passed to Send, in call order.
	snapshots []statusreporter.Snapshot
}

// Send records the call and its snapshot, then returns an empty MQTTConfig
// together with the next scripted error: the head of errs while that queue is
// non-empty, otherwise the fixed err field. The context is ignored.
func (r *recordingReporter) Send(_ context.Context, snapshot statusreporter.Snapshot) (statusreporter.MQTTConfig, error) {
	r.callCount++
	r.snapshots = append(r.snapshots, snapshot)

	result := r.err
	if len(r.errs) > 0 {
		// Pop the scripted result for this call; it overrides the fixed err.
		result, r.errs = r.errs[0], r.errs[1:]
	}
	return statusreporter.MQTTConfig{}, result
}
func TestAppRunUpdatesHealthAndLogsStructuredEvents(t *testing.T) {
var logBuffer bytes.Buffer
logger := log.New(&logBuffer, "", 0)
application := newApp(config.Config{
ScreenID: "info01-dev",
ServerBaseURL: "http://127.0.0.1:8080",
MQTTBroker: "tcp://127.0.0.1:1883",
HeartbeatEvery: 1,
StatusReportEvery: 1,
}, logger, time.Now, &recordingReporter{}, nil)
if got, want := application.Snapshot().Status, StatusStarting; got != want {
t.Fatalf("initial status = %q, want %q", got, want)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errCh := make(chan error, 1)
go func() {
errCh <- application.Run(ctx)
}()
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
snapshot := application.Snapshot()
if snapshot.Status == StatusRunning && !snapshot.LastHeartbeatAt.IsZero() {
break
}
time.Sleep(10 * time.Millisecond)
}
snapshot := application.Snapshot()
if got, want := snapshot.Status, StatusRunning; got != want {
t.Fatalf("running status = %q, want %q", got, want)
}
if snapshot.LastHeartbeatAt.IsZero() {
t.Fatal("LastHeartbeatAt = zero, want heartbeat timestamp")
}
cancel()
select {
case err := <-errCh:
if err != nil {
t.Fatalf("Run() error = %v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("Run() did not return after cancel")
}
if got, want := application.Snapshot().Status, StatusStopped; got != want {
t.Fatalf("final status = %q, want %q", got, want)
}
logs := logBuffer.String()
for _, needle := range []string{
"event=agent_configured",
"screen_id=info01-dev",
"event=agent_stopped",
} {
if !strings.Contains(logs, needle) {
t.Fatalf("logs missing %q: %s", needle, logs)
}
}
}
func TestAppSnapshotIncludesConfiguredTargets(t *testing.T) {
application := newApp(config.Config{
ScreenID: "screen-77",
ServerBaseURL: "https://backend.example",
MQTTBroker: "tcp://mqtt.example:1883",
HeartbeatEvery: 15,
StatusReportEvery: 60,
}, log.New(&bytes.Buffer{}, "", 0), time.Now, &recordingReporter{}, nil)
snapshot := application.Snapshot()
if got, want := snapshot.ScreenID, "screen-77"; got != want {
t.Fatalf("ScreenID = %q, want %q", got, want)
}
if got, want := snapshot.ServerBaseURL, "https://backend.example"; got != want {
t.Fatalf("ServerBaseURL = %q, want %q", got, want)
}
if got, want := snapshot.MQTTBroker, "tcp://mqtt.example:1883"; got != want {
t.Fatalf("MQTTBroker = %q, want %q", got, want)
}
if got, want := snapshot.HeartbeatEvery, 15; got != want {
t.Fatalf("HeartbeatEvery = %d, want %d", got, want)
}
if got, want := snapshot.ServerConnectivity, ConnectivityUnknown; got != want {
t.Fatalf("ServerConnectivity = %q, want %q", got, want)
}
}
func TestAppRunWithCanceledContextDoesNotLogConfiguredOrHeartbeat(t *testing.T) {
var logBuffer bytes.Buffer
application := newApp(config.Config{
ScreenID: "screen-canceled",
ServerBaseURL: "http://127.0.0.1:8080",
MQTTBroker: "tcp://127.0.0.1:1883",
HeartbeatEvery: 5,
StatusReportEvery: 60,
}, log.New(&logBuffer, "", 0), time.Now, &recordingReporter{}, nil)
ctx, cancel := context.WithCancel(context.Background())
cancel()
if err := application.Run(ctx); err != nil {
t.Fatalf("Run() error = %v", err)
}
if got, want := application.Snapshot().Status, StatusStopped; got != want {
t.Fatalf("final status = %q, want %q", got, want)
}
logs := logBuffer.String()
for _, needle := range []string{"event=agent_configured", "event=heartbeat_tick"} {
if strings.Contains(logs, needle) {
t.Fatalf("logs unexpectedly contain %q: %s", needle, logs)
}
}
}
// TestAppRunReportsStatusWithoutStoppingOnReporterError verifies that a status
// reporter which always fails does not abort Run: Run must still return nil
// after cancel, the failure must be logged as status_report_failed, and server
// connectivity must end up degraded.
func TestAppRunReportsStatusWithoutStoppingOnReporterError(t *testing.T) {
	var logBuffer bytes.Buffer
	reporter := &recordingReporter{err: context.DeadlineExceeded}
	application := newApp(config.Config{
		ScreenID:          "screen-reporter",
		ServerBaseURL:     "http://127.0.0.1:8080",
		MQTTBroker:        "tcp://127.0.0.1:1883",
		HeartbeatEvery:    1,
		StatusReportEvery: 1,
	}, log.New(&logBuffer, "", 0), time.Now, reporter, nil)

	ctx, cancel := context.WithCancel(context.Background())
	// Guarantees Run is told to stop on every exit path, including t.Fatal.
	defer cancel()

	errCh := make(chan error, 1)
	go func() {
		errCh <- application.Run(ctx)
	}()

	// Wait for the first failed report by polling the application snapshot
	// instead of reporter.callCount. The reporter has no locking and Send
	// mutates it on the Run goroutine, so reading its fields from here while
	// Run is active is a data race (reported by `go test -race`).
	deadline := time.Now().Add(2500 * time.Millisecond)
	for time.Now().Before(deadline) {
		if application.Snapshot().ServerConnectivity == ConnectivityDegraded {
			break
		}
		time.Sleep(10 * time.Millisecond)
	}

	cancel()
	select {
	case err := <-errCh:
		if err != nil {
			t.Fatalf("Run() error = %v", err)
		}
	case <-time.After(2 * time.Second):
		t.Fatal("Run() did not return after cancel")
	}

	// Run has returned and the receive on errCh orders its writes before the
	// reads below, so the reporter and the log buffer can be inspected safely.
	// NOTE(review): assumes Run leaves no reporting goroutine behind — confirm.
	if reporter.callCount == 0 {
		t.Fatal("reporter was not called")
	}

	logs := logBuffer.String()
	if !strings.Contains(logs, "event=status_report_failed") {
		t.Fatalf("logs missing status_report_failed event: %s", logs)
	}
	if got, want := application.Snapshot().ServerConnectivity, ConnectivityDegraded; got != want {
		t.Fatalf("ServerConnectivity = %q, want %q", got, want)
	}
}
// TestAppRunMarksServerConnectivityOnlineAfterSuccessfulReport verifies that
// a successful status report moves server connectivity to online while Run is
// active, and that the first snapshot handed to the reporter already carries
// the online connectivity value.
func TestAppRunMarksServerConnectivityOnlineAfterSuccessfulReport(t *testing.T) {
	reporter := &recordingReporter{}
	application := newApp(config.Config{
		ScreenID:          "screen-online",
		ServerBaseURL:     "http://127.0.0.1:8080",
		MQTTBroker:        "tcp://127.0.0.1:1883",
		HeartbeatEvery:    1,
		StatusReportEvery: 1,
	}, log.New(&bytes.Buffer{}, "", 0), time.Now, reporter, nil)

	ctx, cancel := context.WithCancel(context.Background())
	// Guarantees Run is told to stop on every exit path, including t.Fatal.
	defer cancel()

	errCh := make(chan error, 1)
	go func() {
		errCh <- application.Run(ctx)
	}()

	deadline := time.Now().Add(2 * time.Second)
	for time.Now().Before(deadline) {
		if application.Snapshot().ServerConnectivity == ConnectivityOnline {
			break
		}
		time.Sleep(10 * time.Millisecond)
	}

	// Capture the connectivity seen while Run is still active; it is asserted
	// after shutdown so that a failure cannot skip stopping Run.
	observed := application.Snapshot().ServerConnectivity

	// Stop Run with a bounded wait and check its result, matching the other
	// Run-based tests in this file.
	cancel()
	select {
	case err := <-errCh:
		if err != nil {
			t.Fatalf("Run() error = %v", err)
		}
	case <-time.After(2 * time.Second):
		t.Fatal("Run() did not return after cancel")
	}

	if got, want := observed, ConnectivityOnline; got != want {
		t.Fatalf("ServerConnectivity = %q, want %q", got, want)
	}

	// The reporter has no locking and is mutated on the Run goroutine, so its
	// fields are only read here, after the receive on errCh has ordered Run's
	// writes before these reads.
	// NOTE(review): assumes Run leaves no reporting goroutine behind — confirm.
	if reporter.callCount == 0 {
		t.Fatal("reporter was not called")
	}
	if got, want := reporter.snapshots[0].ServerConnectivity, string(ConnectivityOnline); got != want {
		t.Fatalf("first reported connectivity = %q, want %q", got, want)
	}
}
func TestReportStatusMarksServerConnectivityOfflineAfterRepeatedFailures(t *testing.T) {
reporter := &recordingReporter{err: context.DeadlineExceeded}
application := newApp(config.Config{
ScreenID: "screen-offline",
ServerBaseURL: "http://127.0.0.1:8080",
MQTTBroker: "tcp://127.0.0.1:1883",
HeartbeatEvery: 30,
StatusReportEvery: 30,
}, log.New(&bytes.Buffer{}, "", 0), time.Now, reporter, nil)
application.reportStatus(context.Background())
if got, want := application.Snapshot().ServerConnectivity, ConnectivityDegraded; got != want {
t.Fatalf("after first failure ServerConnectivity = %q, want %q", got, want)
}
application.reportStatus(context.Background())
application.reportStatus(context.Background())
if got, want := application.Snapshot().ServerConnectivity, ConnectivityOffline; got != want {
t.Fatalf("after repeated failures ServerConnectivity = %q, want %q", got, want)
}
}
// TestReportStatusRecoversFromOfflineToOnline scripts three failed reports
// followed by a successful one and asserts that connectivity goes offline and
// then back to online, and that the last snapshot sent to the reporter carries
// the online connectivity value.
func TestReportStatusRecoversFromOfflineToOnline(t *testing.T) {
	timeout := context.DeadlineExceeded
	scripted := &recordingReporter{errs: []error{timeout, timeout, timeout, nil}}
	cfg := config.Config{
		ScreenID:          "screen-recover",
		ServerBaseURL:     "http://127.0.0.1:8080",
		MQTTBroker:        "tcp://127.0.0.1:1883",
		HeartbeatEvery:    30,
		StatusReportEvery: 30,
	}
	agent := newApp(cfg, log.New(&bytes.Buffer{}, "", 0), time.Now, scripted, nil)
	ctx := context.Background()

	// Consume the three scripted failures.
	for attempt := 0; attempt < 3; attempt++ {
		agent.reportStatus(ctx)
	}
	if state := agent.Snapshot().ServerConnectivity; state != ConnectivityOffline {
		t.Fatalf("offline state = %q, want %q", state, ConnectivityOffline)
	}

	// The fourth scripted result is nil, i.e. a successful report.
	agent.reportStatus(ctx)
	if state := agent.Snapshot().ServerConnectivity; state != ConnectivityOnline {
		t.Fatalf("recovered state = %q, want %q", state, ConnectivityOnline)
	}

	last := scripted.snapshots[len(scripted.snapshots)-1]
	if last.ServerConnectivity != string(ConnectivityOnline) {
		t.Fatalf("recovery payload connectivity = %q, want %q", last.ServerConnectivity, string(ConnectivityOnline))
	}
}
// mqttHeartbeatCall captures the string arguments of one SendHeartbeat call.
type mqttHeartbeatCall struct {
	status, connectivity string
}

// recordingMQTTSender is a test double for the MQTT heartbeat sender. It
// remembers every heartbeat it was asked to send and returns a fixed error.
type recordingMQTTSender struct {
	// calls holds one entry per SendHeartbeat invocation, in call order.
	calls []mqttHeartbeatCall

	// err is returned by every SendHeartbeat call; nil scripts success.
	err error
}

// SendHeartbeat records the status and connectivity arguments and returns the
// configured error. The timestamp argument is ignored.
func (r *recordingMQTTSender) SendHeartbeat(status, connectivity string, _ time.Time) error {
	call := mqttHeartbeatCall{status: status, connectivity: connectivity}
	r.calls = append(r.calls, call)
	return r.err
}

// Close is a no-op.
func (r *recordingMQTTSender) Close() {}
// TestEmitHeartbeatCallsMQTTPublisher asserts that a single emitHeartbeat call
// on a freshly constructed app sends exactly one MQTT heartbeat and that the
// heartbeat carries the starting status.
func TestEmitHeartbeatCallsMQTTPublisher(t *testing.T) {
	sender := &recordingMQTTSender{}
	cfg := config.Config{
		ScreenID:          "mqtt-screen",
		HeartbeatEvery:    30,
		StatusReportEvery: 60,
	}
	agent := newApp(cfg, log.New(&bytes.Buffer{}, "", 0), time.Now, &recordingReporter{}, sender)

	agent.emitHeartbeat()

	if n := len(sender.calls); n != 1 {
		t.Fatalf("MQTT call count = %d, want %d", n, 1)
	}
	// Run was never started, so the app is still in its initial status.
	if status := sender.calls[0].status; status != string(StatusStarting) {
		t.Fatalf("heartbeat status = %q, want %q", status, string(StatusStarting))
	}
}
func TestEmitHeartbeatSkipsWhenNoMQTTPublisher(t *testing.T) {
var logBuffer bytes.Buffer
application := newApp(config.Config{
ScreenID: "no-mqtt-screen",
HeartbeatEvery: 30,
StatusReportEvery: 60,
}, log.New(&logBuffer, "", 0), time.Now, &recordingReporter{}, nil)
application.emitHeartbeat() // must not panic
if strings.Contains(logBuffer.String(), "event=mqtt_heartbeat") {
t.Fatalf("unexpected MQTT heartbeat log entry when no publisher configured: %s", logBuffer.String())
}
}
// TestEmitHeartbeatLogsMQTTFailure asserts that when the MQTT sender returns
// an error, emitHeartbeat logs an mqtt_heartbeat_failed event.
func TestEmitHeartbeatLogsMQTTFailure(t *testing.T) {
	var out bytes.Buffer
	failing := &recordingMQTTSender{err: context.DeadlineExceeded}
	cfg := config.Config{
		ScreenID:          "mqtt-fail-screen",
		HeartbeatEvery:    30,
		StatusReportEvery: 60,
	}
	agent := newApp(cfg, log.New(&out, "", 0), time.Now, &recordingReporter{}, failing)

	agent.emitHeartbeat()

	written := out.String()
	if strings.Contains(written, "event=mqtt_heartbeat_failed") {
		return
	}
	t.Fatalf("logs missing mqtt_heartbeat_failed: %s", written)
}