package mqttheartbeat import ( "encoding/json" "time" mqtt "github.com/eclipse/paho.mqtt.golang" ) // payload is the JSON structure published to the heartbeat topic. type payload struct { ScreenID string `json:"screen_id"` Timestamp string `json:"ts"` Status string `json:"status"` ServerConnectivity string `json:"server_connectivity"` } // Publisher publishes MQTT heartbeats for a single screen. // It connects with auto-reconnect enabled, so transient broker outages are // handled transparently; individual Publish calls fail and should be logged // by the caller. type Publisher struct { client mqtt.Client screenID string } // New creates a Publisher and initiates a non-blocking connection to broker. // username and password may be empty if the broker requires no authentication. // paho retries the connection in the background if the broker is unreachable // at startup, so New always succeeds. Publish calls will return errors until // the connection is established. func New(broker, screenID, username, password string) *Publisher { opts := mqtt.NewClientOptions(). AddBroker(broker). SetClientID("morz-agent-" + screenID). SetCleanSession(true). SetAutoReconnect(true). SetConnectRetry(true). SetConnectRetryInterval(10 * time.Second) if username != "" { opts.SetUsername(username) opts.SetPassword(password) } client := mqtt.NewClient(opts) client.Connect() // non-blocking; paho retries in background return &Publisher{client: client, screenID: screenID} } // Topic returns the MQTT topic for the screen's heartbeat. func Topic(screenID string) string { return "signage/screen/" + screenID + "/heartbeat" } // BuildPayload builds the JSON heartbeat payload. Exported for testing. func BuildPayload(screenID, status, connectivity string, ts time.Time) ([]byte, error) { return json.Marshal(payload{ ScreenID: screenID, Timestamp: ts.UTC().Format(time.RFC3339), Status: status, ServerConnectivity: connectivity, }) } // SendHeartbeat publishes a heartbeat to the screen's topic. // QoS 0, not retained. Returns an error if the broker is unreachable or // the publish token fails; callers should log and continue. func (p *Publisher) SendHeartbeat(status, connectivity string, ts time.Time) error { data, err := BuildPayload(p.screenID, status, connectivity, ts) if err != nil { return err } topic := Topic(p.screenID) token := p.client.Publish(topic, 0, false, data) token.WaitTimeout(3 * time.Second) return token.Error() } // SendDisplayState publiziert den aktuellen Display-Zustand auf dem display-state-Topic. // QoS 0, nicht retained — Informationszweck/Monitoring. func (p *Publisher) SendDisplayState(screenSlug, state string) error { type dsPayload struct { DisplayState string `json:"display_state"` Timestamp string `json:"ts"` } data, err := json.Marshal(dsPayload{ DisplayState: state, Timestamp: time.Now().UTC().Format(time.RFC3339), }) if err != nil { return err } topic := "signage/screen/" + screenSlug + "/display-state" token := p.client.Publish(topic, 0, false, data) token.WaitTimeout(3 * time.Second) return token.Error() } // Close disconnects from the broker gracefully. func (p *Publisher) Close() { p.client.Disconnect(250) }