"""HTTP endpoints for IoT device management, update packages and device commands."""

import asyncio
import json
import pathlib
import uuid
from uuid import UUID

from fastapi import APIRouter
from fastapi import Depends

from config.minIO import get_upload_token
from config.redis import redis_client
from db.postgres.iot import *
from models.BaseResponse import BaseResponse
from models.IotDeviceCommandRequest import IotDeviceCommandRequest

iot_router = APIRouter()
from config.security import get_user_id_from_token

# Device-info fields copied verbatim from the Redis record onto each device
# row; every one of them defaults to "" when the record does not contain it.
_DEVICE_INFO_FIELDS = (
    "version",
    "ip",
    "hostname",
    "mac",
    "os",
    "cpu",
    "memory_total",
    "disk_total",
    "last_seen",
)

# Seconds to wait for a device to answer a command before giving up.
_COMMAND_TIMEOUT_SECONDS = 5


# -------------------- Device endpoints --------------------
# Deprecated: webhook offline notifications are not timely
# (e.g. sudden power loss or network drop).
# @iot_router.post("/common/webhook")
# async def emqx_webhook(data: EMQXWebhook):
#     device_id = data.clientid
#     event = data.event
#
#     if event == "client.connected":
#         redis_client.set_online(device_id)
#         # Deliberately wait 1s: after a device connects this notification
#         # arrives first, before the device has had time to send its status
#         # info over MQTT, so we wait here.
#         # The online status is not taken directly from the MQTT messages
#         # because this path is notified through EMQX webhooks; the two
#         # notification mechanisms differ. This guards against one of them
#         # failing, and setting the online status on MQTT message receipt
#         # would lag and would also require a will message, which is rather
#         # ... (the original comment ends here)
#         await asyncio.sleep(1)
#         await ws_manager.noticeOnlineStatus(
#             {
#                 "deviceId": device_id,
#                 "online": True,
#                 "type": "status",
#             }
#         )
#
#         print(f"[新设备在线] {device_id}")
#
#     elif event == "client.disconnected":
#         redis_client.set_offline(device_id)
#         await ws_manager.noticeOnlineStatus(
#             {
#                 "deviceId": device_id,
#                 "online": False,
#                 "type": "status",
#             }
#         )
#
#         print(f"[设备离线] {device_id}")
#
#     else:
#         # Ignore any other event.
#         print(f"[其他事件] {event}")
#
#     return {"ok": True}


@iot_router.get("/common/device/list")
async def get_device_list(
    page: int = 1,
    pageSize: int = 10,
    id: str | None = None,
    name: str | None = None,
    status: int | None = None,
    is_superuser: int | None = None,
    dept_id: str | None = None,
    startTime: str | None = None,
    endTime: str | None = None,
    user_id: UUID = Depends(get_user_id_from_token),
):
    """Return one page of devices, each enriched with its info from Redis.

    The filters are passed straight through to ``get_device_list_db_page``.
    Every returned device gets a boolean ``online`` key plus the keys listed
    in ``_DEVICE_INFO_FIELDS``.
    """
    if not user_id:
        return {"error": "userId is required"}

    devices, total = get_device_list_db_page(
        page, pageSize, id, name, status, is_superuser, dept_id, startTime, endTime
    )

    # Core step: attach the online status and runtime info to every row.
    for device in devices:
        # The device "name" (its account) is the key used for the Redis lookup.
        # Fall back to an empty mapping so a missing record yields defaults
        # instead of an AttributeError.
        # NOTE(review): assumes get_device_info may return None - confirm.
        info = redis_client.get_device_info(device["name"]) or {}
        device["online"] = info.get("online", "0") == "1"
        for field in _DEVICE_INFO_FIELDS:
            device[field] = info.get(field, "")

    return BaseResponse(data={"list": devices, "total": total})


@iot_router.post("/common/device")
async def create_device(data: dict, user_id: UUID = Depends(get_user_id_from_token)):
    """Insert a device from the request body and return its new id."""
    if not user_id:
        return {"error": "userId is required"}

    new_id = insert_device(data)
    return BaseResponse(data={"id": new_id})


@iot_router.put("/common/device/{id}")
async def update_device(
    id: str,
    data: dict,
    user_id: UUID = Depends(get_user_id_from_token),
):
    """Update device ``id``; report failure when no row was affected."""
    if not user_id:
        return {"error": "userId is required"}

    count = update_device_db(id, data)
    if count == 0:
        return BaseResponse(status=False, message="设备不存在", data=None)
    return BaseResponse(data=True)


@iot_router.patch("/common/device/{id}")
async def patch_device(
    id: str,
    data: dict,
    user_id: UUID = Depends(get_user_id_from_token),
):
    """Patch device ``id``; report failure when no row was affected."""
    if not user_id:
        return {"error": "userId is required"}

    count = patch_device_db(id, data)
    if count == 0:
        return BaseResponse(status=False, message="设备不存在", data=None)
    return BaseResponse(data=True)


@iot_router.delete("/common/device/{id}")
async def delete_device(
    id: str,
    user_id: UUID = Depends(get_user_id_from_token),
):
    """Delete device ``id``; report failure when no row was deleted."""
    if not user_id:
        return {"error": "userId is required"}

    deleted = delete_device_db(id)
    if deleted == 0:
        return BaseResponse(status=False, message="设备不存在", data=None)
    return BaseResponse(data=True)


