Compare commits
4 commits
a99f8a5784
...
d461abc3f5
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d461abc3f5 | ||
|
|
b111cf8421 | ||
|
|
1dbebc0a2b | ||
|
|
d0137179e5 |
17 changed files with 426 additions and 19 deletions
3
.gitignore
vendored
3
.gitignore
vendored
|
|
@ -16,3 +16,6 @@ dist/
|
|||
|
||||
# Compose override files
|
||||
compose.override.yml
|
||||
vault.yml
|
||||
ansible/roles/signage_player/files/morz-agent
|
||||
player/agent/agent-linux-arm64
|
||||
|
|
|
|||
9
ansible/group_vars/signage_players/vars.yml
Normal file
9
ansible/group_vars/signage_players/vars.yml
Normal file
|
|
@ -0,0 +1,9 @@
|
|||
---
|
||||
morz_server_base_url: "http://10.0.0.70:8080"
|
||||
morz_mqtt_broker: "tcp://dockerbox.morz.de:1883"
|
||||
morz_heartbeat_every_seconds: 30
|
||||
morz_status_report_every_seconds: 60
|
||||
|
||||
# Credentials kommen aus vault.yml (ansible-vault)
|
||||
morz_mqtt_username: "{{ vault_mqtt_username }}"
|
||||
morz_mqtt_password: "{{ vault_mqtt_password }}"
|
||||
4
ansible/host_vars/info10/vars.yml
Normal file
4
ansible/host_vars/info10/vars.yml
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
---
|
||||
ansible_host: 10.0.0.200
|
||||
ansible_user: morz
|
||||
screen_id: info01-dev
|
||||
6
ansible/inventory.yml
Normal file
6
ansible/inventory.yml
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
---
|
||||
all:
|
||||
children:
|
||||
signage_players:
|
||||
hosts:
|
||||
info10:
|
||||
11
ansible/roles/signage_player/defaults/main.yml
Normal file
11
ansible/roles/signage_player/defaults/main.yml
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
---
|
||||
signage_user: morz
|
||||
signage_config_dir: /etc/signage
|
||||
signage_binary_dest: /usr/local/bin/morz-agent
|
||||
|
||||
morz_server_base_url: "http://10.0.0.70:8080"
|
||||
morz_mqtt_broker: ""
|
||||
morz_mqtt_username: ""
|
||||
morz_mqtt_password: ""
|
||||
morz_heartbeat_every_seconds: 30
|
||||
morz_status_report_every_seconds: 60
|
||||
17
ansible/roles/signage_player/handlers/main.yml
Normal file
17
ansible/roles/signage_player/handlers/main.yml
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
---
|
||||
- name: Reload systemd
|
||||
ansible.builtin.systemd:
|
||||
daemon_reload: true
|
||||
become: true
|
||||
|
||||
- name: Restart journald
|
||||
ansible.builtin.systemd:
|
||||
name: systemd-journald
|
||||
state: restarted
|
||||
become: true
|
||||
|
||||
- name: Restart morz-agent
|
||||
ansible.builtin.systemd:
|
||||
name: morz-agent
|
||||
state: restarted
|
||||
become: true
|
||||
81
ansible/roles/signage_player/tasks/main.yml
Normal file
81
ansible/roles/signage_player/tasks/main.yml
Normal file
|
|
@ -0,0 +1,81 @@
|
|||
---
|
||||
- name: Build agent binary for linux/arm64
|
||||
ansible.builtin.command:
|
||||
cmd: go build -o {{ role_path }}/files/morz-agent ./cmd/agent
|
||||
chdir: "{{ playbook_dir }}/../player/agent"
|
||||
environment:
|
||||
GOOS: linux
|
||||
GOARCH: arm64
|
||||
delegate_to: localhost
|
||||
changed_when: true
|
||||
|
||||
- name: Ensure config directory exists
|
||||
ansible.builtin.file:
|
||||
path: "{{ signage_config_dir }}"
|
||||
state: directory
|
||||
owner: root
|
||||
group: root
|
||||
mode: "0755"
|
||||
become: true
|
||||
|
||||
- name: Deploy agent config
|
||||
ansible.builtin.template:
|
||||
src: config.json.j2
|
||||
dest: "{{ signage_config_dir }}/config.json"
|
||||
owner: root
|
||||
group: "{{ signage_user }}"
|
||||
mode: "0640"
|
||||
become: true
|
||||
notify: Restart morz-agent
|
||||
|
||||
- name: Deploy agent binary
|
||||
ansible.builtin.copy:
|
||||
src: morz-agent
|
||||
dest: "{{ signage_binary_dest }}"
|
||||
owner: root
|
||||
group: root
|
||||
mode: "0755"
|
||||
become: true
|
||||
notify: Restart morz-agent
|
||||
|
||||
- name: Deploy systemd unit
|
||||
ansible.builtin.template:
|
||||
src: morz-agent.service.j2
|
||||
dest: /etc/systemd/system/morz-agent.service
|
||||
owner: root
|
||||
group: root
|
||||
mode: "0644"
|
||||
become: true
|
||||
notify:
|
||||
- Reload systemd
|
||||
- Restart morz-agent
|
||||
|
||||
- name: Ensure journald drop-in directory exists
|
||||
ansible.builtin.file:
|
||||
path: /etc/systemd/journald.conf.d
|
||||
state: directory
|
||||
owner: root
|
||||
group: root
|
||||
mode: "0755"
|
||||
become: true
|
||||
|
||||
- name: Configure journald volatile storage (RAM only, schont SD-Karte)
|
||||
ansible.builtin.copy:
|
||||
dest: /etc/systemd/journald.conf.d/morz-volatile.conf
|
||||
content: |
|
||||
[Journal]
|
||||
Storage=volatile
|
||||
RuntimeMaxUse=20M
|
||||
owner: root
|
||||
group: root
|
||||
mode: "0644"
|
||||
become: true
|
||||
notify: Restart journald
|
||||
|
||||
- name: Enable and start morz-agent
|
||||
ansible.builtin.systemd:
|
||||
name: morz-agent
|
||||
enabled: true
|
||||
state: started
|
||||
daemon_reload: false
|
||||
become: true
|
||||
9
ansible/roles/signage_player/templates/config.json.j2
Normal file
9
ansible/roles/signage_player/templates/config.json.j2
Normal file
|
|
@ -0,0 +1,9 @@
|
|||
{
|
||||
"screen_id": "{{ screen_id }}",
|
||||
"server_base_url": "{{ morz_server_base_url }}",
|
||||
"mqtt_broker": "{{ morz_mqtt_broker }}",
|
||||
"mqtt_username": "{{ morz_mqtt_username }}",
|
||||
"mqtt_password": "{{ morz_mqtt_password }}",
|
||||
"heartbeat_every_seconds": {{ morz_heartbeat_every_seconds }},
|
||||
"status_report_every_seconds": {{ morz_status_report_every_seconds }}
|
||||
}
|
||||
16
ansible/roles/signage_player/templates/morz-agent.service.j2
Normal file
16
ansible/roles/signage_player/templates/morz-agent.service.j2
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
[Unit]
|
||||
Description=Morz Infoboard Player Agent
|
||||
After=network-online.target
|
||||
Wants=network-online.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
User={{ signage_user }}
|
||||
ExecStart={{ signage_binary_dest }}
|
||||
Restart=on-failure
|
||||
RestartSec=10
|
||||
StandardOutput=journal
|
||||
StandardError=journal
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
|
|
@ -1,7 +1,6 @@
|
|||
- name: Placeholder deployment entrypoint
|
||||
hosts: all
|
||||
---
|
||||
- name: Deploy Morz Infoboard Player Agent
|
||||
hosts: signage_players
|
||||
gather_facts: false
|
||||
tasks:
|
||||
- name: Show target host placeholder
|
||||
ansible.builtin.debug:
|
||||
msg: "Placeholder fuer spaeteres signage deployment auf {{ inventory_hostname }}"
|
||||
roles:
|
||||
- signage_player
|
||||
|
|
|
|||
|
|
@ -1,3 +1,10 @@
|
|||
module git.az-it.net/az/morz-infoboard/player/agent
|
||||
|
||||
go 1.24.0
|
||||
|
||||
require (
|
||||
github.com/eclipse/paho.mqtt.golang v1.5.1 // indirect
|
||||
github.com/gorilla/websocket v1.5.3 // indirect
|
||||
golang.org/x/net v0.44.0 // indirect
|
||||
golang.org/x/sync v0.17.0 // indirect
|
||||
)
|
||||
|
|
|
|||
8
player/agent/go.sum
Normal file
8
player/agent/go.sum
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE=
|
||||
github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU=
|
||||
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I=
|
||||
golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
|
||||
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
|
||||
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
|
||||
|
|
@ -9,6 +9,7 @@ import (
|
|||
"time"
|
||||
|
||||
"git.az-it.net/az/morz-infoboard/player/agent/internal/config"
|
||||
"git.az-it.net/az/morz-infoboard/player/agent/internal/mqttheartbeat"
|
||||
"git.az-it.net/az/morz-infoboard/player/agent/internal/statusreporter"
|
||||
)
|
||||
|
||||
|
|
@ -45,6 +46,7 @@ type App struct {
|
|||
logger *log.Logger
|
||||
now func() time.Time
|
||||
reporter statusSender
|
||||
mqttPub mqttSender
|
||||
|
||||
mu sync.RWMutex
|
||||
status Status
|
||||
|
|
@ -58,6 +60,11 @@ type statusSender interface {
|
|||
Send(ctx context.Context, snapshot statusreporter.Snapshot) error
|
||||
}
|
||||
|
||||
type mqttSender interface {
|
||||
SendHeartbeat(status, connectivity string, ts time.Time) error
|
||||
Close()
|
||||
}
|
||||
|
||||
func New() (*App, error) {
|
||||
cfg, err := config.Load()
|
||||
if err != nil {
|
||||
|
|
@ -66,10 +73,18 @@ func New() (*App, error) {
|
|||
|
||||
logger := log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC)
|
||||
|
||||
return newApp(cfg, logger, time.Now, statusreporter.New(cfg.ServerBaseURL, nil, time.Now)), nil
|
||||
var mqttPub mqttSender
|
||||
if cfg.MQTTBroker != "" {
|
||||
mqttPub = mqttheartbeat.New(cfg.MQTTBroker, cfg.ScreenID, cfg.MQTTUsername, cfg.MQTTPassword)
|
||||
logger.Printf("event=mqtt_enabled broker=%s", cfg.MQTTBroker)
|
||||
} else {
|
||||
logger.Printf("event=mqtt_disabled reason=no_broker_configured")
|
||||
}
|
||||
|
||||
return newApp(cfg, logger, time.Now, statusreporter.New(cfg.ServerBaseURL, nil, time.Now), mqttPub), nil
|
||||
}
|
||||
|
||||
func newApp(cfg config.Config, logger *log.Logger, now func() time.Time, reporter statusSender) *App {
|
||||
func newApp(cfg config.Config, logger *log.Logger, now func() time.Time, reporter statusSender, mqttPub mqttSender) *App {
|
||||
if logger == nil {
|
||||
logger = log.New(os.Stdout, "agent ", log.LstdFlags|log.LUTC)
|
||||
}
|
||||
|
|
@ -83,6 +98,7 @@ func newApp(cfg config.Config, logger *log.Logger, now func() time.Time, reporte
|
|||
logger: logger,
|
||||
now: now,
|
||||
reporter: reporter,
|
||||
mqttPub: mqttPub,
|
||||
status: StatusStarting,
|
||||
serverConnectivity: ConnectivityUnknown,
|
||||
}
|
||||
|
|
@ -147,6 +163,9 @@ func (a *App) Run(ctx context.Context) error {
|
|||
a.mu.Lock()
|
||||
a.status = StatusStopped
|
||||
a.mu.Unlock()
|
||||
if a.mqttPub != nil {
|
||||
a.mqttPub.Close()
|
||||
}
|
||||
a.logger.Printf("event=agent_stopped screen_id=%s", a.Config.ScreenID)
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
|
|
@ -162,9 +181,15 @@ func (a *App) emitHeartbeat() {
|
|||
|
||||
a.mu.Lock()
|
||||
a.lastHeartbeatAt = now
|
||||
status := a.status
|
||||
connectivity := a.serverConnectivity
|
||||
a.mu.Unlock()
|
||||
|
||||
a.logger.Printf("event=heartbeat_tick screen_id=%s", a.Config.ScreenID)
|
||||
if a.mqttPub != nil {
|
||||
if err := a.mqttPub.SendHeartbeat(string(status), string(connectivity), now); err != nil {
|
||||
a.logger.Printf("event=mqtt_heartbeat_failed screen_id=%s error=%v", a.Config.ScreenID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *App) reportStatus(ctx context.Context) {
|
||||
|
|
@ -204,5 +229,4 @@ func (a *App) reportStatus(ctx context.Context) {
|
|||
a.consecutiveReportFailures = 0
|
||||
a.serverConnectivity = ConnectivityOnline
|
||||
a.mu.Unlock()
|
||||
a.logger.Printf("event=status_report_sent screen_id=%s", a.Config.ScreenID)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ func TestAppRunUpdatesHealthAndLogsStructuredEvents(t *testing.T) {
|
|||
MQTTBroker: "tcp://127.0.0.1:1883",
|
||||
HeartbeatEvery: 1,
|
||||
StatusReportEvery: 1,
|
||||
}, logger, time.Now, &recordingReporter{})
|
||||
}, logger, time.Now, &recordingReporter{}, nil)
|
||||
|
||||
if got, want := application.Snapshot().Status, StatusStarting; got != want {
|
||||
t.Fatalf("initial status = %q, want %q", got, want)
|
||||
|
|
@ -92,7 +92,6 @@ func TestAppRunUpdatesHealthAndLogsStructuredEvents(t *testing.T) {
|
|||
for _, needle := range []string{
|
||||
"event=agent_configured",
|
||||
"screen_id=info01-dev",
|
||||
"event=heartbeat_tick",
|
||||
"event=agent_stopped",
|
||||
} {
|
||||
if !strings.Contains(logs, needle) {
|
||||
|
|
@ -108,7 +107,7 @@ func TestAppSnapshotIncludesConfiguredTargets(t *testing.T) {
|
|||
MQTTBroker: "tcp://mqtt.example:1883",
|
||||
HeartbeatEvery: 15,
|
||||
StatusReportEvery: 60,
|
||||
}, log.New(&bytes.Buffer{}, "", 0), time.Now, &recordingReporter{})
|
||||
}, log.New(&bytes.Buffer{}, "", 0), time.Now, &recordingReporter{}, nil)
|
||||
|
||||
snapshot := application.Snapshot()
|
||||
|
||||
|
|
@ -141,7 +140,7 @@ func TestAppRunWithCanceledContextDoesNotLogConfiguredOrHeartbeat(t *testing.T)
|
|||
MQTTBroker: "tcp://127.0.0.1:1883",
|
||||
HeartbeatEvery: 5,
|
||||
StatusReportEvery: 60,
|
||||
}, log.New(&logBuffer, "", 0), time.Now, &recordingReporter{})
|
||||
}, log.New(&logBuffer, "", 0), time.Now, &recordingReporter{}, nil)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
|
@ -171,7 +170,7 @@ func TestAppRunReportsStatusWithoutStoppingOnReporterError(t *testing.T) {
|
|||
MQTTBroker: "tcp://127.0.0.1:1883",
|
||||
HeartbeatEvery: 1,
|
||||
StatusReportEvery: 1,
|
||||
}, log.New(&logBuffer, "", 0), time.Now, reporter)
|
||||
}, log.New(&logBuffer, "", 0), time.Now, reporter, nil)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
errCh := make(chan error, 1)
|
||||
|
|
@ -221,7 +220,7 @@ func TestAppRunMarksServerConnectivityOnlineAfterSuccessfulReport(t *testing.T)
|
|||
MQTTBroker: "tcp://127.0.0.1:1883",
|
||||
HeartbeatEvery: 1,
|
||||
StatusReportEvery: 1,
|
||||
}, log.New(&bytes.Buffer{}, "", 0), time.Now, reporter)
|
||||
}, log.New(&bytes.Buffer{}, "", 0), time.Now, reporter, nil)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
errCh := make(chan error, 1)
|
||||
|
|
@ -264,7 +263,7 @@ func TestReportStatusMarksServerConnectivityOfflineAfterRepeatedFailures(t *test
|
|||
MQTTBroker: "tcp://127.0.0.1:1883",
|
||||
HeartbeatEvery: 30,
|
||||
StatusReportEvery: 30,
|
||||
}, log.New(&bytes.Buffer{}, "", 0), time.Now, reporter)
|
||||
}, log.New(&bytes.Buffer{}, "", 0), time.Now, reporter, nil)
|
||||
|
||||
application.reportStatus(context.Background())
|
||||
if got, want := application.Snapshot().ServerConnectivity, ConnectivityDegraded; got != want {
|
||||
|
|
@ -287,7 +286,7 @@ func TestReportStatusRecoversFromOfflineToOnline(t *testing.T) {
|
|||
MQTTBroker: "tcp://127.0.0.1:1883",
|
||||
HeartbeatEvery: 30,
|
||||
StatusReportEvery: 30,
|
||||
}, log.New(&bytes.Buffer{}, "", 0), time.Now, reporter)
|
||||
}, log.New(&bytes.Buffer{}, "", 0), time.Now, reporter, nil)
|
||||
|
||||
application.reportStatus(context.Background())
|
||||
application.reportStatus(context.Background())
|
||||
|
|
@ -305,3 +304,69 @@ func TestReportStatusRecoversFromOfflineToOnline(t *testing.T) {
|
|||
t.Fatalf("recovery payload connectivity = %q, want %q", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
type recordingMQTTSender struct {
|
||||
calls []mqttHeartbeatCall
|
||||
err error
|
||||
}
|
||||
|
||||
type mqttHeartbeatCall struct {
|
||||
status string
|
||||
connectivity string
|
||||
}
|
||||
|
||||
func (r *recordingMQTTSender) SendHeartbeat(status, connectivity string, _ time.Time) error {
|
||||
r.calls = append(r.calls, mqttHeartbeatCall{status: status, connectivity: connectivity})
|
||||
return r.err
|
||||
}
|
||||
|
||||
func (r *recordingMQTTSender) Close() {}
|
||||
|
||||
func TestEmitHeartbeatCallsMQTTPublisher(t *testing.T) {
|
||||
mqttSend := &recordingMQTTSender{}
|
||||
application := newApp(config.Config{
|
||||
ScreenID: "mqtt-screen",
|
||||
HeartbeatEvery: 30,
|
||||
StatusReportEvery: 60,
|
||||
}, log.New(&bytes.Buffer{}, "", 0), time.Now, &recordingReporter{}, mqttSend)
|
||||
|
||||
application.emitHeartbeat()
|
||||
|
||||
if got, want := len(mqttSend.calls), 1; got != want {
|
||||
t.Fatalf("MQTT call count = %d, want %d", got, want)
|
||||
}
|
||||
if got, want := mqttSend.calls[0].status, string(StatusStarting); got != want {
|
||||
t.Fatalf("heartbeat status = %q, want %q", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEmitHeartbeatSkipsWhenNoMQTTPublisher(t *testing.T) {
|
||||
var logBuffer bytes.Buffer
|
||||
application := newApp(config.Config{
|
||||
ScreenID: "no-mqtt-screen",
|
||||
HeartbeatEvery: 30,
|
||||
StatusReportEvery: 60,
|
||||
}, log.New(&logBuffer, "", 0), time.Now, &recordingReporter{}, nil)
|
||||
|
||||
application.emitHeartbeat() // must not panic
|
||||
|
||||
if strings.Contains(logBuffer.String(), "event=mqtt_heartbeat") {
|
||||
t.Fatalf("unexpected MQTT heartbeat log entry when no publisher configured: %s", logBuffer.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestEmitHeartbeatLogsMQTTFailure(t *testing.T) {
|
||||
var logBuffer bytes.Buffer
|
||||
mqttSend := &recordingMQTTSender{err: context.DeadlineExceeded}
|
||||
application := newApp(config.Config{
|
||||
ScreenID: "mqtt-fail-screen",
|
||||
HeartbeatEvery: 30,
|
||||
StatusReportEvery: 60,
|
||||
}, log.New(&logBuffer, "", 0), time.Now, &recordingReporter{}, mqttSend)
|
||||
|
||||
application.emitHeartbeat()
|
||||
|
||||
if !strings.Contains(logBuffer.String(), "event=mqtt_heartbeat_failed") {
|
||||
t.Fatalf("logs missing mqtt_heartbeat_failed: %s", logBuffer.String())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,6 +10,8 @@ type Config struct {
|
|||
ScreenID string `json:"screen_id"`
|
||||
ServerBaseURL string `json:"server_base_url"`
|
||||
MQTTBroker string `json:"mqtt_broker"`
|
||||
MQTTUsername string `json:"mqtt_username"`
|
||||
MQTTPassword string `json:"mqtt_password"`
|
||||
HeartbeatEvery int `json:"heartbeat_every_seconds"`
|
||||
StatusReportEvery int `json:"status_report_every_seconds"`
|
||||
}
|
||||
|
|
@ -47,7 +49,7 @@ func defaultConfig() Config {
|
|||
return Config{
|
||||
ScreenID: "unset-screen",
|
||||
ServerBaseURL: "http://127.0.0.1:8080",
|
||||
MQTTBroker: "tcp://127.0.0.1:1883",
|
||||
MQTTBroker: "",
|
||||
HeartbeatEvery: 30,
|
||||
StatusReportEvery: 60,
|
||||
}
|
||||
|
|
@ -66,6 +68,8 @@ func overrideFromEnv(cfg *Config) {
|
|||
cfg.ScreenID = getenv("MORZ_INFOBOARD_SCREEN_ID", cfg.ScreenID)
|
||||
cfg.ServerBaseURL = getenv("MORZ_INFOBOARD_SERVER_URL", cfg.ServerBaseURL)
|
||||
cfg.MQTTBroker = getenv("MORZ_INFOBOARD_MQTT_BROKER", cfg.MQTTBroker)
|
||||
cfg.MQTTUsername = getenv("MORZ_INFOBOARD_MQTT_USERNAME", cfg.MQTTUsername)
|
||||
cfg.MQTTPassword = getenv("MORZ_INFOBOARD_MQTT_PASSWORD", cfg.MQTTPassword)
|
||||
if value := getenv("MORZ_INFOBOARD_STATUS_REPORT_EVERY", ""); value != "" {
|
||||
var parsed int
|
||||
_, _ = fmt.Sscanf(value, "%d", &parsed)
|
||||
|
|
|
|||
84
player/agent/internal/mqttheartbeat/heartbeat.go
Normal file
84
player/agent/internal/mqttheartbeat/heartbeat.go
Normal file
|
|
@ -0,0 +1,84 @@
|
|||
package mqttheartbeat
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
)
|
||||
|
||||
// payload is the JSON structure published to the heartbeat topic.
|
||||
type payload struct {
|
||||
ScreenID string `json:"screen_id"`
|
||||
Timestamp string `json:"ts"`
|
||||
Status string `json:"status"`
|
||||
ServerConnectivity string `json:"server_connectivity"`
|
||||
}
|
||||
|
||||
// Publisher publishes MQTT heartbeats for a single screen.
|
||||
// It connects with auto-reconnect enabled, so transient broker outages are
|
||||
// handled transparently; individual Publish calls fail and should be logged
|
||||
// by the caller.
|
||||
type Publisher struct {
|
||||
client mqtt.Client
|
||||
screenID string
|
||||
}
|
||||
|
||||
// New creates a Publisher and initiates a non-blocking connection to broker.
|
||||
// username and password may be empty if the broker requires no authentication.
|
||||
// paho retries the connection in the background if the broker is unreachable
|
||||
// at startup, so New always succeeds. Publish calls will return errors until
|
||||
// the connection is established.
|
||||
func New(broker, screenID, username, password string) *Publisher {
|
||||
opts := mqtt.NewClientOptions().
|
||||
AddBroker(broker).
|
||||
SetClientID("morz-agent-" + screenID).
|
||||
SetCleanSession(true).
|
||||
SetAutoReconnect(true).
|
||||
SetConnectRetry(true).
|
||||
SetConnectRetryInterval(10 * time.Second)
|
||||
|
||||
if username != "" {
|
||||
opts.SetUsername(username)
|
||||
opts.SetPassword(password)
|
||||
}
|
||||
|
||||
client := mqtt.NewClient(opts)
|
||||
client.Connect() // non-blocking; paho retries in background
|
||||
return &Publisher{client: client, screenID: screenID}
|
||||
}
|
||||
|
||||
// Topic returns the MQTT topic for the screen's heartbeat.
|
||||
func Topic(screenID string) string {
|
||||
return "signage/screen/" + screenID + "/heartbeat"
|
||||
}
|
||||
|
||||
// BuildPayload builds the JSON heartbeat payload. Exported for testing.
|
||||
func BuildPayload(screenID, status, connectivity string, ts time.Time) ([]byte, error) {
|
||||
return json.Marshal(payload{
|
||||
ScreenID: screenID,
|
||||
Timestamp: ts.UTC().Format(time.RFC3339),
|
||||
Status: status,
|
||||
ServerConnectivity: connectivity,
|
||||
})
|
||||
}
|
||||
|
||||
// SendHeartbeat publishes a heartbeat to the screen's topic.
|
||||
// QoS 0, not retained. Returns an error if the broker is unreachable or
|
||||
// the publish token fails; callers should log and continue.
|
||||
func (p *Publisher) SendHeartbeat(status, connectivity string, ts time.Time) error {
|
||||
data, err := BuildPayload(p.screenID, status, connectivity, ts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
topic := Topic(p.screenID)
|
||||
token := p.client.Publish(topic, 0, false, data)
|
||||
token.WaitTimeout(3 * time.Second)
|
||||
return token.Error()
|
||||
}
|
||||
|
||||
// Close disconnects from the broker gracefully.
|
||||
func (p *Publisher) Close() {
|
||||
p.client.Disconnect(250)
|
||||
}
|
||||
60
player/agent/internal/mqttheartbeat/heartbeat_test.go
Normal file
60
player/agent/internal/mqttheartbeat/heartbeat_test.go
Normal file
|
|
@ -0,0 +1,60 @@
|
|||
package mqttheartbeat
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestBuildPayloadContainsAllFields(t *testing.T) {
|
||||
ts := time.Date(2026, 3, 22, 16, 9, 30, 0, time.UTC)
|
||||
|
||||
data, err := BuildPayload("info01-dev", "running", "online", ts)
|
||||
if err != nil {
|
||||
t.Fatalf("BuildPayload() error = %v", err)
|
||||
}
|
||||
|
||||
var p struct {
|
||||
ScreenID string `json:"screen_id"`
|
||||
Timestamp string `json:"ts"`
|
||||
Status string `json:"status"`
|
||||
ServerConnectivity string `json:"server_connectivity"`
|
||||
}
|
||||
if err := json.Unmarshal(data, &p); err != nil {
|
||||
t.Fatalf("Unmarshal() error = %v", err)
|
||||
}
|
||||
|
||||
if got, want := p.ScreenID, "info01-dev"; got != want {
|
||||
t.Errorf("screen_id = %q, want %q", got, want)
|
||||
}
|
||||
if got, want := p.Timestamp, "2026-03-22T16:09:30Z"; got != want {
|
||||
t.Errorf("ts = %q, want %q", got, want)
|
||||
}
|
||||
if got, want := p.Status, "running"; got != want {
|
||||
t.Errorf("status = %q, want %q", got, want)
|
||||
}
|
||||
if got, want := p.ServerConnectivity, "online"; got != want {
|
||||
t.Errorf("server_connectivity = %q, want %q", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildPayloadTimestampIsUTC(t *testing.T) {
|
||||
loc := time.FixedZone("CET", 1*60*60)
|
||||
ts := time.Date(2026, 3, 22, 17, 9, 30, 0, loc) // same moment, different zone
|
||||
|
||||
data, err := BuildPayload("screen-x", "running", "online", ts)
|
||||
if err != nil {
|
||||
t.Fatalf("BuildPayload() error = %v", err)
|
||||
}
|
||||
|
||||
if !strings.Contains(string(data), "2026-03-22T16:09:30Z") {
|
||||
t.Errorf("payload %s: expected UTC timestamp 2026-03-22T16:09:30Z", data)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTopicFormat(t *testing.T) {
|
||||
if got, want := Topic("info01-dev"), "signage/screen/info01-dev/heartbeat"; got != want {
|
||||
t.Fatalf("Topic() = %q, want %q", got, want)
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue