From 3c1128b356f54022580bd215e129da6473301698 Mon Sep 17 00:00:00 2001 From: BBIT-Kai <2911862937@qq.com> Date: Tue, 30 Dec 2025 17:55:43 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E4=B8=AD=E9=97=B4=E4=BB=B6?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=E6=9B=B4=E6=96=B0=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bbit_ai/app/config/emqx.py | 25 ++++++++--- bbit_ai/app/db/postgres/iot.py | 3 ++ bbit_ai/app/db/postgres/system.py | 14 ++++++- bbit_ai/app/models/IotDeviceCommandRequest.py | 2 +- bbit_ai/app/models/MqttTopic.py | 12 +++--- bbit_ai/app/routers/Iot.py | 41 ++++++++++++++----- bbit_ai/app/routers/WS.py | 2 + bbit_ai/app/service/RabbitMQ.py | 1 - 8 files changed, 74 insertions(+), 26 deletions(-) diff --git a/bbit_ai/app/config/emqx.py b/bbit_ai/app/config/emqx.py index f9efcdc..88f51b8 100644 --- a/bbit_ai/app/config/emqx.py +++ b/bbit_ai/app/config/emqx.py @@ -8,6 +8,7 @@ import uuid from aiomqtt import Client from config.redis import redis_client +from db.postgres import get_dept_id_by_iot_user_name, get_device_type_by_iot_user_name from models.MqttTopic import MqttTopic # ================= 配置区域 ================= @@ -19,9 +20,9 @@ TLS_CONTEXT = ssl.create_default_context() # 默认连接后要订阅的 topic 配置 DEFAULT_SUBSCRIPTIONS = [ MqttTopic.from_parts( - project=None, + dept_id=None, domain="status", - device_type="edge", + device_type=None, device_id=None, resource="info", ) @@ -57,7 +58,7 @@ def get_device_id_simple(): async def mqtt_publish( - project: str, + dept_id: str, domain: str, device_type: str, device_id: str, @@ -68,7 +69,7 @@ async def mqtt_publish( """发布消息(使用全局客户端)""" if not MQTT_CLIENT: raise RuntimeError("MQTT client is not initialized") - topic = f"{project}/{domain}/{device_type}/{device_id}/{resource}" + topic = f"{dept_id}/{domain}/{device_type}/{device_id}/{resource}" await MQTT_CLIENT.publish(topic, payload, qos=qos) print(f"Published to {topic}: {payload}") @@ -97,9 +98,23 @@ async def _mqtt_handle_messages(): print("收到消息:" + str(topic)) # 处理基础状态信息 - if topic.domain == "status" and topic.resource == "info": + if topic.is_status(): + # 这里收到的数据是这样的:"x/status/x/deviceID/info" payload = json.loads(message.payload.decode()) redis_client.set_device_info(topic.device_id, payload) + dept_id = get_dept_id_by_iot_user_name(topic.device_id) + dept_edge = get_device_type_by_iot_user_name(topic.device_id) + payload["dept_id"] = dept_id + payload_str = json.dumps(payload, ensure_ascii=False) + # 返回给设备 + await mqtt_publish( + dept_id, + topic.domain, + dept_edge, + topic.device_id, + "receipt", + payload_str, + ) async def mqtt_client_async(): diff --git a/bbit_ai/app/db/postgres/iot.py b/bbit_ai/app/db/postgres/iot.py index 55cbeca..0a4a0c8 100644 --- a/bbit_ai/app/db/postgres/iot.py +++ b/bbit_ai/app/db/postgres/iot.py @@ -78,6 +78,7 @@ def get_device_list_db_page( d.is_active, d.is_superuser, d.dept_id, + d.type, sd.name AS dept_name, d.created_at FROM iot_users d @@ -99,6 +100,7 @@ def get_device_list_db_page( is_active, is_superuser, dept_id, + type, dept_name, created_at, ) = r @@ -111,6 +113,7 @@ def get_device_list_db_page( "status": 1 if is_active else 0, "is_superuser": 1 if is_superuser else 0, "dept_id": dept_id, + "device_type": type, "dept_name": dept_name, "created_at": format_datetime(created_at), } diff --git a/bbit_ai/app/db/postgres/system.py b/bbit_ai/app/db/postgres/system.py index cdf0624..8392dd9 100644 --- a/bbit_ai/app/db/postgres/system.py +++ b/bbit_ai/app/db/postgres/system.py @@ -952,16 +952,26 @@ def get_dept_id_by_user_id(user_id: str) -> str: return str(dept_id) -def get_dept_id_by_iot_user_name(user_id: UUID) -> str: +def get_dept_id_by_iot_user_name(user_id: str) -> str: # 通过 iot_user_id 查找其所属的 dept_id with pg_pool.getConn() as conn: with conn.cursor() as cursor: cursor.execute("SELECT dept_id FROM iot_users WHERE name = %s", (user_id,)) dept_id = cursor.fetchone() - dept_id = dept_id[0] + dept_id = str(dept_id[0]) return dept_id +def get_device_type_by_iot_user_name(user_id: str) -> str: + # 通过 iot_user_id 查找其所属的 type + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + cursor.execute("SELECT type FROM iot_users WHERE name = %s", (user_id,)) + type = cursor.fetchone() + type = str(type[0]) + return type + + from typing import List diff --git a/bbit_ai/app/models/IotDeviceCommandRequest.py b/bbit_ai/app/models/IotDeviceCommandRequest.py index c2d438f..3e4e4c2 100644 --- a/bbit_ai/app/models/IotDeviceCommandRequest.py +++ b/bbit_ai/app/models/IotDeviceCommandRequest.py @@ -4,5 +4,5 @@ from pydantic import BaseModel class IotDeviceCommandRequest(BaseModel): id: str | None = None command: str | None = None - project: str | None = None + dept_id: str | None = None device_type: str | None = None diff --git a/bbit_ai/app/models/MqttTopic.py b/bbit_ai/app/models/MqttTopic.py index 1e30c35..0b728f9 100644 --- a/bbit_ai/app/models/MqttTopic.py +++ b/bbit_ai/app/models/MqttTopic.py @@ -16,7 +16,7 @@ class MqttTopic: # 不足的层级用 None 补齐,避免属性缺失 parts += [None] * (self.LEVELS - len(parts)) - self.project: Optional[str] = parts[0] + self.dept_id: Optional[str] = parts[0] self.domain: Optional[str] = parts[1] self.device_type: Optional[str] = parts[2] self.device_id: Optional[str] = parts[3] @@ -25,7 +25,7 @@ class MqttTopic: @classmethod def from_parts( cls, - project: Optional[str] = None, + dept_id: Optional[str] = None, domain: Optional[str] = None, device_type: Optional[str] = None, device_id: Optional[str] = None, @@ -43,7 +43,7 @@ class MqttTopic: map( _v, [ - project, + dept_id, domain, device_type, device_id, @@ -65,7 +65,7 @@ class MqttTopic: map( _v, [ - self.project, + self.dept_id, self.domain, self.device_type, self.device_id, @@ -80,7 +80,7 @@ class MqttTopic: 用于 publish 场景 """ parts = [ - self.project, + self.dept_id, self.domain, self.device_type, self.device_id, @@ -101,7 +101,7 @@ class MqttTopic: return f"" def is_status(self) -> bool: - return self.domain == "status" + return self.domain == "status" and self.resource == "info" def is_cmd(self) -> bool: return self.domain == "cmd" diff --git a/bbit_ai/app/routers/Iot.py b/bbit_ai/app/routers/Iot.py index b553226..51924f4 100644 --- a/bbit_ai/app/routers/Iot.py +++ b/bbit_ai/app/routers/Iot.py @@ -1,3 +1,5 @@ +import asyncio +import pathlib import uuid from uuid import UUID @@ -26,21 +28,34 @@ async def emqx_webhook(data: EMQXWebhook): if event == "client.connected": redis_client.set_online(device_id) + # 这里刻意等1s 是因为设备连接后这里首先接到通知,但是状态信息设备来没来得及通过mqtt发送来,所以在此等待 + # 没有直接在mqtt发送来的消息中获取在线状态是因为 这里是通过emqx的webhooks通知的,两种通知方式不同,一方面防止其中一种逻辑失效,另一方面在mqtt消息接收中设置在线状态会存在滞后性,同时也需要设置遗嘱消息,较为 + await asyncio.sleep(1) + await ws_manager.noticeOnlineStatus( + { + "deviceId": device_id, + "online": True, + "type": "status", + } + ) - await ws_manager.noticeOnlineStatus({"deviceId": device_id, "online": True}) - - print(f"[ONLINE] {device_id}") + print(f"[新设备在线] {device_id}") elif event == "client.disconnected": redis_client.set_offline(device_id) + await ws_manager.noticeOnlineStatus( + { + "deviceId": device_id, + "online": False, + "type": "status", + } + ) - await ws_manager.noticeOnlineStatus({"deviceId": device_id, "online": False}) - - print(f"[OFFLINE] {device_id}") + print(f"[设备离线] {device_id}") else: # 其他事件直接忽略 - print(f"[IGNORE] {event}") + print(f"[其他事件] {event}") return {"ok": True} @@ -80,8 +95,6 @@ async def get_device_list( d["memory_total"] = info_json.get("memory_total", "") d["disk_total"] = info_json.get("disk_total", "") d["last_seen"] = info_json.get("last_seen", "") - d["project"] = info_json.get("project", "") - d["device_type"] = info_json.get("deviceType", "") return BaseResponse(data={"list": devices, "total": total}) @@ -214,10 +227,14 @@ async def delete_update( @iot_router.get("/common/update/getUploadUrl") def getUploadUrl( + filename: str | None = None, user_id: UUID = Depends(get_user_id_from_token), ): + if not user_id: + return {"error": "userId is required"} # 生成唯一文件名,避免覆盖 - object_name = f"{uuid.uuid4()}" + ext = pathlib.Path(filename).suffix if filename else "" # 获取文件后缀 + object_name = f"{uuid.uuid4()}{ext}" # 拼接到 UUID 后面 return BaseResponse( data={ "uploadUrl": get_upload_token("iot-update", object_name), @@ -231,6 +248,8 @@ def updateGetMaxCodeByDeptId( user_id: UUID = Depends(get_user_id_from_token), dept_id: str | None = None, ): + if not user_id: + return {"error": "userId is required"} # 生成唯一文件名,避免覆盖 return BaseResponse(data=getMaxCodeByDeptId(dept_id)) @@ -251,6 +270,6 @@ async def command( return {"error": "userId is required"} await mqtt_publish( - data.project, "cmd", data.device_type, data.id, data.command, "{}" + data.dept_id, "cmd", data.device_type, data.id, data.command, "{}" ) return BaseResponse(data=None) diff --git a/bbit_ai/app/routers/WS.py b/bbit_ai/app/routers/WS.py index c5eb472..d4b60bf 100644 --- a/bbit_ai/app/routers/WS.py +++ b/bbit_ai/app/routers/WS.py @@ -11,6 +11,7 @@ ws_manager = ConnectionManager() iot_ws_router = APIRouter() +# EMQX 通知在线与否 @iot_ws_router.websocket("/device-status") async def websocket_device_status( websocket: WebSocket, @@ -30,6 +31,7 @@ async def websocket_device_status( print("[WS] client disconnected") +# Vue 牧安云哨 @iot_ws_router.websocket("/sentinel_record") async def websocket_sentinel_record( websocket: WebSocket, diff --git a/bbit_ai/app/service/RabbitMQ.py b/bbit_ai/app/service/RabbitMQ.py index 69b0345..826f956 100644 --- a/bbit_ai/app/service/RabbitMQ.py +++ b/bbit_ai/app/service/RabbitMQ.py @@ -93,6 +93,5 @@ async def sentinel_pull_analysis_async(): async with message.process(): data = json.loads(message.body) req = SentinelRecordRequest(**data) - print(f"收到任务: {req}") await process_vehicle_animal_image(req) # 处理 print(f"完成任务: {req}")