package main import ( "encoding/json" "os" "os/exec" "sentinel/pkg/config" "sentinel/pkg/log" model2 "sentinel/pkg/model" "sentinel/pkg/utils" "strconv" "time" ) type BusinessService struct { mqtt *MQTTService deviceID string deptId string cmdTopic string deviceType string subscriptions map[string]struct{} // 记录已订阅 topic } func NewBusinessService(m *MQTTService, deviceID string) *BusinessService { return &BusinessService{ mqtt: m, deviceID: deviceID, subscriptions: make(map[string]struct{}), } } // SubscribeTopic 订阅指定 topic,并记录可取消 func (b *BusinessService) SubscribeTopic(topic string, qos byte) error { if err := b.mqtt.Subscribe(topic, qos); err != nil { return err } b.subscriptions[topic] = struct{}{} return nil } func getInitTopic(deviceID string) string { return "+/+/+/" + deviceID + "/#" } func (b *BusinessService) getOwnTopic(deviceID string) string { return b.deptId + "/cmd/" + b.deviceType + "/" + deviceID + "/#" } func (b *BusinessService) Start() error { if err := b.SubscribeTopic(getInitTopic(b.deviceID), 1); err != nil { return err } b.mqtt.SetMessageHandler(b.onMQTTMessage) // 第一次连接就发送状态信息 b.SendStatusInfo() return nil } // UnsubscribeTopic 取消订阅指定 topic func (b *BusinessService) UnsubscribeTopic(topic string) error { token := b.mqtt.client.Unsubscribe(topic) if token.Wait() && token.Error() != nil { return token.Error() } delete(b.subscriptions, topic) return nil } // UnsubscribeAll 取消所有已订阅 topic func (b *BusinessService) UnsubscribeAll() { for topic := range b.subscriptions { _ = b.mqtt.client.Unsubscribe(topic) delete(b.subscriptions, topic) } } // 消息处理 func (b *BusinessService) onMQTTMessage(topic string, payload []byte) { model := model2.FromStringToMqttTopic(topic) // 指令 if model.Domain == "cmd" && model.DeviceType == b.deviceType { log.Println("收到指令:", model.Resource) switch model.Resource { case "ping": log.Println("pong") case "shutdown": b.handleShutdown() case "restart": b.handleRestart() case "check_update": b.handleCheckUpdate() default: log.Println("未知的命令:", model.Resource) } } else if model.Domain == "status" && model.Resource == "receipt" { b.deviceType = model.DeviceType b.deptId = model.DeptId // 取消订阅之前的初始化主题 if b.UnsubscribeTopic(getInitTopic(b.deviceID)) != nil { log.Error("无法取消初始化主题") return } // 新订阅属于自己的主题 if b.SubscribeTopic(b.getOwnTopic(b.deviceID), 1) != nil { log.Error("无法定于属于自己的主题") return } log.Println("设备初始化成功:所属项目:", model.DeptId, "\t设备类型:", model.DeviceType) } } func (b *BusinessService) SendStatusInfo() { info := map[string]interface{}{ "version": config.APP_VERSION, "online": true, "ip": utils.GetLocalIP(), "hostname": utils.GetHostname(), "mac": utils.GetMacAddress(), "os": utils.GetOSInfo(), "cpu": utils.GetCPUInfo(), "memory_total": utils.GetMemory(), "disk_total": utils.GetDisk(), "last_seen": time.Now().UTC().Format(time.RFC3339), } payload, _ := json.Marshal(info) topic := "x/status/x/" + b.deviceID + "/info" qos := byte(1) retained := true log.Println("发送消息:", topic) if err := b.mqtt.Publish(topic, qos, retained, payload); err != nil { log.Println("发送状态信息出错:", err) } else { log.Println("发送状态信息:", string(payload)) } } // 关闭程序(立即退出) func (b *BusinessService) handleShutdown() { os.Exit(0) } // 重启程序 func (b *BusinessService) handleRestart() { exe, _ := os.Executable() cmd := exec.Command(exe) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr _ = cmd.Start() os.Exit(0) } func (b *BusinessService) handleCheckUpdate() { args := []string{ "--version", strconv.Itoa(config.APP_VERSION), } launcher := newUpdaterLauncher() if err := launcher.Start(args); err != nil { log.Println("[BUS] failed to start updater:", err) return } log.Println( "[BUS] updater started, exiting main program", ) time.Sleep(500 * time.Millisecond) os.Exit(0) } // handleCheckUpdate 触发更新流程(主程序侧) //func (b *BusinessService) handleCheckUpdate() { // // args := []string{ // "--version", strconv.Itoa(config.APP_VERSION), // } // // cmd := exec.Command("./updater.exe", args...) // cmd.Stdout = os.Stdout // cmd.Stderr = os.Stderr // // // OS 级脱离父进程 // switch runtime.GOOS { // case "windows": // cmd.SysProcAttr = &syscall.SysProcAttr{ // CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP, // } // } // // if err := cmd.Start(); err != nil { // log.Println("[BUS] failed to start updater:", err) // return // } // // log.Println( // "[BUS] updater started (pid=%d), exiting main program\n", // cmd.Process.Pid, // ) // // // 给 updater 留出启动窗口(尤其是 systemd / docker 环境) // time.Sleep(500 * time.Millisecond) // // os.Exit(0) //}