package main

import (
	"crypto/tls"
	"sync"
	"time"

	"sentinel/pkg/log"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// MQTTService is a thin wrapper around a paho MQTT client that forwards
// incoming messages to a single, replaceable handler function.
//
// An MQTTService contains a mutex and must not be copied after creation;
// always use the *MQTTService returned by NewMQTTService.
type MQTTService struct {
	client mqtt.Client

	// mu guards handler. The handler is read from the client's callback
	// path and may be replaced at any time through SetMessageHandler, so
	// unsynchronized access would be a data race.
	mu      sync.RWMutex
	handler func(topic string, payload []byte)
}

// NewMQTTService builds an MQTTService configured for the given broker.
//
// broker is the broker URI handed to the client options, clientID, username
// and password are the connection credentials, and keepalive is the
// keep-alive interval in seconds. The returned service is not connected
// yet; call Connect to establish the session.
func NewMQTTService(broker, clientID, username, password string, keepalive int) *MQTTService {
	opts := mqtt.NewClientOptions()
	opts.AddBroker(broker)
	opts.SetClientID(clientID)
	opts.SetUsername(username)
	opts.SetPassword(password)
	opts.SetKeepAlive(time.Duration(keepalive) * time.Second)

	// NOTE(review): InsecureSkipVerify disables server certificate and host
	// name verification, which leaves TLS connections open to
	// man-in-the-middle attacks. It is kept as-is so existing deployments
	// (e.g. brokers with self-signed certificates) keep working, but it
	// should be made configurable or replaced with a proper RootCAs pool.
	opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})

	ms := &MQTTService{}

	// Subscribe registers no per-topic callback, so every received message
	// is routed through this default publish handler.
	opts.SetDefaultPublishHandler(func(_ mqtt.Client, msg mqtt.Message) {
		if h := ms.currentHandler(); h != nil {
			h(msg.Topic(), msg.Payload())
		}
	})
	opts.OnConnect = func(_ mqtt.Client) {
		log.Println("物联网服务已连接")
	}
	opts.OnConnectionLost = func(_ mqtt.Client, err error) {
		log.Println("物联网服务已断开:", err)
	}

	ms.client = mqtt.NewClient(opts)
	return ms
}

// currentHandler returns a snapshot of the registered message handler.
// The lock is released before the caller invokes the handler, so a slow
// handler never blocks SetMessageHandler.
func (m *MQTTService) currentHandler() func(topic string, payload []byte) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.handler
}

// Connect opens the connection to the broker and blocks until the attempt
// completes. It returns the error reported by the connect token, if any.
func (m *MQTTService) Connect() error {
	token := m.client.Connect()
	if token.Wait() && token.Error() != nil {
		return token.Error()
	}
	return nil
}

// Subscribe subscribes to topic with the requested QoS level and blocks
// until the request completes. No per-topic callback is registered;
// messages are delivered to the handler set with SetMessageHandler.
// It returns the error reported by the subscribe token, if any.
func (m *MQTTService) Subscribe(topic string, qos byte) error {
	token := m.client.Subscribe(topic, qos, nil)
	if token.Wait() && token.Error() != nil {
		return token.Error()
	}
	log.Println("物联网服务消息订阅:", topic)
	return nil
}

// Publish sends payload to topic with the given QoS and retained flag and
// blocks until the publish token completes. It returns the token's error,
// which is nil on success.
func (m *MQTTService) Publish(topic string, qos byte, retained bool, payload interface{}) error {
	token := m.client.Publish(topic, qos, retained, payload)
	token.Wait()
	return token.Error()
}

// SetMessageHandler installs h as the function invoked for every received
// message. Passing nil removes the handler, after which incoming messages
// are dropped. It is safe to call concurrently with message delivery.
func (m *MQTTService) SetMessageHandler(h func(topic string, payload []byte)) {
	m.mu.Lock()
	m.handler = h
	m.mu.Unlock()
}

// Close disconnects from the broker, passing a quiesce value of 250 to the
// client's Disconnect. It is a no-op when the service has no client.
func (m *MQTTService) Close() {
	if m.client != nil {
		m.client.Disconnect(250)
	}
}