@iot_router.get("/common/update/list")
async def get_update_list(
    page: int = 1,
    pageSize: int = 10,
    id: str | None = None,
    code: str | None = None,
    dept_id: str | None = None,
    startTime: str | None = None,
    endTime: str | None = None,
    user_id: UUID = Depends(get_user_id_from_token),
):
    """Return one page of update records.

    ``code`` arrives as a string; an empty or missing value means "no filter",
    anything else is converted to ``int`` before querying.
    """
    if not user_id:
        return {"error": "userId is required"}

    code_filter = None
    if code:
        try:
            code_filter = int(code)
        except ValueError:
            # Answer like create_update does instead of failing with a 500.
            return BaseResponse(
                status=False,
                message="无效的版本号",
                data=None,
            )

    updates, total = get_update_list_db_page(
        page, pageSize, id, code_filter, dept_id, startTime, endTime
    )
    return BaseResponse(data={"list": updates, "total": total})


@iot_router.post("/common/update")
async def create_update(data: dict, user_id: UUID = Depends(get_user_id_from_token)):
    """Create an update record whose version code exceeds the current maximum.

    The body must contain ``dept_id``; ``code`` defaults to 0 when absent and
    must be strictly greater than the value of ``getMaxCodeByDeptId(dept_id)``.
    """
    if not user_id:
        return {"error": "userId is required"}

    dept_id = data.get("dept_id")
    if not dept_id:
        return {"error": "dept_id is required"}

    # Version code sent by the frontend.
    try:
        new_code = int(data.get("code", 0))
    except (TypeError, ValueError):
        return BaseResponse(
            status=False,
            message="无效的版本号",
            data=None,
        )

    # Current maximum version code for this dept_id.
    # NOTE(review): "or 0" guards the comparison below in case the helper
    # returns None when there are no records yet - confirm against the helper.
    max_code = getMaxCodeByDeptId(dept_id) or 0
    if new_code <= max_code:
        return BaseResponse(
            status=False,
            message=f"新版本号必须大于当前最大版本号 {max_code}",
            data=None,
        )

    # Insert into the database.
    new_id = insert_update(data)
    return BaseResponse(data={"id": new_id})


@iot_router.delete("/common/update/{id}")
async def delete_update(
    id: str,
    user_id: UUID = Depends(get_user_id_from_token),
):
    """Delete update record ``id``; report failure when no row was deleted."""
    if not user_id:
        return {"error": "userId is required"}

    deleted = delete_update_db(id)
    if deleted == 0:
        return BaseResponse(status=False, message="更新记录不存在", data=None)
    return BaseResponse(data=True)


@iot_router.get("/common/update/getUploadUrl")
def getUploadUrl(
    filename: str | None = None,
    user_id: UUID = Depends(get_user_id_from_token),
):
    """Return an upload URL for the "iot-update" bucket and the object name.

    The object name is a fresh UUID followed by the suffix of ``filename``
    (if any), so uploads never overwrite each other.
    """
    if not user_id:
        return {"error": "userId is required"}

    # Keep the original file extension, if a filename was supplied.
    ext = pathlib.Path(filename).suffix if filename else ""
    # Append the extension to a UUID to get a unique object name.
    object_name = f"{uuid.uuid4()}{ext}"

    return BaseResponse(
        data={
            "uploadUrl": get_upload_token("iot-update", object_name),
            "id": object_name,
        }
    )


@iot_router.get("/common/update/getMaxCodeByDeptId")
def updateGetMaxCodeByDeptId(
    user_id: UUID = Depends(get_user_id_from_token),
    dept_id: str | None = None,
):
    """Return ``getMaxCodeByDeptId(dept_id)`` wrapped in a ``BaseResponse``."""
    if not user_id:
        return {"error": "userId is required"}

    return BaseResponse(data=getMaxCodeByDeptId(dept_id))


# NOTE(review): this handler reuses the name ``getUploadUrl`` and therefore
# rebinds the module-level name defined above. Both routes stay registered
# because the decorator runs at definition time; the name is kept unchanged
# here so nothing that depends on it changes. This route takes no user token.
@iot_router.get("/common/update/check")
def getUploadUrl(
    deviceID: str | None = None,
):
    """Return the result of ``get_update_package(deviceID)``."""
    return BaseResponse(data=get_update_package(deviceID))


# request_id -> asyncio.Future awaiting the device's reply to that request.
pending_commands: dict[str, asyncio.Future] = {}


@iot_router.post("/common/device/command")
async def command(
    data: IotDeviceCommandRequest, user_id: UUID = Depends(get_user_id_from_token)
):
    """Publish a command over MQTT and wait for the device's reply.

    A future is registered in ``pending_commands`` under a fresh request id,
    the command is published, and the future is awaited for up to
    ``_COMMAND_TIMEOUT_SECONDS``. The entry is always removed afterwards,
    whether the call succeeds, times out, fails to publish or is cancelled.
    """
    if not user_id:
        return {"error": "userId is required"}

    # Imported inside the function, as in the original code.
    from config.emqx import mqtt_publish

    request_id = str(uuid.uuid4())
    payload = {"request_id": request_id}

    future = asyncio.get_running_loop().create_future()
    pending_commands[request_id] = future

    try:
        # Publishing happens inside the try so that a failed publish cannot
        # leave a stale future behind in pending_commands.
        await mqtt_publish(
            data.dept_id,
            "cmd",
            data.device_type,
            data.id,
            data.command,
            json.dumps(payload),
        )

        try:
            result = await asyncio.wait_for(future, timeout=_COMMAND_TIMEOUT_SECONDS)
        except asyncio.TimeoutError:
            return BaseResponse(data=None, message="Device did not respond in time")

        # NOTE(review): "massage" looks like a typo for "message", but it may
        # be the key the device actually sends - confirm before changing.
        return BaseResponse(data=result.get("massage"))
    finally:
        # Runs on success, timeout, error and cancellation alike; exceptions
        # (including CancelledError) still propagate to the caller.
        pending_commands.pop(request_id, None)