物联网软件
This commit is contained in:
@@ -1,109 +1,89 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"os/exec"
|
||||
"fmt"
|
||||
"sentinel/pkg/config"
|
||||
"sentinel/pkg/device"
|
||||
"sentinel/pkg/docker"
|
||||
"sentinel/pkg/log"
|
||||
model2 "sentinel/pkg/model"
|
||||
"sentinel/pkg/utils"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
type BusinessService struct {
|
||||
mqtt *MQTTService
|
||||
dockerManager docker.DockerManager
|
||||
deviceID string
|
||||
deptId string
|
||||
cmdTopic string
|
||||
deviceType string
|
||||
subscriptions map[string]struct{} // 记录已订阅 topic
|
||||
}
|
||||
|
||||
func NewBusinessService(m *MQTTService, deviceID string) *BusinessService {
|
||||
func NewBusinessService(m *MQTTService, deviceID string, dm docker.DockerManager) *BusinessService {
|
||||
return &BusinessService{
|
||||
mqtt: m,
|
||||
dockerManager: dm,
|
||||
deviceID: deviceID,
|
||||
subscriptions: make(map[string]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeTopic 订阅指定 topic,并记录可取消
|
||||
func (b *BusinessService) SubscribeTopic(topic string, qos byte) error {
|
||||
if err := b.mqtt.Subscribe(topic, qos); err != nil {
|
||||
func (b *BusinessService) Start() error {
|
||||
if err := b.mqtt.SubscribeTopic(getInitTopic(b.deviceID), 1); err != nil {
|
||||
return err
|
||||
}
|
||||
b.subscriptions[topic] = struct{}{}
|
||||
|
||||
b.mqtt.SetMessageHandler(b.onMQTTMessage)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func getInitTopic(deviceID string) string {
|
||||
return "+/+/+/" + deviceID + "/#"
|
||||
}
|
||||
|
||||
func (b *BusinessService) getOwnTopic(deviceID string) string {
|
||||
return b.deptId + "/cmd/" + b.deviceType + "/" + deviceID + "/#"
|
||||
}
|
||||
|
||||
func (b *BusinessService) Start() error {
|
||||
if err := b.SubscribeTopic(getInitTopic(b.deviceID), 1); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.mqtt.SetMessageHandler(b.onMQTTMessage)
|
||||
|
||||
// 第一次连接就发送状态信息
|
||||
b.SendStatusInfo()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnsubscribeTopic 取消订阅指定 topic
|
||||
func (b *BusinessService) UnsubscribeTopic(topic string) error {
|
||||
token := b.mqtt.client.Unsubscribe(topic)
|
||||
if token.Wait() && token.Error() != nil {
|
||||
return token.Error()
|
||||
}
|
||||
delete(b.subscriptions, topic)
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnsubscribeAll 取消所有已订阅 topic
|
||||
func (b *BusinessService) UnsubscribeAll() {
|
||||
for topic := range b.subscriptions {
|
||||
_ = b.mqtt.client.Unsubscribe(topic)
|
||||
delete(b.subscriptions, topic)
|
||||
}
|
||||
}
|
||||
|
||||
// 消息处理
|
||||
func (b *BusinessService) onMQTTMessage(topic string, payload []byte) {
|
||||
model := model2.FromStringToMqttTopic(topic)
|
||||
payload_json, err := utils.PayloadToMap(payload)
|
||||
if err != nil {
|
||||
fmt.Println("解析失败:", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 指令
|
||||
if model.Domain == "cmd" && model.DeviceType == b.deviceType {
|
||||
log.Println("收到指令:", model.Resource)
|
||||
switch model.Resource {
|
||||
case "ping":
|
||||
log.Println("pong")
|
||||
case "shutdown":
|
||||
b.handleShutdown()
|
||||
payload_json["massage"] = b.handleShutdown()
|
||||
case "restart":
|
||||
b.handleRestart()
|
||||
payload_json["massage"] = b.handleRestart()
|
||||
case "check_update":
|
||||
b.handleCheckUpdate()
|
||||
payload_json["massage"] = b.handleCheckUpdate()
|
||||
default:
|
||||
log.Println("未知的命令:", model.Resource)
|
||||
payload_json["massage"] = "未知的命令"
|
||||
}
|
||||
topic := "x/receipt/x/" + device.GetDeviceID() + "/info"
|
||||
payload := utils.JSONToString(payload_json)
|
||||
b.mqtt.PublicMsg(topic, payload, false) // 回执
|
||||
} else if model.Domain == "status" && model.Resource == "receipt" {
|
||||
b.deviceType = model.DeviceType
|
||||
b.deptId = model.DeptId
|
||||
// 取消订阅之前的初始化主题
|
||||
if b.UnsubscribeTopic(getInitTopic(b.deviceID)) != nil {
|
||||
if b.mqtt.UnsubscribeTopic(getInitTopic(b.deviceID)) != nil {
|
||||
log.Error("无法取消初始化主题")
|
||||
return
|
||||
}
|
||||
// 新订阅属于自己的主题
|
||||
if b.SubscribeTopic(b.getOwnTopic(b.deviceID), 1) != nil {
|
||||
if b.mqtt.SubscribeTopic(b.getOwnTopic(b.deviceID), 1) != nil {
|
||||
log.Error("无法定于属于自己的主题")
|
||||
return
|
||||
}
|
||||
@@ -111,10 +91,10 @@ func (b *BusinessService) onMQTTMessage(topic string, payload []byte) {
|
||||
}
|
||||
}
|
||||
|
||||
func (b *BusinessService) SendStatusInfo() {
|
||||
func getStatusInfoTopicPayload(isOnline bool) (string, string) {
|
||||
info := map[string]interface{}{
|
||||
"version": config.APP_VERSION,
|
||||
"online": true,
|
||||
"online": isOnline,
|
||||
"ip": utils.GetLocalIP(),
|
||||
"hostname": utils.GetHostname(),
|
||||
"mac": utils.GetMacAddress(),
|
||||
@@ -124,86 +104,60 @@ func (b *BusinessService) SendStatusInfo() {
|
||||
"disk_total": utils.GetDisk(),
|
||||
"last_seen": time.Now().UTC().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
payload, _ := json.Marshal(info)
|
||||
topic := "x/status/x/" + b.deviceID + "/info"
|
||||
qos := byte(1)
|
||||
retained := true
|
||||
log.Println("发送消息:", topic)
|
||||
|
||||
if err := b.mqtt.Publish(topic, qos, retained, payload); err != nil {
|
||||
log.Println("发送状态信息出错:", err)
|
||||
} else {
|
||||
log.Println("发送状态信息:", string(payload))
|
||||
}
|
||||
payloadBytes, _ := json.Marshal(info)
|
||||
payload := string(payloadBytes) // 转成 string
|
||||
return "x/status/x/" + device.GetDeviceID() + "/info", payload
|
||||
}
|
||||
|
||||
// 关闭程序(立即退出)
|
||||
func (b *BusinessService) handleShutdown() {
|
||||
os.Exit(0)
|
||||
func (b *BusinessService) handleShutdown() string {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
defer cancel()
|
||||
b.dockerManager.StopAndRemoveContainer(ctx)
|
||||
return "程序已退出"
|
||||
}
|
||||
|
||||
// 重启程序
|
||||
func (b *BusinessService) handleRestart() {
|
||||
exe, _ := os.Executable()
|
||||
cmd := exec.Command(exe)
|
||||
cmd.Stdout = os.Stdout
|
||||
cmd.Stderr = os.Stderr
|
||||
_ = cmd.Start()
|
||||
os.Exit(0)
|
||||
}
|
||||
func (b *BusinessService) handleCheckUpdate() {
|
||||
|
||||
args := []string{
|
||||
"--version", strconv.Itoa(config.APP_VERSION),
|
||||
func (b *BusinessService) handleRestart() string {
|
||||
log.Println("正在拉取镜像:", config.DOCKER_IMAGE)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
defer cancel()
|
||||
if err := b.dockerManager.PullImage(ctx, config.DOCKER_IMAGE); err != nil {
|
||||
log.Fatalf("镜像拉取失败:%v", err)
|
||||
}
|
||||
|
||||
launcher := newUpdaterLauncher()
|
||||
|
||||
if err := launcher.Start(args); err != nil {
|
||||
log.Println("[BUS] failed to start updater:", err)
|
||||
return
|
||||
err := b.dockerManager.RunContainer(ctx)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
log.Println(
|
||||
"[BUS] updater started, exiting main program",
|
||||
)
|
||||
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
os.Exit(0)
|
||||
return "程序已重启"
|
||||
}
|
||||
|
||||
func (b *BusinessService) handleCheckUpdate() string {
|
||||
cfg := config.LoadConfig()
|
||||
cfg.NeedUpdate = true
|
||||
if err := config.WriteLocalConfig(&cfg); err != nil {
|
||||
log.Println("load config failed:", err)
|
||||
}
|
||||
return "程序更新机制已触发"
|
||||
}
|
||||
|
||||
// handleCheckUpdate 触发更新流程(主程序侧)
|
||||
//func (b *BusinessService) handleCheckUpdate() {
|
||||
//
|
||||
// args := []string{
|
||||
// "--version", strconv.Itoa(config.APP_VERSION),
|
||||
// }
|
||||
//
|
||||
// cmd := exec.Command("./updater.exe", args...)
|
||||
// cmd.Stdout = os.Stdout
|
||||
// cmd.Stderr = os.Stderr
|
||||
// launcher := newUpdaterLauncher()
|
||||
//
|
||||
// // OS 级脱离父进程
|
||||
// switch runtime.GOOS {
|
||||
// case "windows":
|
||||
// cmd.SysProcAttr = &syscall.SysProcAttr{
|
||||
// CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP,
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// if err := cmd.Start(); err != nil {
|
||||
// if err := launcher.Start(args); err != nil {
|
||||
// log.Println("[BUS] failed to start updater:", err)
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// log.Println(
|
||||
// "[BUS] updater started (pid=%d), exiting main program\n",
|
||||
// cmd.Process.Pid,
|
||||
// "[BUS] updater started, exiting main program",
|
||||
// )
|
||||
//
|
||||
// // 给 updater 留出启动窗口(尤其是 systemd / docker 环境)
|
||||
// time.Sleep(500 * time.Millisecond)
|
||||
//
|
||||
// os.Exit(0)
|
||||
//}
|
||||
|
||||
+80
-22
@@ -1,40 +1,69 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"sentinel/pkg/config"
|
||||
"sentinel/pkg/device"
|
||||
"sentinel/pkg/docker"
|
||||
"sentinel/pkg/log"
|
||||
)
|
||||
|
||||
func main() {
|
||||
banner := `
|
||||
==========================================================================
|
||||
|
||||
██████ █████ █████ ██████ ██ █████ ██████ ██ ██ ██████
|
||||
██ ██ ██ ██ ██ ████ ██ ██ ███ ██ ██
|
||||
█████ ██ ██ ██ ██ █████ ██ ██ ██ ██ █████ ████ ██ ██
|
||||
██ ██ ██ ██ ██ ██ ██████ ██ ██ ██ ██ ████ ██
|
||||
██████ █████ █████ ██████ ██ ██ █████ ██████ ██ ██ ██
|
||||
|
||||
==========================================================================
|
||||
`
|
||||
fmt.Println(banner)
|
||||
deviceID := device.GetDeviceID()
|
||||
log.Init(config.Log_file_dic) // 初始化日志目录
|
||||
log.Info("Device id: " + deviceID) // 第一次启动记录
|
||||
log.Println("版本号: ", config.APP_VERSION) // 第一次启动记录
|
||||
|
||||
// APP Logo
|
||||
InitPrint(deviceID)
|
||||
// Config
|
||||
InitConfigFile()
|
||||
// MQTT
|
||||
InitMQTT(deviceID)
|
||||
|
||||
}
|
||||
func InitConfigFile() {
|
||||
// 第一次写入默认配置
|
||||
cfg := config.AppConfig{
|
||||
VersionCode: config.APP_VERSION,
|
||||
PID: os.Getpid(),
|
||||
NeedUpdate: false,
|
||||
ControlState: "DEGRADED",
|
||||
LastAliveAt: time.Now().Unix(),
|
||||
}
|
||||
|
||||
// 写入文件
|
||||
if err := config.WriteLocalConfig(&cfg); err != nil {
|
||||
log.Println("[config] 写入初始配置失败:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func InitPrint(deviceID string) {
|
||||
banner := `
|
||||
==============================================================================
|
||||
|
||||
██████ █████ ██████ ██████ ██ ██████ ██████ ██ ██ ██████
|
||||
██ ██ ██ ██ ██ ████ ██ ██ ███ ██ ██
|
||||
█████ ██ ██ ██ ██ █████ ██ ██ ██ ██ █████ ████ ██ ██
|
||||
██ ██ ██ ██ ██ ██ ██████ ██ ██ ██ ██ ████ ██
|
||||
██████ █████ █████ ██████ ██ ██ █████ ██████ ██ ██ ██
|
||||
|
||||
==============================================================================
|
||||
`
|
||||
log.Println(banner)
|
||||
log.Println("Device id: "+deviceID, "\t版本号: ", config.APP_VERSION) // 第一次启动记录
|
||||
}
|
||||
|
||||
func InitMQTT(deviceID string) {
|
||||
var mqttSvc *MQTTService
|
||||
firstFail := true // 标记是否第一次失败
|
||||
mqttSvc = NewMQTTService(config.MQTT_BROKER, deviceID, deviceID, config.PASSWORD, 60)
|
||||
for {
|
||||
mqttSvc = NewMQTTService(config.MQTT_BROKER, deviceID, deviceID, config.PASSWORD, 60)
|
||||
err := mqttSvc.Connect()
|
||||
if err != nil {
|
||||
if firstFail {
|
||||
log.Error("物联网服务连接失败,如未注册设备,请先注册: " + deviceID)
|
||||
log.Error("物联网服务器连接失败,如未注册设备,请先注册: " + deviceID)
|
||||
firstFail = false
|
||||
}
|
||||
time.Sleep(3 * time.Second) // 5秒后重试
|
||||
@@ -44,7 +73,13 @@ func main() {
|
||||
}
|
||||
defer mqttSvc.Close()
|
||||
|
||||
biz := NewBusinessService(mqttSvc, deviceID)
|
||||
dm, err := docker.NewDockerManager()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
biz := NewBusinessService(mqttSvc, deviceID, *dm)
|
||||
|
||||
for {
|
||||
// MQTT业务
|
||||
err := biz.Start()
|
||||
@@ -56,9 +91,32 @@ func main() {
|
||||
}
|
||||
break
|
||||
}
|
||||
// 第一次运行直接启动
|
||||
biz.handleRestart()
|
||||
|
||||
// 主线程循环,可做心跳或状态上报
|
||||
for {
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
// 4️⃣ T2 超时自毁检查
|
||||
tickerLost := time.NewTicker(10 * time.Second)
|
||||
defer tickerLost.Stop()
|
||||
go func() {
|
||||
for range tickerLost.C {
|
||||
cfg := config.LoadConfig()
|
||||
now := time.Now().Unix()
|
||||
if cfg.ControlState == "DEGRADED" && now-cfg.LastAliveAt > config.DAEMON_ALIVE_GAP_SECONDS {
|
||||
log.Println("[main] MQTT 长期失联,进入 CONTROL_LOST")
|
||||
cfg.ControlState = "CONTROL_LOST"
|
||||
cfg.LastAliveAt = time.Now().Unix()
|
||||
if err := config.WriteLocalConfig(cfg); err != nil {
|
||||
log.Println("[main] 写状态失败:", err)
|
||||
}
|
||||
// 停止业务容器
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
defer cancel()
|
||||
dm.StopAndRemoveContainer(ctx)
|
||||
// 自杀退出,让 daemon 重启
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
<-make(chan struct{})
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package main
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"sentinel/pkg/config"
|
||||
"sentinel/pkg/log"
|
||||
"time"
|
||||
|
||||
@@ -9,8 +10,9 @@ import (
|
||||
)
|
||||
|
||||
type MQTTService struct {
|
||||
client mqtt.Client
|
||||
handler func(topic string, payload []byte)
|
||||
client mqtt.Client
|
||||
handler func(topic string, payload []byte)
|
||||
subscriptions map[string]struct{} // 记录已订阅 topic
|
||||
}
|
||||
|
||||
func NewMQTTService(broker, clientID, username, password string, keepalive int) *MQTTService {
|
||||
@@ -19,20 +21,45 @@ func NewMQTTService(broker, clientID, username, password string, keepalive int)
|
||||
opts.SetClientID(clientID)
|
||||
opts.SetUsername(username)
|
||||
opts.SetPassword(password)
|
||||
opts.AutoReconnect = true
|
||||
opts.OnReconnecting = func(c mqtt.Client, opts *mqtt.ClientOptions) {
|
||||
log.Println("正在重连到物联网服务器...")
|
||||
}
|
||||
opts.SetKeepAlive(time.Duration(keepalive) * time.Second)
|
||||
opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})
|
||||
|
||||
ms := &MQTTService{}
|
||||
// 设置遗嘱消息
|
||||
lwtTopic, lwtPayload := getStatusInfoTopicPayload(false)
|
||||
opts.SetWill(
|
||||
lwtTopic, // topic
|
||||
lwtPayload, // payload
|
||||
1, // qos
|
||||
false, // retained
|
||||
)
|
||||
ms := &MQTTService{subscriptions: make(map[string]struct{})}
|
||||
opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) {
|
||||
if ms.handler != nil {
|
||||
ms.handler(m.Topic(), m.Payload())
|
||||
}
|
||||
})
|
||||
opts.OnConnect = func(c mqtt.Client) {
|
||||
log.Println("物联网服务已连接")
|
||||
log.Println("物联网服务器已连接")
|
||||
cfg := config.LoadConfig()
|
||||
cfg.ControlState = "CONNECTED"
|
||||
cfg.LastAliveAt = time.Now().Unix()
|
||||
config.WriteLocalConfig(cfg)
|
||||
// 连接成功 发送状态信息
|
||||
topic, payload := getStatusInfoTopicPayload(true)
|
||||
ms.PublicMsg(topic, payload, false)
|
||||
}
|
||||
opts.OnConnectionLost = func(c mqtt.Client, err error) {
|
||||
log.Println("物联网服务已断开:", err)
|
||||
log.Println("物联网服务器已断开:", err)
|
||||
cfg := config.LoadConfig()
|
||||
cfg.ControlState = "DEGRADED" // 表示短暂断开
|
||||
cfg.LastAliveAt = time.Now().Unix() // 更新这个时间戳,让 daemon 与 主程序看到尚可忍耐
|
||||
|
||||
if err := config.WriteLocalConfig(cfg); err != nil {
|
||||
log.Println("[main] 写状态失败:", err)
|
||||
}
|
||||
}
|
||||
|
||||
ms.client = mqtt.NewClient(opts)
|
||||
@@ -52,7 +79,7 @@ func (m *MQTTService) Subscribe(topic string, qos byte) error {
|
||||
if token.Wait() && token.Error() != nil {
|
||||
return token.Error()
|
||||
}
|
||||
log.Println("物联网服务消息订阅:", topic)
|
||||
log.Debug("物联网服务消息订阅:", topic)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -71,3 +98,48 @@ func (m *MQTTService) Close() {
|
||||
m.client.Disconnect(250)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MQTTService) UnsubscribeTopic(topic string) error {
|
||||
token := m.client.Unsubscribe(topic)
|
||||
go func() {
|
||||
if token.Wait() && token.Error() != nil {
|
||||
log.Println("取消订阅失败:", token.Error())
|
||||
} else {
|
||||
delete(m.subscriptions, topic)
|
||||
log.Debug("取消订阅成功:", topic)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnsubscribeAll 取消所有已订阅 topic
|
||||
func (m *MQTTService) UnsubscribeAll() {
|
||||
for topic := range m.subscriptions {
|
||||
_ = m.client.Unsubscribe(topic)
|
||||
delete(m.subscriptions, topic)
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeTopic 订阅指定 topic,并记录可取消
|
||||
func (m *MQTTService) SubscribeTopic(topic string, qos byte) error {
|
||||
token := m.client.Subscribe(topic, qos, nil)
|
||||
go func() {
|
||||
if token.Wait() && token.Error() != nil {
|
||||
log.Println("订阅失败:", token.Error())
|
||||
} else {
|
||||
m.subscriptions[topic] = struct{}{}
|
||||
log.Debug("订阅成功:", topic)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MQTTService) PublicMsg(topic string, payload string, retained bool) {
|
||||
qos := byte(1)
|
||||
log.Debug("发送消息:", topic)
|
||||
if err := m.Publish(topic, qos, retained, payload); err != nil {
|
||||
log.Println("发送信息出错:", err)
|
||||
} else {
|
||||
log.Debug("发送信息:", string(payload))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user