package main import ( "context" "encoding/json" "fmt" "sentinel/pkg/config" "sentinel/pkg/device" "sentinel/pkg/docker" "sentinel/pkg/log" model2 "sentinel/pkg/model" "sentinel/pkg/utils" "time" ) type BusinessService struct { mqtt *MQTTService dockerManager docker.DockerManager deviceID string deptId string cmdTopic string deviceType string } func NewBusinessService(m *MQTTService, deviceID string, dm docker.DockerManager) *BusinessService { return &BusinessService{ mqtt: m, dockerManager: dm, deviceID: deviceID, } } func (b *BusinessService) Start() error { if err := b.mqtt.SubscribeTopic(getInitTopic(b.deviceID), 1); err != nil { return err } b.mqtt.SetMessageHandler(b.onMQTTMessage) return nil } func getInitTopic(deviceID string) string { return "+/+/+/" + deviceID + "/#" } func (b *BusinessService) getOwnTopic(deviceID string) string { return b.deptId + "/cmd/" + b.deviceType + "/" + deviceID + "/#" } // 消息处理 func (b *BusinessService) onMQTTMessage(topic string, payload []byte) { model := model2.FromStringToMqttTopic(topic) payload_json, err := utils.PayloadToMap(payload) if err != nil { fmt.Println("解析失败:", err) return } // 指令 if model.Domain == "cmd" && model.DeviceType == b.deviceType { log.Println("收到指令:", model.Resource) switch model.Resource { case "shutdown": payload_json["massage"] = b.handleShutdown() case "restart": payload_json["massage"] = b.handleRestart() case "check_update": payload_json["massage"] = b.handleCheckUpdate() default: log.Println("未知的命令:", model.Resource) payload_json["massage"] = "未知的命令" } topic := "x/receipt/x/" + device.GetDeviceID() + "/info" payload := utils.JSONToString(payload_json) b.mqtt.PublicMsg(topic, payload, false) // 回执 } else if model.Domain == "status" && model.Resource == "receipt" { b.deviceType = model.DeviceType b.deptId = model.DeptId // 取消订阅之前的初始化主题 if b.mqtt.UnsubscribeTopic(getInitTopic(b.deviceID)) != nil { log.Error("无法取消初始化主题") return } // 新订阅属于自己的主题 if b.mqtt.SubscribeTopic(b.getOwnTopic(b.deviceID), 1) != nil { log.Error("无法定于属于自己的主题") return } log.Println("设备初始化成功:所属项目:", model.DeptId, "\t设备类型:", model.DeviceType) } } func getStatusInfoTopicPayload(isOnline bool) (string, string) { info := map[string]interface{}{ "version": config.APP_VERSION, "online": isOnline, "ip": utils.GetLocalIP(), "hostname": utils.GetHostname(), "mac": utils.GetMacAddress(), "os": utils.GetOSInfo(), "cpu": utils.GetCPUInfo(), "memory_total": utils.GetMemory(), "disk_total": utils.GetDisk(), "last_seen": time.Now().UTC().Format(time.RFC3339), } payloadBytes, _ := json.Marshal(info) payload := string(payloadBytes) // 转成 string return "x/status/x/" + device.GetDeviceID() + "/info", payload } // 关闭程序(立即退出) func (b *BusinessService) handleShutdown() string { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) defer cancel() b.dockerManager.StopAndRemoveContainer(ctx) return "程序已退出" } // 重启程序 func (b *BusinessService) handleRestart() string { log.Println("正在拉取镜像:", config.DOCKER_IMAGE) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) defer cancel() if err := b.dockerManager.PullImage(ctx, config.DOCKER_IMAGE); err != nil { log.Fatalf("镜像拉取失败:%v", err) } err := b.dockerManager.RunContainer(ctx) if err != nil { log.Fatal(err) } return "程序已重启" } func (b *BusinessService) handleCheckUpdate() string { cfg := config.LoadConfig() cfg.NeedUpdate = true if err := config.WriteLocalConfig(&cfg); err != nil { log.Println("load config failed:", err) } return "程序更新机制已触发" } //func (b *BusinessService) handleCheckUpdate() { // args := []string{ // "--version", strconv.Itoa(config.APP_VERSION), // } // // launcher := newUpdaterLauncher() // // if err := launcher.Start(args); err != nil { // log.Println("[BUS] failed to start updater:", err) // return // } // // log.Println( // "[BUS] updater started, exiting main program", // ) // // time.Sleep(500 * time.Millisecond) // os.Exit(0) //}