Files
AILab/bbit_ai/app/config/emqx.py
T
2025-12-31 17:49:17 +08:00

156 lines
4.5 KiB
Python

import json
import os
import socket
import ssl
import sys
import uuid
from aiomqtt import Client
from config.redis import redis_client
from db.postgres import get_dept_id_by_iot_user_name, get_device_type_by_iot_user_name
from models.MqttTopic import MqttTopic
# ================= Configuration =================
# Broker endpoint and credential. Each value may be overridden with an
# environment variable of the same name so that deployments do not have to
# rely on the in-source defaults (in particular the default password); when
# the variable is unset or empty the previous hard-coded value is used.
MQTT_BROKER = os.getenv("MQTT_BROKER") or "ai.ronsunny.cn"
MQTT_PORT = int(os.getenv("MQTT_PORT") or 8093)
MQTT_PASSWORD = os.getenv("MQTT_PASSWORD") or "123456"
TLS_CONTEXT = ssl.create_default_context()
# Topics subscribed to right after the client connects.
# NOTE(review): the None parts are presumably rendered as wildcards by
# MqttTopic.to_topic() — confirm against models.MqttTopic.
DEFAULT_SUBSCRIPTIONS = [
    MqttTopic.from_parts(
        dept_id=None,
        domain="status",
        device_type=None,
        device_id=None,
        resource="info",
    )
]
# =================================================
# Account name / client identifier of this server; filled in by
# mqtt_client_async() before connecting.
DEVICE_ID = None
MQTT_CLIENT: Client | None = None  # module-wide client shared by the publish helpers
# On Windows, switch asyncio to the selector-based event loop policy.
# (aiomqtt's documentation asks for this because the default Proactor loop
# lacks the add_reader/add_writer hooks it relies on.)
if "win32" == sys.platform.lower() or "nt" == os.name.lower():
    from asyncio import WindowsSelectorEventLoopPolicy, set_event_loop_policy

    set_event_loop_policy(WindowsSelectorEventLoopPolicy())
def get_device_id_simple():
    """Return the identifier this server uses as its MQTT account name.

    If the ``HOST_NAME`` environment variable is set to a non-empty value it
    is returned unchanged. Otherwise the result is
    ``"<socket hostname>|<mac>"`` where ``<mac>`` is the value of
    ``uuid.getnode()`` written as six colon-separated lowercase hex octets.
    """
    configured = os.getenv("HOST_NAME")
    if configured:
        return configured

    machine_name = socket.gethostname()
    # Keep only the low 48 bits, most significant octet first.
    octets = (uuid.getnode() & 0xFFFFFFFFFFFF).to_bytes(6, "big")
    return f"{machine_name}|{octets.hex(':')}"
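# TODO: subscribe to status information here. Intended flow: devices send info and this service replies; the Vue frontend and the backend send commands; devices receive the commands.
# ------------------ MQTT wrappers ------------------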
async def mqtt_publish(
    dept_id: str,
    domain: str,
    device_type: str,
    device_id: str,
    resource: str,
    payload: str,
    qos: int = 1,
):
    """Publish ``payload`` through the module-level MQTT client.

    The topic is the five segments joined with ``/`` in the order
    ``dept_id/domain/device_type/device_id/resource``.

    Raises:
        RuntimeError: if the module-level client has not been set.
    """
    client = MQTT_CLIENT
    if not client:
        raise RuntimeError("MQTT client is not initialized")

    segments = (dept_id, domain, device_type, device_id, resource)
    # f-string formatting keeps non-str segments (e.g. None) rendering as before.
    destination = "/".join(f"{segment}" for segment in segments)

    await client.publish(destination, payload, qos=qos)
    print(f"Published to {destination}: {payload}")
async def mqtt_publish_multiple(
    targets: list[dict], resource: str, payload: str, qos: int = 1
):
    """Publish the same ``payload`` to every target, one after another.

    Args:
        targets: one dict per recipient with the keys ``"dept_id"``,
            ``"domain"``, ``"device_type"`` and ``"device_id"``; together
            with ``resource`` they form the topic built by ``mqtt_publish``.
        resource: last topic segment, shared by all targets.
        payload: message body sent to each target.
        qos: MQTT quality-of-service level passed to each publish.

    Raises:
        KeyError: if a target is missing one of the required keys.
        RuntimeError: propagated from ``mqtt_publish`` when the client is
            not initialized.
    """
    for target in targets:
        await mqtt_publish(
            # dept_id is a required argument of mqtt_publish; omitting it made
            # every call with a non-empty target list fail with TypeError.
            dept_id=target["dept_id"],
            domain=target["domain"],
            device_type=target["device_type"],
            device_id=target["device_id"],
            resource=resource,
            payload=payload,
            qos=qos,
        )
async def _mqtt_handle_messages():
    """Consume incoming messages from the module-level client until it stops.

    For every message on a status topic (shape ``x/status/x/<deviceID>/info``)
    the JSON payload is stored via ``redis_client.set_device_info``, the
    department id and device type are looked up by the device id, and the
    payload (with ``dept_id`` added) is published back on the ``receipt``
    resource of the same device. Messages on other topics are only logged.

    A status payload that is not valid UTF-8, not valid JSON, or not a JSON
    object is logged and skipped, so one bad message cannot end the receive
    loop. Returns immediately if the client has not been set.
    """
    if not MQTT_CLIENT:
        return
    async for message in MQTT_CLIENT.messages:
        topic = MqttTopic(message.topic)
        print("收到消息:" + str(topic))
        # Only basic status reports are handled here.
        if not topic.is_status():
            continue

        try:
            payload = json.loads(message.payload.decode())
        except (UnicodeDecodeError, json.JSONDecodeError) as exc:
            print(f"Ignoring malformed status payload on {topic}: {exc}")
            continue
        if not isinstance(payload, dict):
            # dept_id is added as a key below, so anything but an object is unusable.
            print(f"Ignoring non-object status payload on {topic}")
            continue

        redis_client.set_device_info(topic.device_id, payload)
        # NOTE(review): these lookups look synchronous; if they hit the
        # database they block the event loop while running — confirm.
        dept_id = get_dept_id_by_iot_user_name(topic.device_id)
        device_type = get_device_type_by_iot_user_name(topic.device_id)
        payload["dept_id"] = dept_id
        payload_str = json.dumps(payload, ensure_ascii=False)
        # Send the receipt back to the device.
        await mqtt_publish(
            dept_id,
            topic.domain,
            device_type,
            topic.device_id,
            "receipt",
            payload_str,
        )
async def mqtt_client_async():
    """Connect to the broker, subscribe to the default topics and process messages.

    Sets the module-level ``DEVICE_ID`` (used as both username and client
    identifier) and ``MQTT_CLIENT``. The coroutine stays inside the message
    loop for as long as the connection is in use; when the connection
    context exits, for any reason, ``MQTT_CLIENT`` is reset to ``None`` so
    that ``mqtt_publish`` reports "not initialized" instead of trying to use
    a closed client.
    """
    global DEVICE_ID, MQTT_CLIENT
    DEVICE_ID = get_device_id_simple()
    print("服务端EMQX账号:", DEVICE_ID)
    async with Client(
        MQTT_BROKER,
        port=MQTT_PORT,
        username=DEVICE_ID,
        password=MQTT_PASSWORD,
        tls_context=TLS_CONTEXT,
        identifier=DEVICE_ID,
    ) as client:
        MQTT_CLIENT = client  # expose the connected client to the publish helpers
        try:
            print("MQTT client connected")
            # Subscribe to the default topics.
            for topic in DEFAULT_SUBSCRIPTIONS:
                topic_str = topic.to_topic()
                await client.subscribe(topic_str)
                print(f"Subscribed to default topic: {topic_str}")
            # Run the message-processing loop.
            await _mqtt_handle_messages()
        finally:
            # Never leave a stale reference to a client that is being closed.
            MQTT_CLIENT = None
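# ------------------ Example usage ------------------
# NOTE: mqtt_client_async() stays in its message loop while connected, so the
# publish calls below have to run from another task once the client is up.
# async def main():
#     await mqtt_client_async()
#
#     # Example: publish one message (dept_id is the first topic segment)
#     await mqtt_publish("dept01", "status", "edge", DEVICE_ID, "heartbeat", '{"alive":true}')
#
#     # Example: publish to several devices
#     targets = [
#         {"dept_id": "dept01", "domain": "cmd", "device_type": "edge", "device_id": "edge01"},
#         {"dept_id": "dept01", "domain": "cmd", "device_type": "edge", "device_id": "edge02"},
#     ]
#     await mqtt_publish_multiple(targets, "restart", '{"action":"restart"}')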