"""Server-side MQTT client: connection lifecycle, publishing and dispatch.

The module keeps one global aiomqtt ``Client`` (``MQTT_CLIENT``) that is only
valid while ``mqtt_client_async`` is running. ``mqtt_client_runner`` wraps that
coroutine in a reconnect loop with exponential backoff.
"""

import asyncio
import json
import os
import socket
import ssl
import sys
import uuid
from datetime import timezone

from aiomqtt import Client, Will

import utils
from config.redis import redis_client
from db.postgres import get_dept_id_by_iot_user_name, get_device_type_by_iot_user_name
from models.MqttTopic import MqttTopic
from routers.Iot import pending_commands
from routers.WS import ws_manager

# ================= Configuration =================
MQTT_BROKER = "ai.ronsunny.cn"
MQTT_PORT = 8093
# The password can be overridden through the MQTT_PASSWORD environment variable
# so the credential does not have to live in source control; the historical
# default is kept so existing deployments keep working unchanged.
MQTT_PASSWORD = os.getenv("MQTT_PASSWORD", "123456")
TLS_CONTEXT = ssl.create_default_context()

# Topics subscribed to right after every successful connection.
DEFAULT_SUBSCRIPTIONS = [
    # Status reports.
    MqttTopic.from_parts(
        dept_id=None,
        domain="status",
        device_type=None,
        device_id=None,
        resource="info",
    ),
    # Other receipts (replies to commands).
    MqttTopic.from_parts(
        dept_id=None,
        domain="receipt",
        device_type=None,
        device_id=None,
        resource="info",
    ),
]
# =================================================

DEVICE_ID = None
MQTT_CLIENT: Client | None = None  # global client, set only while connected

# On Windows, switch to the selector event loop.
if sys.platform.lower() == "win32" or os.name.lower() == "nt":
    from asyncio import set_event_loop_policy, WindowsSelectorEventLoopPolicy

    set_event_loop_policy(WindowsSelectorEventLoopPolicy())


def get_device_id_simple() -> str:
    """Return the identifier this server uses as MQTT username and client id.

    If the ``HOST_NAME`` environment variable is set and non-empty it is
    returned as-is. Otherwise the id is ``"<hostname>|<mac>"``, where the MAC
    is ``uuid.getnode()`` rendered as six colon-separated hex bytes.
    """
    hostname = os.getenv("HOST_NAME")
    if hostname:
        return hostname

    hostname = socket.gethostname()
    mac = uuid.getnode()
    # Walk the 48-bit node value from the most significant byte down.
    mac_str = ":".join(f"{(mac >> ele) & 0xff:02x}" for ele in range(40, -1, -8))
    return f"{hostname}|{mac_str}"


# ------------------ MQTT wrappers ------------------
async def mqtt_publish(
    dept_id: str,
    domain: str,
    device_type: str,
    device_id: str,
    resource: str,
    payload: str,
    qos: int = 1,
):
    """Publish ``payload`` through the global client.

    The topic is ``{dept_id}/{domain}/{device_type}/{device_id}/{resource}``.

    Raises:
        RuntimeError: if no MQTT client is currently connected.
    """
    if not MQTT_CLIENT:
        raise RuntimeError("MQTT client is not initialized")

    topic = f"{dept_id}/{domain}/{device_type}/{device_id}/{resource}"
    await MQTT_CLIENT.publish(topic, payload, qos=qos)
    print(f"Published to {topic}: {payload}")


async def mqtt_publish_multiple(
    targets: list[dict],
    resource: str,
    payload: str,
    qos: int = 1,
    dept_id: str | None = None,
):
    """Publish the same payload to several targets, one after another.

    Args:
        targets: dicts with the keys ``"domain"``, ``"device_type"`` and
            ``"device_id"``, and optionally ``"dept_id"``.
        resource: last topic segment, shared by all targets.
        payload: message body, shared by all targets.
        qos: MQTT QoS level passed to every publish.
        dept_id: fallback department id for targets without their own
            ``"dept_id"`` key.

    Raises:
        ValueError: if a target has no ``"dept_id"`` and no fallback was given.
        RuntimeError: if no MQTT client is currently connected.
    """
    # Resolve every target before publishing anything, so a bad entry cannot
    # leave the broadcast half-sent.
    resolved = []
    for target in targets:
        target_dept_id = target.get("dept_id", dept_id)
        if target_dept_id is None:
            raise ValueError(
                f"target {target.get('device_id')!r} has no 'dept_id' "
                "and no default dept_id was given"
            )
        resolved.append((target_dept_id, target))

    for target_dept_id, target in resolved:
        await mqtt_publish(
            dept_id=target_dept_id,
            domain=target["domain"],
            device_type=target["device_type"],
            device_id=target["device_id"],
            resource=resource,
            payload=payload,
            qos=qos,
        )


