192 lines
4.6 KiB
Go
192 lines
4.6 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"os"
|
|
"os/exec"
|
|
"runtime"
|
|
"sentinel/pkg/config"
|
|
"sentinel/pkg/log"
|
|
model2 "sentinel/pkg/model"
|
|
"sentinel/pkg/utils"
|
|
"strconv"
|
|
"syscall"
|
|
"time"
|
|
)
|
|
|
|
type BusinessService struct {
|
|
mqtt *MQTTService
|
|
deviceID string
|
|
deptId string
|
|
cmdTopic string
|
|
deviceType string
|
|
subscriptions map[string]struct{} // 记录已订阅 topic
|
|
}
|
|
|
|
func NewBusinessService(m *MQTTService, deviceID string) *BusinessService {
|
|
return &BusinessService{
|
|
mqtt: m,
|
|
deviceID: deviceID,
|
|
subscriptions: make(map[string]struct{}),
|
|
}
|
|
}
|
|
|
|
// SubscribeTopic 订阅指定 topic,并记录可取消
|
|
func (b *BusinessService) SubscribeTopic(topic string, qos byte) error {
|
|
if err := b.mqtt.Subscribe(topic, qos); err != nil {
|
|
return err
|
|
}
|
|
b.subscriptions[topic] = struct{}{}
|
|
return nil
|
|
}
|
|
|
|
// getInitTopic returns the wildcard topic filter used during initialisation:
// three single-level wildcards, then deviceID, then a multi-level wildcard.
func getInitTopic(deviceID string) string {
	const (
		prefix = "+/+/+/"
		suffix = "/#"
	)
	return prefix + deviceID + suffix
}
|
|
func (b *BusinessService) getOwnTopic(deviceID string) string {
|
|
return b.deptId + "/cmd/" + b.deviceType + "/" + deviceID + "/#"
|
|
}
|
|
|
|
func (b *BusinessService) Start() error {
|
|
if err := b.SubscribeTopic(getInitTopic(b.deviceID), 1); err != nil {
|
|
return err
|
|
}
|
|
|
|
b.mqtt.SetMessageHandler(b.onMQTTMessage)
|
|
|
|
// 第一次连接就发送状态信息
|
|
b.SendStatusInfo()
|
|
|
|
return nil
|
|
}
|
|
|
|
// UnsubscribeTopic 取消订阅指定 topic
|
|
func (b *BusinessService) UnsubscribeTopic(topic string) error {
|
|
token := b.mqtt.client.Unsubscribe(topic)
|
|
if token.Wait() && token.Error() != nil {
|
|
return token.Error()
|
|
}
|
|
delete(b.subscriptions, topic)
|
|
return nil
|
|
}
|
|
|
|
// UnsubscribeAll 取消所有已订阅 topic
|
|
func (b *BusinessService) UnsubscribeAll() {
|
|
for topic := range b.subscriptions {
|
|
_ = b.mqtt.client.Unsubscribe(topic)
|
|
delete(b.subscriptions, topic)
|
|
}
|
|
}
|
|
|
|
// 消息处理
|
|
func (b *BusinessService) onMQTTMessage(topic string, payload []byte) {
|
|
model := model2.FromStringToMqttTopic(topic)
|
|
// 指令
|
|
if model.Domain == "cmd" && model.DeviceType == b.deviceType {
|
|
log.Println("收到指令:", model.Resource)
|
|
switch model.Resource {
|
|
case "ping":
|
|
log.Println("pong")
|
|
case "shutdown":
|
|
b.handleShutdown()
|
|
case "restart":
|
|
b.handleRestart()
|
|
case "check_update":
|
|
b.handleCheckUpdate()
|
|
default:
|
|
log.Println("未知的命令:", model.Resource)
|
|
}
|
|
} else if model.Domain == "status" && model.Resource == "receipt" {
|
|
b.deviceType = model.DeviceType
|
|
b.deptId = model.DeptId
|
|
// 取消订阅之前的初始化主题
|
|
if b.UnsubscribeTopic(getInitTopic(b.deviceID)) != nil {
|
|
log.Error("无法取消初始化主题")
|
|
return
|
|
}
|
|
// 新订阅属于自己的主题
|
|
if b.SubscribeTopic(b.getOwnTopic(b.deviceID), 1) != nil {
|
|
log.Error("无法定于属于自己的主题")
|
|
return
|
|
}
|
|
log.Println("设备初始化成功:所属项目:", model.DeptId, "\t设备类型:", model.DeviceType)
|
|
}
|
|
}
|
|
|
|
func (b *BusinessService) SendStatusInfo() {
|
|
info := map[string]interface{}{
|
|
"version": config.APP_VERSION,
|
|
"online": true,
|
|
"ip": utils.GetLocalIP(),
|
|
"hostname": utils.GetHostname(),
|
|
"mac": utils.GetMacAddress(),
|
|
"os": utils.GetOSInfo(),
|
|
"cpu": utils.GetCPUInfo(),
|
|
"memory_total": utils.GetMemory(),
|
|
"disk_total": utils.GetDisk(),
|
|
"last_seen": time.Now().UTC().Format(time.RFC3339),
|
|
}
|
|
|
|
payload, _ := json.Marshal(info)
|
|
topic := "x/status/x/" + b.deviceID + "/info"
|
|
qos := byte(1)
|
|
retained := true
|
|
log.Println("发送消息:", topic)
|
|
|
|
if err := b.mqtt.Publish(topic, qos, retained, payload); err != nil {
|
|
log.Println("发送状态信息出错:", err)
|
|
} else {
|
|
log.Println("发送状态信息:", string(payload))
|
|
}
|
|
}
|
|
|
|
// 关闭程序(立即退出)
|
|
func (b *BusinessService) handleShutdown() {
|
|
os.Exit(0)
|
|
}
|
|
|
|
// 重启程序
|
|
func (b *BusinessService) handleRestart() {
|
|
exe, _ := os.Executable()
|
|
cmd := exec.Command(exe)
|
|
cmd.Stdout = os.Stdout
|
|
cmd.Stderr = os.Stderr
|
|
_ = cmd.Start()
|
|
os.Exit(0)
|
|
}
|
|
|
|
// handleCheckUpdate 触发更新流程(主程序侧)
|
|
func (b *BusinessService) handleCheckUpdate() {
|
|
|
|
args := []string{
|
|
"--version", strconv.Itoa(config.APP_VERSION),
|
|
}
|
|
|
|
cmd := exec.Command("./updater.exe", args...)
|
|
cmd.Stdout = os.Stdout
|
|
cmd.Stderr = os.Stderr
|
|
|
|
// OS 级脱离父进程
|
|
switch runtime.GOOS {
|
|
case "windows":
|
|
cmd.SysProcAttr = &syscall.SysProcAttr{
|
|
CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP,
|
|
}
|
|
}
|
|
|
|
if err := cmd.Start(); err != nil {
|
|
log.Println("[BUS] failed to start updater:", err)
|
|
return
|
|
}
|
|
|
|
log.Println(
|
|
"[BUS] updater started (pid=%d), exiting main program\n",
|
|
cmd.Process.Pid,
|
|
)
|
|
|
|
// 给 updater 留出启动窗口(尤其是 systemd / docker 环境)
|
|
time.Sleep(500 * time.Millisecond)
|
|
|
|
os.Exit(0)
|
|
}
|