"""Server-side MQTT client: connects to the broker, subscribes to device
status topics, stores incoming status info in Redis and exposes publish helpers.
"""

import json
import os
import socket
import ssl
import sys
import uuid

from aiomqtt import Client

from config.redis import redis_client
from models.MqttTopic import MqttTopic

# ================= Configuration =================
MQTT_BROKER = "ai.ronsunny.cn"
MQTT_PORT = 8093
# NOTE(review): credential is hard-coded in source; consider loading it from
# the environment or a secret store.
MQTT_PASSWORD = "123456"
TLS_CONTEXT = ssl.create_default_context()

# Topics subscribed to right after connecting.
# NOTE(review): project=None / device_id=None presumably act as wildcards --
# confirm against MqttTopic.to_topic().
DEFAULT_SUBSCRIPTIONS = [
    MqttTopic.from_parts(
        project=None,
        domain="status",
        device_type="edge",
        device_id=None,
        resource="info",
    )
]
# =================================================

DEVICE_ID = None
MQTT_CLIENT: Client | None = None  # module-wide client, set while connected

# Switch to the SelectorEventLoop policy on Windows.
if sys.platform.lower() == "win32" or os.name.lower() == "nt":
    from asyncio import set_event_loop_policy, WindowsSelectorEventLoopPolicy

    set_event_loop_policy(WindowsSelectorEventLoopPolicy())


def get_device_id_simple() -> str:
    """Return an identifier for this machine.

    Prefers the contents of ``/etc/machine-id``; when that file is missing,
    unreadable or empty, falls back to ``"<hostname>|<mac address>"``.
    """
    try:
        with open("/etc/machine-id") as f:
            mid = f.read().strip()
            if mid:
                return mid
    except (OSError, ValueError):
        # Best effort: file absent/unreadable (OSError) or not decodable
        # (UnicodeDecodeError is a ValueError) -> use the fallback below.
        pass

    hostname = socket.gethostname()
    mac = uuid.getnode()
    # Format the 48-bit node value as six colon-separated hex octets,
    # most significant octet first.
    mac_str = ":".join(f"{(mac >> ele) & 0xff:02x}" for ele in range(40, -1, -8))
    return f"{hostname}|{mac_str}"


# TODO: subscribe to status information here. Devices send info and this
# service replies; the Vue front end sends commands, the backend sends
# commands, devices receive commands.
# ------------------ MQTT wrappers ------------------
async def mqtt_publish(
    project: str,
    domain: str,
    device_type: str,
    device_id: str,
    resource: str,
    payload: str,
    qos: int = 1,
):
    """Publish ``payload`` using the module-wide client.

    The topic is ``{project}/{domain}/{device_type}/{device_id}/{resource}``.

    Raises:
        RuntimeError: if no client is currently connected.
    """
    if MQTT_CLIENT is None:
        raise RuntimeError("MQTT client is not initialized")

    topic = f"{project}/{domain}/{device_type}/{device_id}/{resource}"
    await MQTT_CLIENT.publish(topic, payload, qos=qos)
    print(f"Published to {topic}: {payload}")


async def mqtt_publish_multiple(
    targets: list[dict],
    resource: str,
    payload: str,
    qos: int = 1,
    *,
    project: str | None = None,
):
    """Publish the same ``payload`` to several targets, one after another.

    Each target is a dict with the keys ``"domain"``, ``"device_type"`` and
    ``"device_id"``. A target may also carry its own ``"project"`` key;
    otherwise the ``project`` keyword argument is used.

    Raises:
        ValueError: if a target has no project and no default was given.
        RuntimeError: if no client is currently connected (from mqtt_publish).
    """
    for target in targets:
        target_project = target.get("project", project)
        if target_project is None:
            # mqtt_publish requires a project; fail with a clear message
            # instead of a TypeError about a missing argument.
            raise ValueError(
                "No project given for target "
                f"{target!r}: pass project=... or add a 'project' key"
            )
        await mqtt_publish(
            project=target_project,
            domain=target["domain"],
            device_type=target["device_type"],
            device_id=target["device_id"],
            resource=resource,
            payload=payload,
            qos=qos,
        )


async def _mqtt_handle_messages():
    """Consume incoming messages from the module-wide client.

    Returns immediately when no client is set. Messages whose topic has
    domain ``"status"`` and resource ``"info"`` are decoded as JSON and
    stored through ``redis_client.set_device_info``.
    """
    if MQTT_CLIENT is None:
        return

    async for message in MQTT_CLIENT.messages:
        topic = MqttTopic(message.topic)
        print("收到消息:" + str(topic))

        # Basic device status information.
        if topic.domain == "status" and topic.resource == "info":
            try:
                payload = json.loads(message.payload.decode())
            except ValueError as exc:
                # Covers UnicodeDecodeError and json.JSONDecodeError: one bad
                # message must not end the loop and drop the connection.
                print(f"Ignoring malformed status payload on {topic}: {exc}")
                continue
            redis_client.set_device_info(topic.device_id, payload)


async def mqtt_client_async():
    """Connect to the broker, subscribe to the default topics and process
    messages until the message loop ends.

    Sets the module globals ``DEVICE_ID`` and ``MQTT_CLIENT``; ``MQTT_CLIENT``
    is reset to ``None`` once the connection context is left.
    """
    global DEVICE_ID, MQTT_CLIENT

    DEVICE_ID = get_device_id_simple()
    print("服务端EMQX账号:", DEVICE_ID)

    try:
        async with Client(
            MQTT_BROKER,
            port=MQTT_PORT,
            username=DEVICE_ID,
            password=MQTT_PASSWORD,
            tls_context=TLS_CONTEXT,
            identifier=DEVICE_ID,
        ) as client:
            MQTT_CLIENT = client  # expose the connected client module-wide
            print("MQTT client connected")

            # Subscribe to the default topics.
            for topic in DEFAULT_SUBSCRIPTIONS:
                await MQTT_CLIENT.subscribe(topic.to_topic())
                print(f"Subscribed to default topic: {topic.to_topic()}")

            # Run the message-processing loop.
            await _mqtt_handle_messages()
    finally:
        # Never leave a closed client behind for mqtt_publish to pick up.
        MQTT_CLIENT = None


# ------------------ Example main program ------------------
# NOTE: mqtt_client_async() only returns once its message loop ends, so in
# real use it has to run concurrently with the publishing code.
# async def main():
#     await mqtt_client_async()
#
#     # Example: publish a message
#     await mqtt_publish("proj", "status", "edge", DEVICE_ID, "heartbeat", '{"alive":true}')
#
#     # Example: publish to several targets
#     targets = [
#         {"domain": "cmd", "device_type": "edge", "device_id": "edge01"},
#         {"domain": "cmd", "device_type": "edge", "device_id": "edge02"},
#     ]
#     await mqtt_publish_multiple(targets, "restart", '{"action":"restart"}', project="proj")