def _decode_json_object(raw, topic) -> dict | None:
    """Decode a raw MQTT payload into a dict, or return None if it is unusable."""
    try:
        payload = json.loads(raw.decode())
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        print(f"Ignoring undecodable MQTT payload on {topic}: {exc}")
        return None
    if not isinstance(payload, dict):
        print(f"Ignoring non-object MQTT payload on {topic}")
        return None
    return payload


async def _mqtt_handle_messages():
    """Consume incoming messages from the global client until it disconnects.

    Status messages are stored in Redis, acknowledged to the device and pushed
    to the web clients. Replies carrying a ``request_id`` resolve the matching
    future in ``pending_commands``. Payloads that are not a JSON object are
    logged and skipped so one bad message cannot tear down the connection.
    """
    if not MQTT_CLIENT:
        return

    async for message in MQTT_CLIENT.messages:
        topic = MqttTopic(message.topic)
        print("收到消息:" + str(topic))

        payload = _decode_json_object(message.payload, topic)
        if payload is None:
            continue

        # Basic status report; the topic looks like "x/status/x/deviceID/info".
        if topic.is_status():
            redis_client.set_device_info(topic.device_id, payload)
            dept_id = get_dept_id_by_iot_user_name(topic.device_id)
            dept_edge = get_device_type_by_iot_user_name(topic.device_id)
            payload["dept_id"] = dept_id
            payload_str = json.dumps(payload, ensure_ascii=False)

            # Send the enriched payload back to the device.
            await mqtt_publish(
                dept_id,
                topic.domain,
                dept_edge,
                topic.device_id,
                "receipt",
                payload_str,
            )

            # A report without "online" is forwarded as None instead of
            # raising KeyError out of the message loop.
            online = payload.get("online")
            print("设备" + topic.device_id + "变化:" + str(online))

            # Tell the web front end to refresh the online state.
            await ws_manager.noticeOnlineStatus(
                {
                    "deviceId": topic.device_id,
                    "online": online,
                    "type": "status",
                }
            )
        # Reply from a device to an earlier command.
        elif topic.is_response() and "request_id" in payload:
            req_id = payload["request_id"]
            future = pending_commands.get(req_id)
            if future and not future.done():
                future.set_result(payload)  # wakes the HTTP handler awaiting it


async def mqtt_client_async():
    """Connect to the broker, subscribe, announce online status, then dispatch.

    Runs until the connection ends. ``MQTT_CLIENT`` is set for the lifetime of
    the connection and reset to ``None`` afterwards, so ``mqtt_publish`` fails
    with its ``RuntimeError`` instead of using a closed client.
    """
    global DEVICE_ID, MQTT_CLIENT

    DEVICE_ID = get_device_id_simple()
    print("服务端EMQX账号:", DEVICE_ID)

    # Last-will message: the offline variant of the status payload.
    # NOTE(review): this topic uses device type "x" while public_device_status
    # publishes with device type "server" - confirm which one is intended.
    lwt_topic, lwt_payload = get_status_info_topic_payload(False)
    will = Will(
        topic=lwt_topic,
        payload=lwt_payload,
        qos=1,  # LWT is normally sent with QoS 1
        retain=False,
    )

    async with Client(
        MQTT_BROKER,
        port=MQTT_PORT,
        username=DEVICE_ID,
        password=MQTT_PASSWORD,
        tls_context=TLS_CONTEXT,
        identifier=DEVICE_ID,
        will=will,
    ) as client:
        MQTT_CLIENT = client  # expose the connected client to mqtt_publish
        try:
            # NOTE(review): these are plain attribute assignments on the
            # aiomqtt Client; confirm that aiomqtt actually invokes them.
            client.on_connect = on_connect
            client.on_disconnect = on_disconnect
            print("MQTT client connected")

            # Subscribe to the default topics.
            for topic in DEFAULT_SUBSCRIPTIONS:
                await MQTT_CLIENT.subscribe(topic.to_topic())
                print(f"Subscribed to default topic: {topic.to_topic()}")

            # Announce that this server is online.
            await public_device_status()

            # Run the message dispatch loop.
            await _mqtt_handle_messages()
        finally:
            # The client is closed once the context exits; drop the reference.
            MQTT_CLIENT = None


