完善牧安云哨-中间件
This commit is contained in:
@@ -0,0 +1,131 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"sentinel/pkg/log"
|
||||
model2 "sentinel/pkg/model"
|
||||
"sentinel/pkg/utils"
|
||||
"time"
|
||||
)
|
||||
|
||||
type BusinessService struct {
|
||||
mqtt *MQTTService
|
||||
deviceID string
|
||||
project string
|
||||
cmdTopic string
|
||||
deviceType string
|
||||
}
|
||||
|
||||
func NewBusinessService(m *MQTTService, project, deviceType, deviceID string) *BusinessService {
|
||||
// 根据统一规则生成 topic
|
||||
cmdTopic := project + "/cmd/" + deviceType + "/" + deviceID + "/#"
|
||||
return &BusinessService{
|
||||
mqtt: m,
|
||||
project: project,
|
||||
deviceID: deviceID,
|
||||
cmdTopic: cmdTopic,
|
||||
deviceType: deviceType,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *BusinessService) Start() error {
|
||||
// 订阅 cmd topic
|
||||
if err := b.mqtt.Subscribe(b.cmdTopic, 1); err != nil {
|
||||
return err
|
||||
}
|
||||
b.mqtt.SetMessageHandler(b.onMQTTMessage)
|
||||
// 第一次连接就发送状态信息
|
||||
b.SendStatusInfo()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// 消息处理
|
||||
func (b *BusinessService) onMQTTMessage(topic string, payload []byte) {
|
||||
model := model2.FromStringToMqttTopic(topic)
|
||||
// 是指令
|
||||
if model.Domain == "cmd" {
|
||||
log.Println("收到指令:", model.Resource)
|
||||
switch model.Resource {
|
||||
case "ping":
|
||||
log.Println("pong")
|
||||
case "shutdown":
|
||||
b.handleShutdown()
|
||||
case "restart":
|
||||
b.handleRestart()
|
||||
case "check_update":
|
||||
b.handleCheckUpdate()
|
||||
default:
|
||||
log.Println("未知的命令:", model.Resource)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *BusinessService) SendStatusInfo() {
|
||||
info := map[string]interface{}{
|
||||
"project": utils.PROJECT,
|
||||
"deviceType": utils.DEVICE_TPYE,
|
||||
"version": utils.APP_VERSION,
|
||||
"online": true,
|
||||
"ip": utils.GetLocalIP(),
|
||||
"hostname": utils.GetHostname(),
|
||||
"mac": utils.GetMacAddress(),
|
||||
"os": utils.GetOSInfo(),
|
||||
"cpu": utils.GetCPUInfo(),
|
||||
"memory_total": utils.GetMemory(),
|
||||
"disk_total": utils.GetDisk(),
|
||||
"last_seen": time.Now().UTC().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
payload, _ := json.Marshal(info)
|
||||
topic := b.project + "/status/" + b.deviceType + "/" + b.deviceID + "/info"
|
||||
qos := byte(1)
|
||||
retained := true
|
||||
|
||||
if err := b.mqtt.Publish(topic, qos, retained, payload); err != nil {
|
||||
log.Println("[BUS] failed to send status info:", err)
|
||||
} else {
|
||||
log.Println("[BUS] status info sent:", string(payload))
|
||||
}
|
||||
}
|
||||
|
||||
// 关闭程序(立即退出)
|
||||
func (b *BusinessService) handleShutdown() {
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
// 重启程序
|
||||
func (b *BusinessService) handleRestart() {
|
||||
exe, _ := os.Executable()
|
||||
cmd := exec.Command(exe)
|
||||
cmd.Stdout = os.Stdout
|
||||
cmd.Stderr = os.Stderr
|
||||
_ = cmd.Start()
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
// 更新程序
|
||||
func (b *BusinessService) handleCheckUpdate() {
|
||||
exe, _ := os.Executable()
|
||||
updaterPath := filepath.Join(filepath.Dir(exe), "updater")
|
||||
if _, err := os.Stat(updaterPath); os.IsNotExist(err) {
|
||||
if _, err2 := os.Stat(updaterPath + ".exe"); err2 == nil {
|
||||
updaterPath = updaterPath + ".exe"
|
||||
} else {
|
||||
log.Println("[BUS] updater not found")
|
||||
return
|
||||
}
|
||||
}
|
||||
cmd := exec.Command(updaterPath, "--target", exe)
|
||||
cmd.Stdout = os.Stdout
|
||||
cmd.Stderr = os.Stderr
|
||||
if err := cmd.Start(); err != nil {
|
||||
log.Println("[BUS] failed to start updater:", err)
|
||||
return
|
||||
}
|
||||
log.Println("[BUS] exiting main program for update")
|
||||
os.Exit(0)
|
||||
}
|
||||
@@ -0,0 +1,66 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sentinel/pkg/device"
|
||||
"sentinel/pkg/log"
|
||||
"sentinel/pkg/model"
|
||||
api "sentinel/pkg/net"
|
||||
"sentinel/pkg/storage"
|
||||
)
|
||||
|
||||
func test() {
|
||||
// 0. 从海康SDK获取图片以及信息
|
||||
record := loadData()
|
||||
// 1. 上传图片到OSS
|
||||
uploadFile(record.LicensePlateImage, record.VehicleImage)
|
||||
// 2. 调用分析请求
|
||||
analytics(record)
|
||||
}
|
||||
|
||||
func loadData() model.Record {
|
||||
return model.Record{
|
||||
DeviceId: device.GetDeviceID(),
|
||||
LicensePlate: "晋A-888888",
|
||||
LicensePlateImage: "licensePlateImage_test1.jpg",
|
||||
VehicleType: "大型货车",
|
||||
VehicleImage: "vehicleImage_test1.jpg",
|
||||
}
|
||||
}
|
||||
|
||||
func uploadFile(licensePlateImage string, vehicleImage string) {
|
||||
if err := storage.Init(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
// todo 需要压缩图片至1~3MB
|
||||
size, err := storage.UploadFile(
|
||||
context.Background(),
|
||||
"sentinel",
|
||||
"license_plate/"+licensePlateImage,
|
||||
"tmp/"+licensePlateImage,
|
||||
"image/jpeg",
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
log.Println(fmt.Sprintf("车牌照已上传完毕, 大小: = %d KB", size/1024))
|
||||
size, err = storage.UploadFile(
|
||||
context.Background(),
|
||||
"sentinel",
|
||||
"vehicle_image/"+vehicleImage,
|
||||
"tmp/"+vehicleImage,
|
||||
"image/jpeg",
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
log.Println(fmt.Sprintf("车身照已上传完毕, 大小: = %d KB", size/1024))
|
||||
}
|
||||
|
||||
func analytics(record model.Record) {
|
||||
err := api.Analytics(record)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,58 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sentinel/pkg/utils"
|
||||
"time"
|
||||
|
||||
"sentinel/pkg/device"
|
||||
"sentinel/pkg/log"
|
||||
)
|
||||
|
||||
func main() {
|
||||
deviceID := device.GetDeviceID()
|
||||
log.Init(utils.Log_file_dic) // 初始化日志目录
|
||||
log.Info("Device id: " + deviceID) // 第一次启动记录
|
||||
|
||||
broker := fmt.Sprintf("tls://%s:%d", utils.MQTT_HOST, utils.MQTT_PORT)
|
||||
username := deviceID
|
||||
password := utils.PASSWORD
|
||||
|
||||
var mqttSvc *MQTTService
|
||||
firstFail := true // 标记是否第一次失败
|
||||
for {
|
||||
mqttSvc = NewMQTTService(broker, username, username, password, 60)
|
||||
err := mqttSvc.Connect()
|
||||
if err != nil {
|
||||
if firstFail {
|
||||
log.Error("物联网服务连接失败,请先注册设备. DeviceID: " + deviceID + " ")
|
||||
firstFail = false
|
||||
}
|
||||
time.Sleep(5 * time.Second) // 5秒后重试
|
||||
continue
|
||||
}
|
||||
log.Info("物联网服务已启动")
|
||||
break
|
||||
}
|
||||
defer mqttSvc.Close()
|
||||
|
||||
biz := NewBusinessService(mqttSvc, utils.PROJECT, utils.DEVICE_TPYE, deviceID)
|
||||
for {
|
||||
// MQTT业务
|
||||
err := biz.Start()
|
||||
if err != nil {
|
||||
log.Error("business service start failed: " + err.Error())
|
||||
fmt.Println("业务启动失败,5秒后重试...")
|
||||
time.Sleep(5 * time.Second)
|
||||
continue
|
||||
}
|
||||
// 个人业务
|
||||
test()
|
||||
break
|
||||
}
|
||||
|
||||
// 主线程循环,可做心跳或状态上报
|
||||
for {
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"sentinel/pkg/log"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
)
|
||||
|
||||
type MQTTService struct {
|
||||
client mqtt.Client
|
||||
handler func(topic string, payload []byte)
|
||||
}
|
||||
|
||||
func NewMQTTService(broker, clientID, username, password string, keepalive int) *MQTTService {
|
||||
opts := mqtt.NewClientOptions()
|
||||
opts.AddBroker(broker)
|
||||
opts.SetClientID(clientID)
|
||||
opts.SetUsername(username)
|
||||
opts.SetPassword(password)
|
||||
opts.SetKeepAlive(time.Duration(keepalive) * time.Second)
|
||||
opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})
|
||||
|
||||
ms := &MQTTService{}
|
||||
opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) {
|
||||
if ms.handler != nil {
|
||||
ms.handler(m.Topic(), m.Payload())
|
||||
}
|
||||
})
|
||||
opts.OnConnect = func(c mqtt.Client) {
|
||||
log.Println("物联网服务已连接")
|
||||
}
|
||||
opts.OnConnectionLost = func(c mqtt.Client, err error) {
|
||||
log.Println("物联网服务已断开:", err)
|
||||
}
|
||||
|
||||
ms.client = mqtt.NewClient(opts)
|
||||
return ms
|
||||
}
|
||||
|
||||
func (m *MQTTService) Connect() error {
|
||||
token := m.client.Connect()
|
||||
if token.Wait() && token.Error() != nil {
|
||||
return token.Error()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MQTTService) Subscribe(topic string, qos byte) error {
|
||||
token := m.client.Subscribe(topic, qos, nil)
|
||||
if token.Wait() && token.Error() != nil {
|
||||
return token.Error()
|
||||
}
|
||||
log.Println("物联网服务消息订阅:", topic)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MQTTService) Publish(topic string, qos byte, retained bool, payload interface{}) error {
|
||||
token := m.client.Publish(topic, qos, retained, payload)
|
||||
token.Wait()
|
||||
return token.Error()
|
||||
}
|
||||
|
||||
func (m *MQTTService) SetMessageHandler(h func(topic string, payload []byte)) {
|
||||
m.handler = h
|
||||
}
|
||||
|
||||
func (m *MQTTService) Close() {
|
||||
if m.client != nil {
|
||||
m.client.Disconnect(250)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user