完善中间件自动更新逻辑

This commit is contained in:
BBIT-Kai
2025-12-30 17:55:43 +08:00
parent 62e8ecb7d6
commit 3c1128b356
8 changed files with 74 additions and 26 deletions
+20 -5
View File
@@ -8,6 +8,7 @@ import uuid
from aiomqtt import Client from aiomqtt import Client
from config.redis import redis_client from config.redis import redis_client
from db.postgres import get_dept_id_by_iot_user_name, get_device_type_by_iot_user_name
from models.MqttTopic import MqttTopic from models.MqttTopic import MqttTopic
# ================= 配置区域 ================= # ================= 配置区域 =================
@@ -19,9 +20,9 @@ TLS_CONTEXT = ssl.create_default_context()
# 默认连接后要订阅的 topic 配置 # 默认连接后要订阅的 topic 配置
DEFAULT_SUBSCRIPTIONS = [ DEFAULT_SUBSCRIPTIONS = [
MqttTopic.from_parts( MqttTopic.from_parts(
project=None, dept_id=None,
domain="status", domain="status",
device_type="edge", device_type=None,
device_id=None, device_id=None,
resource="info", resource="info",
) )
@@ -57,7 +58,7 @@ def get_device_id_simple():
async def mqtt_publish( async def mqtt_publish(
project: str, dept_id: str,
domain: str, domain: str,
device_type: str, device_type: str,
device_id: str, device_id: str,
@@ -68,7 +69,7 @@ async def mqtt_publish(
"""发布消息(使用全局客户端)""" """发布消息(使用全局客户端)"""
if not MQTT_CLIENT: if not MQTT_CLIENT:
raise RuntimeError("MQTT client is not initialized") raise RuntimeError("MQTT client is not initialized")
topic = f"{project}/{domain}/{device_type}/{device_id}/{resource}" topic = f"{dept_id}/{domain}/{device_type}/{device_id}/{resource}"
await MQTT_CLIENT.publish(topic, payload, qos=qos) await MQTT_CLIENT.publish(topic, payload, qos=qos)
print(f"Published to {topic}: {payload}") print(f"Published to {topic}: {payload}")
@@ -97,9 +98,23 @@ async def _mqtt_handle_messages():
print("收到消息:" + str(topic)) print("收到消息:" + str(topic))
# 处理基础状态信息 # 处理基础状态信息
if topic.domain == "status" and topic.resource == "info": if topic.is_status():
# 这里收到的数据是这样的:"x/status/x/deviceID/info"
payload = json.loads(message.payload.decode()) payload = json.loads(message.payload.decode())
redis_client.set_device_info(topic.device_id, payload) redis_client.set_device_info(topic.device_id, payload)
dept_id = get_dept_id_by_iot_user_name(topic.device_id)
dept_edge = get_device_type_by_iot_user_name(topic.device_id)
payload["dept_id"] = dept_id
payload_str = json.dumps(payload, ensure_ascii=False)
# 返回给设备
await mqtt_publish(
dept_id,
topic.domain,
dept_edge,
topic.device_id,
"receipt",
payload_str,
)
async def mqtt_client_async(): async def mqtt_client_async():
+3
View File
@@ -78,6 +78,7 @@ def get_device_list_db_page(
d.is_active, d.is_active,
d.is_superuser, d.is_superuser,
d.dept_id, d.dept_id,
d.type,
sd.name AS dept_name, sd.name AS dept_name,
d.created_at d.created_at
FROM iot_users d FROM iot_users d
@@ -99,6 +100,7 @@ def get_device_list_db_page(
is_active, is_active,
is_superuser, is_superuser,
dept_id, dept_id,
type,
dept_name, dept_name,
created_at, created_at,
) = r ) = r
@@ -111,6 +113,7 @@ def get_device_list_db_page(
"status": 1 if is_active else 0, "status": 1 if is_active else 0,
"is_superuser": 1 if is_superuser else 0, "is_superuser": 1 if is_superuser else 0,
"dept_id": dept_id, "dept_id": dept_id,
"device_type": type,
"dept_name": dept_name, "dept_name": dept_name,
"created_at": format_datetime(created_at), "created_at": format_datetime(created_at),
} }
+12 -2
View File
@@ -952,16 +952,26 @@ def get_dept_id_by_user_id(user_id: str) -> str:
return str(dept_id) return str(dept_id)
def get_dept_id_by_iot_user_name(user_id: UUID) -> str: def get_dept_id_by_iot_user_name(user_id: str) -> str:
# 通过 iot_user_id 查找其所属的 dept_id # 通过 iot_user_id 查找其所属的 dept_id
with pg_pool.getConn() as conn: with pg_pool.getConn() as conn:
with conn.cursor() as cursor: with conn.cursor() as cursor:
cursor.execute("SELECT dept_id FROM iot_users WHERE name = %s", (user_id,)) cursor.execute("SELECT dept_id FROM iot_users WHERE name = %s", (user_id,))
dept_id = cursor.fetchone() dept_id = cursor.fetchone()
dept_id = dept_id[0] dept_id = str(dept_id[0])
return dept_id return dept_id
def get_device_type_by_iot_user_name(user_id: str) -> str:
# 通过 iot_user_id 查找其所属的 type
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT type FROM iot_users WHERE name = %s", (user_id,))
type = cursor.fetchone()
type = str(type[0])
return type
from typing import List from typing import List
@@ -4,5 +4,5 @@ from pydantic import BaseModel
class IotDeviceCommandRequest(BaseModel): class IotDeviceCommandRequest(BaseModel):
id: str | None = None id: str | None = None
command: str | None = None command: str | None = None
project: str | None = None dept_id: str | None = None
device_type: str | None = None device_type: str | None = None
+6 -6
View File
@@ -16,7 +16,7 @@ class MqttTopic:
# 不足的层级用 None 补齐,避免属性缺失 # 不足的层级用 None 补齐,避免属性缺失
parts += [None] * (self.LEVELS - len(parts)) parts += [None] * (self.LEVELS - len(parts))
self.project: Optional[str] = parts[0] self.dept_id: Optional[str] = parts[0]
self.domain: Optional[str] = parts[1] self.domain: Optional[str] = parts[1]
self.device_type: Optional[str] = parts[2] self.device_type: Optional[str] = parts[2]
self.device_id: Optional[str] = parts[3] self.device_id: Optional[str] = parts[3]
@@ -25,7 +25,7 @@ class MqttTopic:
@classmethod @classmethod
def from_parts( def from_parts(
cls, cls,
project: Optional[str] = None, dept_id: Optional[str] = None,
domain: Optional[str] = None, domain: Optional[str] = None,
device_type: Optional[str] = None, device_type: Optional[str] = None,
device_id: Optional[str] = None, device_id: Optional[str] = None,
@@ -43,7 +43,7 @@ class MqttTopic:
map( map(
_v, _v,
[ [
project, dept_id,
domain, domain,
device_type, device_type,
device_id, device_id,
@@ -65,7 +65,7 @@ class MqttTopic:
map( map(
_v, _v,
[ [
self.project, self.dept_id,
self.domain, self.domain,
self.device_type, self.device_type,
self.device_id, self.device_id,
@@ -80,7 +80,7 @@ class MqttTopic:
用于 publish 场景 用于 publish 场景
""" """
parts = [ parts = [
self.project, self.dept_id,
self.domain, self.domain,
self.device_type, self.device_type,
self.device_id, self.device_id,
@@ -101,7 +101,7 @@ class MqttTopic:
return f"<MqttTopic {self.to_topic()}>" return f"<MqttTopic {self.to_topic()}>"
def is_status(self) -> bool: def is_status(self) -> bool:
return self.domain == "status" return self.domain == "status" and self.resource == "info"
def is_cmd(self) -> bool: def is_cmd(self) -> bool:
return self.domain == "cmd" return self.domain == "cmd"
+30 -11
View File
@@ -1,3 +1,5 @@
import asyncio
import pathlib
import uuid import uuid
from uuid import UUID from uuid import UUID
@@ -26,21 +28,34 @@ async def emqx_webhook(data: EMQXWebhook):
if event == "client.connected": if event == "client.connected":
redis_client.set_online(device_id) redis_client.set_online(device_id)
# 这里刻意等1s 是因为设备连接后这里首先接到通知,但是状态信息设备来没来得及通过mqtt发送来,所以在此等待
# 没有直接在mqtt发送来的消息中获取在线状态是因为 这里是通过emqx的webhooks通知的,两种通知方式不同,一方面防止其中一种逻辑失效,另一方面在mqtt消息接收中设置在线状态会存在滞后性,同时也需要设置遗嘱消息,较为
await asyncio.sleep(1)
await ws_manager.noticeOnlineStatus(
{
"deviceId": device_id,
"online": True,
"type": "status",
}
)
await ws_manager.noticeOnlineStatus({"deviceId": device_id, "online": True}) print(f"[新设备在线] {device_id}")
print(f"[ONLINE] {device_id}")
elif event == "client.disconnected": elif event == "client.disconnected":
redis_client.set_offline(device_id) redis_client.set_offline(device_id)
await ws_manager.noticeOnlineStatus(
{
"deviceId": device_id,
"online": False,
"type": "status",
}
)
await ws_manager.noticeOnlineStatus({"deviceId": device_id, "online": False}) print(f"[设备离线] {device_id}")
print(f"[OFFLINE] {device_id}")
else: else:
# 其他事件直接忽略 # 其他事件直接忽略
print(f"[IGNORE] {event}") print(f"[其他事件] {event}")
return {"ok": True} return {"ok": True}
@@ -80,8 +95,6 @@ async def get_device_list(
d["memory_total"] = info_json.get("memory_total", "") d["memory_total"] = info_json.get("memory_total", "")
d["disk_total"] = info_json.get("disk_total", "") d["disk_total"] = info_json.get("disk_total", "")
d["last_seen"] = info_json.get("last_seen", "") d["last_seen"] = info_json.get("last_seen", "")
d["project"] = info_json.get("project", "")
d["device_type"] = info_json.get("deviceType", "")
return BaseResponse(data={"list": devices, "total": total}) return BaseResponse(data={"list": devices, "total": total})
@@ -214,10 +227,14 @@ async def delete_update(
@iot_router.get("/common/update/getUploadUrl") @iot_router.get("/common/update/getUploadUrl")
def getUploadUrl( def getUploadUrl(
filename: str | None = None,
user_id: UUID = Depends(get_user_id_from_token), user_id: UUID = Depends(get_user_id_from_token),
): ):
if not user_id:
return {"error": "userId is required"}
# 生成唯一文件名,避免覆盖 # 生成唯一文件名,避免覆盖
object_name = f"{uuid.uuid4()}" ext = pathlib.Path(filename).suffix if filename else "" # 获取文件后缀
object_name = f"{uuid.uuid4()}{ext}" # 拼接到 UUID 后面
return BaseResponse( return BaseResponse(
data={ data={
"uploadUrl": get_upload_token("iot-update", object_name), "uploadUrl": get_upload_token("iot-update", object_name),
@@ -231,6 +248,8 @@ def updateGetMaxCodeByDeptId(
user_id: UUID = Depends(get_user_id_from_token), user_id: UUID = Depends(get_user_id_from_token),
dept_id: str | None = None, dept_id: str | None = None,
): ):
if not user_id:
return {"error": "userId is required"}
# 生成唯一文件名,避免覆盖 # 生成唯一文件名,避免覆盖
return BaseResponse(data=getMaxCodeByDeptId(dept_id)) return BaseResponse(data=getMaxCodeByDeptId(dept_id))
@@ -251,6 +270,6 @@ async def command(
return {"error": "userId is required"} return {"error": "userId is required"}
await mqtt_publish( await mqtt_publish(
data.project, "cmd", data.device_type, data.id, data.command, "{}" data.dept_id, "cmd", data.device_type, data.id, data.command, "{}"
) )
return BaseResponse(data=None) return BaseResponse(data=None)
+2
View File
@@ -11,6 +11,7 @@ ws_manager = ConnectionManager()
iot_ws_router = APIRouter() iot_ws_router = APIRouter()
# EMQX 通知在线与否
@iot_ws_router.websocket("/device-status") @iot_ws_router.websocket("/device-status")
async def websocket_device_status( async def websocket_device_status(
websocket: WebSocket, websocket: WebSocket,
@@ -30,6 +31,7 @@ async def websocket_device_status(
print("[WS] client disconnected") print("[WS] client disconnected")
# Vue 牧安云哨
@iot_ws_router.websocket("/sentinel_record") @iot_ws_router.websocket("/sentinel_record")
async def websocket_sentinel_record( async def websocket_sentinel_record(
websocket: WebSocket, websocket: WebSocket,
-1
View File
@@ -93,6 +93,5 @@ async def sentinel_pull_analysis_async():
async with message.process(): async with message.process():
data = json.loads(message.body) data = json.loads(message.body)
req = SentinelRecordRequest(**data) req = SentinelRecordRequest(**data)
print(f"收到任务: {req}")
await process_vehicle_animal_image(req) # 处理 await process_vehicle_animal_image(req) # 处理
print(f"完成任务: {req}") print(f"完成任务: {req}")