def on_connect(client, flags, rc, properties):
    """Log that the MQTT connection was established."""
    print("MQTT connected")


def on_disconnect(client, packet, exc=None):
    """Log that the MQTT connection was lost, with the exception if any."""
    print("MQTT disconnected:", exc)


INITIAL_RECONNECT_INTERVAL = 5
MAX_RECONNECT_INTERVAL = 60


async def mqtt_client_runner():
    """Run ``mqtt_client_async`` forever, reconnecting with exponential backoff.

    The delay starts at ``INITIAL_RECONNECT_INTERVAL`` seconds and doubles
    after every attempt up to ``MAX_RECONNECT_INTERVAL``. It is reset when the
    session returned normally or stayed up for at least
    ``MAX_RECONNECT_INTERVAL`` seconds; otherwise a single outage would leave
    the delay pinned at the maximum for the rest of the process lifetime.
    """
    loop = asyncio.get_running_loop()
    reconnect_interval = INITIAL_RECONNECT_INTERVAL

    while True:
        started_at = loop.time()
        try:
            await mqtt_client_async()
            # A normal return means the session was ended deliberately.
            reconnect_interval = INITIAL_RECONNECT_INTERVAL
        except Exception as e:
            print("MQTT 连接异常:", e)
            # A session that lasted this long was a working connection.
            if loop.time() - started_at >= MAX_RECONNECT_INTERVAL:
                reconnect_interval = INITIAL_RECONNECT_INTERVAL

        print(f"等待 {reconnect_interval}s 后重连...")
        await asyncio.sleep(reconnect_interval)

        # Exponential backoff for the next attempt.
        reconnect_interval = min(reconnect_interval * 2, MAX_RECONNECT_INTERVAL)


def get_status_info_topic_payload(is_online: bool) -> tuple[str, str]:
    """Build the status topic and JSON payload describing this server.

    Args:
        is_online: value stored under the ``"online"`` key of the payload.

    Returns:
        ``(topic, payload)`` where the topic is ``"x/status/x/<device id>/info"``
        and the payload is the JSON-encoded host description.
    """
    info = {
        "version": "1.0.0",  # replace with the application version
        "online": is_online,
        "ip": utils.get_local_ip(),
        "hostname": socket.gethostname(),
        "mac": utils.get_mac_address(),
        # NOTE(review): relies on the utils module exposing `platform` and
        # `datetime` as attributes - confirm against utils.
        "os": utils.platform.platform(),
        "cpu": utils.get_cpu_info(),
        "memory_total": utils.get_memory_total(),
        "disk_total": utils.get_disk_total(),
        "last_seen": utils.datetime.now(timezone.utc).isoformat(),
    }
    topic = "x/status/x/" + get_device_id_simple() + "/info"
    payload = json.dumps(info)
    return topic, payload


async def public_device_status():
    """Publish this server's online status to ``x/status/server/<id>/info``."""
    _, payload = get_status_info_topic_payload(True)
    await mqtt_publish(
        "x",
        "status",
        "server",
        get_device_id_simple(),
        "info",
        payload,
    )


# ------------------ Example main program ------------------
# async def main():
#     await mqtt_client_async()
#
#     # Example: publish one message
#     await mqtt_publish("x", "status", "edge", DEVICE_ID, "heartbeat", '{"alive":true}')
#
#     # Example: broadcast
#     targets = [
#         {"dept_id": "x", "domain": "cmd", "device_type": "edge", "device_id": "edge01"},
#         {"dept_id": "x", "domain": "cmd", "device_type": "edge", "device_id": "edge02"},
#     ]
#     await mqtt_publish_multiple(targets, "restart", '{"action":"restart"}')