package main

import (
	"crypto/tls"
	"sync"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"sentinel/pkg/config"
	"sentinel/pkg/log"
)

// MQTTService wraps a paho MQTT client together with an optional message
// handler and the set of topics subscribed through SubscribeTopic.
type MQTTService struct {
	client mqtt.Client

	// mu guards handler and subscriptions. Both are touched from goroutines
	// other than the caller's: the subscribe/unsubscribe completion
	// goroutines started by this type and the client's message callback.
	mu            sync.Mutex
	handler       func(topic string, payload []byte)
	subscriptions map[string]struct{} // topics recorded by SubscribeTopic
}

// NewMQTTService builds an MQTTService for the given broker and credentials.
// keepalive is interpreted as a number of seconds. The returned service is
// configured but not connected; call Connect to establish the session.
//
// The client is configured with a last-will message, a default publish
// handler that forwards to the handler installed via SetMessageHandler, and
// connect/disconnect callbacks that persist the connection state through the
// config package.
func NewMQTTService(broker, clientID, username, password string, keepalive int) *MQTTService {
	opts := mqtt.NewClientOptions()
	opts.AddBroker(broker)
	opts.SetClientID(clientID)
	opts.SetUsername(username)
	opts.SetPassword(password)
	opts.AutoReconnect = true
	opts.OnReconnecting = func(c mqtt.Client, opts *mqtt.ClientOptions) {
		log.Println("正在重连到物联网服务器...")
	}
	opts.SetKeepAlive(time.Duration(keepalive) * time.Second)
	// NOTE(review): InsecureSkipVerify turns off TLS certificate
	// verification, so the broker's identity is not checked. Left as-is to
	// avoid breaking existing deployments; confirm whether a CA pool can be
	// configured instead.
	opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})

	// Last-will message: the status payload built with the flag set to false.
	lwtTopic, lwtPayload := getStatusInfoTopicPayload(false)
	opts.SetWill(
		lwtTopic,   // topic
		lwtPayload, // payload
		1,          // qos
		false,      // retained
	)

	ms := &MQTTService{subscriptions: make(map[string]struct{})}

	opts.SetDefaultPublishHandler(func(c mqtt.Client, msg mqtt.Message) {
		// Copy the handler under the lock, then call it outside the lock so
		// a slow handler cannot block SetMessageHandler or the subscription
		// bookkeeping.
		ms.mu.Lock()
		h := ms.handler
		ms.mu.Unlock()
		if h != nil {
			h(msg.Topic(), msg.Payload())
		}
	})

	opts.OnConnect = func(c mqtt.Client) {
		log.Println("物联网服务器已连接")
		cfg := config.LoadConfig()
		cfg.ControlState = "CONNECTED"
		cfg.LastAliveAt = time.Now().Unix()
		if err := config.WriteLocalConfig(cfg); err != nil {
			log.Println("[main] 写状态失败:", err)
		}
		// Connected: publish the status payload built with the flag set to true.
		topic, payload := getStatusInfoTopicPayload(true)
		ms.PublicMsg(topic, payload, false)
	}

	opts.OnConnectionLost = func(c mqtt.Client, err error) {
		log.Println("物联网服务器已断开:", err)
		cfg := config.LoadConfig()
		cfg.ControlState = "DEGRADED" // marks a temporary disconnect
		// Refresh the timestamp so the daemon and the main program still
		// treat the outage as tolerable.
		cfg.LastAliveAt = time.Now().Unix()
		if err := config.WriteLocalConfig(cfg); err != nil {
			log.Println("[main] 写状态失败:", err)
		}
	}

	ms.client = mqtt.NewClient(opts)
	return ms
}

// Connect starts the connection and waits for the attempt to finish,
// returning the error reported by the client, if any.
func (m *MQTTService) Connect() error {
	token := m.client.Connect()
	if token.Wait() && token.Error() != nil {
		return token.Error()
	}
	return nil
}

