74 lines
1.7 KiB
Go
74 lines
1.7 KiB
Go
package main
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"sentinel/pkg/log"
|
|
"time"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
)
|
|
|
|
type MQTTService struct {
|
|
client mqtt.Client
|
|
handler func(topic string, payload []byte)
|
|
}
|
|
|
|
func NewMQTTService(broker, clientID, username, password string, keepalive int) *MQTTService {
|
|
opts := mqtt.NewClientOptions()
|
|
opts.AddBroker(broker)
|
|
opts.SetClientID(clientID)
|
|
opts.SetUsername(username)
|
|
opts.SetPassword(password)
|
|
opts.SetKeepAlive(time.Duration(keepalive) * time.Second)
|
|
opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})
|
|
|
|
ms := &MQTTService{}
|
|
opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) {
|
|
if ms.handler != nil {
|
|
ms.handler(m.Topic(), m.Payload())
|
|
}
|
|
})
|
|
opts.OnConnect = func(c mqtt.Client) {
|
|
log.Println("物联网服务已连接")
|
|
}
|
|
opts.OnConnectionLost = func(c mqtt.Client, err error) {
|
|
log.Println("物联网服务已断开:", err)
|
|
}
|
|
|
|
ms.client = mqtt.NewClient(opts)
|
|
return ms
|
|
}
|
|
|
|
func (m *MQTTService) Connect() error {
|
|
token := m.client.Connect()
|
|
if token.Wait() && token.Error() != nil {
|
|
return token.Error()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *MQTTService) Subscribe(topic string, qos byte) error {
|
|
token := m.client.Subscribe(topic, qos, nil)
|
|
if token.Wait() && token.Error() != nil {
|
|
return token.Error()
|
|
}
|
|
log.Println("物联网服务消息订阅:", topic)
|
|
return nil
|
|
}
|
|
|
|
func (m *MQTTService) Publish(topic string, qos byte, retained bool, payload interface{}) error {
|
|
token := m.client.Publish(topic, qos, retained, payload)
|
|
token.Wait()
|
|
return token.Error()
|
|
}
|
|
|
|
func (m *MQTTService) SetMessageHandler(h func(topic string, payload []byte)) {
|
|
m.handler = h
|
|
}
|
|
|
|
func (m *MQTTService) Close() {
|
|
if m.client != nil {
|
|
m.client.Disconnect(250)
|
|
}
|
|
}
|