初始化项目
This commit is contained in:
@@ -0,0 +1,145 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"sentinel/pkg/config"
|
||||
"sentinel/pkg/log"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
)
|
||||
|
||||
type MQTTService struct {
|
||||
client mqtt.Client
|
||||
handler func(topic string, payload []byte)
|
||||
subscriptions map[string]struct{} // 记录已订阅 topic
|
||||
}
|
||||
|
||||
func NewMQTTService(broker, clientID, username, password string, keepalive int) *MQTTService {
|
||||
opts := mqtt.NewClientOptions()
|
||||
opts.AddBroker(broker)
|
||||
opts.SetClientID(clientID)
|
||||
opts.SetUsername(username)
|
||||
opts.SetPassword(password)
|
||||
opts.AutoReconnect = true
|
||||
opts.OnReconnecting = func(c mqtt.Client, opts *mqtt.ClientOptions) {
|
||||
log.Println("正在重连到物联网服务器...")
|
||||
}
|
||||
opts.SetKeepAlive(time.Duration(keepalive) * time.Second)
|
||||
opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})
|
||||
// 设置遗嘱消息
|
||||
lwtTopic, lwtPayload := getStatusInfoTopicPayload(false)
|
||||
opts.SetWill(
|
||||
lwtTopic, // topic
|
||||
lwtPayload, // payload
|
||||
1, // qos
|
||||
false, // retained
|
||||
)
|
||||
ms := &MQTTService{subscriptions: make(map[string]struct{})}
|
||||
opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) {
|
||||
if ms.handler != nil {
|
||||
ms.handler(m.Topic(), m.Payload())
|
||||
}
|
||||
})
|
||||
opts.OnConnect = func(c mqtt.Client) {
|
||||
log.Println("物联网服务器已连接")
|
||||
cfg := config.LoadConfig()
|
||||
cfg.ControlState = "CONNECTED"
|
||||
cfg.LastAliveAt = time.Now().Unix()
|
||||
config.WriteLocalConfig(cfg)
|
||||
// 连接成功 发送状态信息
|
||||
topic, payload := getStatusInfoTopicPayload(true)
|
||||
ms.PublicMsg(topic, payload, false)
|
||||
}
|
||||
opts.OnConnectionLost = func(c mqtt.Client, err error) {
|
||||
log.Println("物联网服务器已断开:", err)
|
||||
cfg := config.LoadConfig()
|
||||
cfg.ControlState = "DEGRADED" // 表示短暂断开
|
||||
cfg.LastAliveAt = time.Now().Unix() // 更新这个时间戳,让 daemon 与 主程序看到尚可忍耐
|
||||
|
||||
if err := config.WriteLocalConfig(cfg); err != nil {
|
||||
log.Println("[main] 写状态失败:", err)
|
||||
}
|
||||
}
|
||||
|
||||
ms.client = mqtt.NewClient(opts)
|
||||
return ms
|
||||
}
|
||||
|
||||
func (m *MQTTService) Connect() error {
|
||||
token := m.client.Connect()
|
||||
if token.Wait() && token.Error() != nil {
|
||||
return token.Error()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MQTTService) Subscribe(topic string, qos byte) error {
|
||||
token := m.client.Subscribe(topic, qos, nil)
|
||||
if token.Wait() && token.Error() != nil {
|
||||
return token.Error()
|
||||
}
|
||||
log.Debug("物联网服务消息订阅:", topic)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MQTTService) Publish(topic string, qos byte, retained bool, payload interface{}) error {
|
||||
token := m.client.Publish(topic, qos, retained, payload)
|
||||
token.Wait()
|
||||
return token.Error()
|
||||
}
|
||||
|
||||
func (m *MQTTService) SetMessageHandler(h func(topic string, payload []byte)) {
|
||||
m.handler = h
|
||||
}
|
||||
|
||||
func (m *MQTTService) Close() {
|
||||
if m.client != nil {
|
||||
m.client.Disconnect(250)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MQTTService) UnsubscribeTopic(topic string) error {
|
||||
token := m.client.Unsubscribe(topic)
|
||||
go func() {
|
||||
if token.Wait() && token.Error() != nil {
|
||||
log.Println("取消订阅失败:", token.Error())
|
||||
} else {
|
||||
delete(m.subscriptions, topic)
|
||||
log.Debug("取消订阅成功:", topic)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnsubscribeAll 取消所有已订阅 topic
|
||||
func (m *MQTTService) UnsubscribeAll() {
|
||||
for topic := range m.subscriptions {
|
||||
_ = m.client.Unsubscribe(topic)
|
||||
delete(m.subscriptions, topic)
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeTopic 订阅指定 topic,并记录可取消
|
||||
func (m *MQTTService) SubscribeTopic(topic string, qos byte) error {
|
||||
token := m.client.Subscribe(topic, qos, nil)
|
||||
go func() {
|
||||
if token.Wait() && token.Error() != nil {
|
||||
log.Println("订阅失败:", token.Error())
|
||||
} else {
|
||||
m.subscriptions[topic] = struct{}{}
|
||||
log.Debug("订阅成功:", topic)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MQTTService) PublicMsg(topic string, payload string, retained bool) {
|
||||
qos := byte(1)
|
||||
log.Debug("发送消息:", topic)
|
||||
if err := m.Publish(topic, qos, retained, payload); err != nil {
|
||||
log.Println("发送信息出错:", err)
|
||||
} else {
|
||||
log.Debug("发送信息:", string(payload))
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user