// Subscribe subscribes to topic at the given QoS and waits for completion.
// No per-topic callback is registered (nil is passed to the client), and the
// topic is not recorded in the subscription set; use SubscribeTopic for
// subscriptions that UnsubscribeAll should later remove.
func (m *MQTTService) Subscribe(topic string, qos byte) error {
	token := m.client.Subscribe(topic, qos, nil)
	if token.Wait() && token.Error() != nil {
		return token.Error()
	}
	log.Debug("物联网服务消息订阅:", topic)
	return nil
}

// Publish sends payload to topic with the given QoS and retained flag, waits
// for the publish token to complete, and returns its error.
func (m *MQTTService) Publish(topic string, qos byte, retained bool, payload interface{}) error {
	token := m.client.Publish(topic, qos, retained, payload)
	token.Wait()
	return token.Error()
}

// SetMessageHandler installs h as the callback invoked by the default publish
// handler with each message's topic and payload. Passing nil disables
// forwarding.
func (m *MQTTService) SetMessageHandler(h func(topic string, payload []byte)) {
	m.mu.Lock()
	m.handler = h
	m.mu.Unlock()
}

// Close disconnects the client, passing a quiesce value of 250 to Disconnect.
// It is a no-op when the client is nil.
func (m *MQTTService) Close() {
	if m.client != nil {
		m.client.Disconnect(250)
	}
}

// UnsubscribeTopic requests an unsubscribe for topic and returns immediately
// with a nil error. Completion is handled in a background goroutine: a
// failure is only logged, and on success the topic is removed from the
// recorded subscription set.
func (m *MQTTService) UnsubscribeTopic(topic string) error {
	token := m.client.Unsubscribe(topic)
	go func() {
		if token.Wait() && token.Error() != nil {
			log.Println("取消订阅失败:", token.Error())
			return
		}
		m.mu.Lock()
		delete(m.subscriptions, topic)
		m.mu.Unlock()
		log.Debug("取消订阅成功:", topic)
	}()
	return nil
}

// UnsubscribeAll issues an unsubscribe for every recorded topic and empties
// the subscription set. The requests are fire-and-forget: their outcome is
// not awaited and the topics are dropped from the set regardless.
func (m *MQTTService) UnsubscribeAll() {
	// Snapshot and clear under the lock, then talk to the client without
	// holding it so concurrent bookkeeping is not blocked on the client.
	m.mu.Lock()
	topics := make([]string, 0, len(m.subscriptions))
	for topic := range m.subscriptions {
		topics = append(topics, topic)
		delete(m.subscriptions, topic)
	}
	m.mu.Unlock()

	for _, topic := range topics {
		// Best effort: the token is intentionally discarded.
		_ = m.client.Unsubscribe(topic)
	}
}

// SubscribeTopic requests a subscription to topic at the given QoS and
// returns immediately with a nil error. Completion is handled in a background
// goroutine: a failure is only logged, and on success the topic is recorded
// so that UnsubscribeAll can remove it later.
func (m *MQTTService) SubscribeTopic(topic string, qos byte) error {
	token := m.client.Subscribe(topic, qos, nil)
	go func() {
		if token.Wait() && token.Error() != nil {
			log.Println("订阅失败:", token.Error())
			return
		}
		m.mu.Lock()
		m.subscriptions[topic] = struct{}{}
		m.mu.Unlock()
		log.Debug("订阅成功:", topic)
	}()
	return nil
}

// PublicMsg publishes payload to topic at QoS 1 with the given retained flag.
// Errors are logged rather than returned.
func (m *MQTTService) PublicMsg(topic string, payload string, retained bool) {
	const qos = byte(1)
	log.Debug("发送消息:", topic)
	if err := m.Publish(topic, qos, retained, payload); err != nil {
		log.Println("发送信息出错:", err)
		return
	}
	log.Debug("发送信息:", payload)
}