Files
AILab/bbit_ai/app/config/emqx.py
T
2026-02-04 13:58:18 +08:00

258 lines
7.5 KiB
Python

import asyncio
import json
import os
import socket
import ssl
import sys
import uuid
from datetime import timezone
from aiomqtt import Client, Will
import utils
from config.redis import redis_client
from db.postgres import get_dept_id_by_iot_user_name, get_device_type_by_iot_user_name
from models.MqttTopic import MqttTopic
from routers.Iot import pending_commands
from routers.WS import ws_manager
# ================= 配置区域 =================
MQTT_BROKER = "ai.ronsunny.cn"
MQTT_PORT = 8093
MQTT_PASSWORD = "123456"
TLS_CONTEXT = ssl.create_default_context()
# 默认连接后要订阅的 topic 配置
DEFAULT_SUBSCRIPTIONS = [
# 状态信息回执
MqttTopic.from_parts(
dept_id=None,
domain="status",
device_type=None,
device_id=None,
resource="info",
),
# 其他信息回执(指令)
MqttTopic.from_parts(
dept_id=None,
domain="receipt",
device_type=None,
device_id=None,
resource="info",
),
]
# ===========================================
DEVICE_ID = None
MQTT_CLIENT: Client | None = None # 全局客户端
# Windows 平台下切换到 SelectorEventLoop
if sys.platform.lower() == "win32" or os.name.lower() == "nt":
from asyncio import set_event_loop_policy, WindowsSelectorEventLoopPolicy
set_event_loop_policy(WindowsSelectorEventLoopPolicy())
def get_device_id_simple():
hostname = os.getenv("HOST_NAME")
if not hostname:
hostname = socket.gethostname()
mac = uuid.getnode()
mac_str = ":".join(f"{(mac >> ele) & 0xff:02x}" for ele in range(40, -1, -8))
return f"{hostname}|{mac_str}"
else:
return hostname
# ------------------ MQTT 封装 ------------------
async def mqtt_publish(
dept_id: str,
domain: str,
device_type: str,
device_id: str,
resource: str,
payload: str,
qos: int = 1,
):
"""发布消息(使用全局客户端)"""
if not MQTT_CLIENT:
raise RuntimeError("MQTT client is not initialized")
topic = f"{dept_id}/{domain}/{device_type}/{device_id}/{resource}"
await MQTT_CLIENT.publish(topic, payload, qos=qos)
print(f"Published to {topic}: {payload}")
async def mqtt_publish_multiple(
targets: list[dict], resource: str, payload: str, qos: int = 1
):
"""群发消息"""
for target in targets:
await mqtt_publish(
domain=target["domain"],
device_type=target["device_type"],
device_id=target["device_id"],
resource=resource,
payload=payload,
qos=qos,
)
async def _mqtt_handle_messages():
"""后台循环处理消息"""
if not MQTT_CLIENT:
return
async for message in MQTT_CLIENT.messages:
topic = MqttTopic(message.topic)
print("收到消息:" + str(topic))
payload = json.loads(message.payload.decode())
# 处理基础状态信息
if topic.is_status():
# 这里收到的数据是这样的:"x/status/x/deviceID/info"
redis_client.set_device_info(topic.device_id, payload)
dept_id = get_dept_id_by_iot_user_name(topic.device_id)
dept_edge = get_device_type_by_iot_user_name(topic.device_id)
payload["dept_id"] = dept_id
payload_str = json.dumps(payload, ensure_ascii=False)
# 返回给设备
await mqtt_publish(
dept_id,
topic.domain,
dept_edge,
topic.device_id,
"receipt",
payload_str,
)
print("设备" + topic.device_id + "变化:" + str(payload["online"]))
# 通知vue更新在线状态
await ws_manager.noticeOnlineStatus(
{
"deviceId": topic.device_id,
"online": payload["online"],
"type": "status",
}
)
# 如果是设备回复
elif topic.is_response() and "request_id" in payload:
req_id = payload["request_id"]
future = pending_commands.get(req_id)
if future and not future.done():
future.set_result(payload) # 唤醒等待的 HTTP 接口
async def mqtt_client_async():
global DEVICE_ID, MQTT_CLIENT
DEVICE_ID = get_device_id_simple()
print("服务端EMQX账号:", DEVICE_ID)
lwt_topic, lwt_payload = get_status_info_topic_payload(False)
will = Will(
topic=lwt_topic,
payload=lwt_payload,
qos=1, # 通常 LWT 用 QoS 1
retain=False,
)
async with Client(
MQTT_BROKER,
port=MQTT_PORT,
username=DEVICE_ID,
password=MQTT_PASSWORD,
tls_context=TLS_CONTEXT,
identifier=DEVICE_ID,
will=will,
) as client:
MQTT_CLIENT = client # 保存全局客户端
client.on_connect = on_connect
client.on_disconnect = on_disconnect
print("MQTT client connected")
# 订阅默认 topic
for topic in DEFAULT_SUBSCRIPTIONS:
await MQTT_CLIENT.subscribe(topic.to_topic())
print(f"Subscribed to default topic: {topic.to_topic()}")
# 发送基础消息:"x/status/x/deviceID/info"
await public_device_status()
# 启动消息处理循环
await _mqtt_handle_messages()
def on_connect(client, flags, rc, properties):
print("MQTT connected")
def on_disconnect(client, packet, exc=None):
print("MQTT disconnected:", exc)
INITIAL_RECONNECT_INTERVAL = 5
MAX_RECONNECT_INTERVAL = 60
async def mqtt_client_runner():
global MQTT_CLIENT
reconnect_interval = INITIAL_RECONNECT_INTERVAL
while True:
try:
await mqtt_client_async()
# 如果 mqtt_client_async 正常返回,说明是主动退出
reconnect_interval = INITIAL_RECONNECT_INTERVAL
except Exception as e:
print("MQTT 连接异常:", e)
print(f"等待 {reconnect_interval}s 后重连...")
await asyncio.sleep(reconnect_interval)
# ⭐ 指数退避放在这里
reconnect_interval = min(reconnect_interval * 2, MAX_RECONNECT_INTERVAL)
def get_status_info_topic_payload(is_online: bool):
info = {
"version": "1.0.0", # 替换成你的 APP 版本
"online": is_online,
"ip": utils.get_local_ip(),
"hostname": socket.gethostname(),
"mac": utils.get_mac_address(),
"os": utils.platform.platform(),
"cpu": utils.get_cpu_info(),
"memory_total": utils.get_memory_total(),
"disk_total": utils.get_disk_total(),
"last_seen": utils.datetime.now(timezone.utc).isoformat(),
}
topic = "x/status/x/" + get_device_id_simple() + "/info"
payload = json.dumps(info)
return topic, payload
async def public_device_status():
_, payload = get_status_info_topic_payload(True)
await mqtt_publish(
"x",
"status",
"server",
get_device_id_simple(),
"info",
payload,
)
# ------------------ 示例主程序 ------------------
# async def main():
# await mqtt_client_async()
#
# # 示例:发布消息
# await mqtt_publish("status", "edge", DEVICE_ID, "heartbeat", '{"alive":true}')
#
# # 示例:群发
# targets = [
# {"domain": "cmd", "device_type": "edge", "device_id": "edge01"},
# {"domain": "cmd", "device_type": "edge", "device_id": "edge02"},
# ]
# await mqtt_publish_multiple(targets, "restart", '{"action":"restart"}')