303 lines
8.8 KiB
Python
303 lines
8.8 KiB
Python
import asyncio
|
|
import json
|
|
import pathlib
|
|
import uuid
|
|
from uuid import UUID
|
|
|
|
from fastapi import APIRouter
|
|
from fastapi import Depends
|
|
|
|
from config.minIO import get_upload_token
|
|
from config.redis import redis_client
|
|
from db.postgres.iot import *
|
|
from models.BaseResponse import BaseResponse
|
|
from models.IotDeviceCommandRequest import IotDeviceCommandRequest
|
|
|
|
iot_router = APIRouter()
|
|
from config.security import get_user_id_from_token
|
|
|
|
# -------------------- 设备接口 --------------------
|
|
|
|
|
|
# Deprecated: EMQX webhook offline notifications are not timely (e.g. on sudden power or network loss).
|
|
# @iot_router.post("/common/webhook")
|
|
# async def emqx_webhook(data: EMQXWebhook):
|
|
# device_id = data.clientid
|
|
# event = data.event
|
|
#
|
|
# if event == "client.connected":
|
|
# redis_client.set_online(device_id)
|
|
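# # Deliberately wait 1s here: this notification arrives as soon as the device connects, before the device has had time to send its status info over MQTT.
# # The online status is not taken from the MQTT messages themselves because this path is driven by EMQX webhooks, a different notification channel: it guards against one of the two mechanisms failing, and setting the online status on MQTT message receipt would lag and would also require a last-will message (the original comment is cut off at this point).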
|
|
# await asyncio.sleep(1)
|
|
# await ws_manager.noticeOnlineStatus(
|
|
# {
|
|
# "deviceId": device_id,
|
|
# "online": True,
|
|
# "type": "status",
|
|
# }
|
|
# )
|
|
#
|
|
# print(f"[新设备在线] {device_id}")
|
|
#
|
|
# elif event == "client.disconnected":
|
|
# redis_client.set_offline(device_id)
|
|
# await ws_manager.noticeOnlineStatus(
|
|
# {
|
|
# "deviceId": device_id,
|
|
# "online": False,
|
|
# "type": "status",
|
|
# }
|
|
# )
|
|
#
|
|
# print(f"[设备离线] {device_id}")
|
|
#
|
|
# else:
|
|
# # 其他事件直接忽略
|
|
# print(f"[其他事件] {event}")
|
|
#
|
|
# return {"ok": True}
|
|
|
|
|
|
@iot_router.get("/common/device/list")
async def get_device_list(
    page: int = 1,
    pageSize: int = 10,
    id: str | None = None,
    name: str | None = None,
    status: int | None = None,
    is_superuser: int | None = None,
    dept_id: str | None = None,
    startTime: str | None = None,
    endTime: str | None = None,
    user_id: UUID = Depends(get_user_id_from_token),
):
    """Return one page of devices, each enriched with runtime info from Redis.

    All filter arguments are forwarded positionally, unchanged, to
    ``get_device_list_db_page``; their exact semantics are defined there.

    Returns:
        ``BaseResponse`` whose data is ``{"list": devices, "total": total}``,
        or a plain ``{"error": ...}`` dict when ``user_id`` is falsy.
    """
    if not user_id:
        return {"error": "userId is required"}

    devices, total = get_device_list_db_page(
        page, pageSize, id, name, status, is_superuser, dept_id, startTime, endTime
    )

    # Fields copied verbatim from the Redis record, defaulting to "".
    info_fields = (
        "version",
        "ip",
        "hostname",
        "mac",
        "os",
        "cpu",
        "memory_total",
        "disk_total",
        "last_seen",
    )

    for device in devices:
        # The device's account name ("name") is used as its Redis lookup key.
        # Fall back to an empty mapping so a device without a Redis record
        # (should the helper return None) is reported as offline rather than
        # crashing the whole listing.
        info = redis_client.get_device_info(device["name"]) or {}

        # "online" is stored as the string "1"/"0"; expose it as a bool.
        device["online"] = info.get("online", "0") == "1"
        for field in info_fields:
            device[field] = info.get(field, "")

    return BaseResponse(data={"list": devices, "total": total})
|
|
|
|
|
|
@iot_router.post("/common/device")
async def create_device(data: dict, user_id: UUID = Depends(get_user_id_from_token)):
    """Insert a device from the request body and return its new id.

    ``data`` is handed to ``insert_device`` as-is; the value it returns is
    reported back under the ``"id"`` key.
    """
    if not user_id:
        return {"error": "userId is required"}

    return BaseResponse(data={"id": insert_device(data)})
|
|
|
|
|
|
@iot_router.put("/common/device/{id}")
async def update_device(
    id: str,
    data: dict,
    user_id: UUID = Depends(get_user_id_from_token),
):
    """Update the device identified by ``id`` with the request body.

    Returns a failed ``BaseResponse`` ("device does not exist") when
    ``update_device_db`` reports 0, otherwise ``BaseResponse(data=True)``.
    """
    if not user_id:
        return {"error": "userId is required"}

    affected = update_device_db(id, data)
    if affected != 0:
        return BaseResponse(data=True)

    # Nothing matched the given id.
    return BaseResponse(status=False, message="设备不存在", data=None)
|
|
|
|
|
|
@iot_router.patch("/common/device/{id}")
async def patch_device(
    id: str,
    data: dict,
    user_id: UUID = Depends(get_user_id_from_token),
):
    """Apply the request body to the device ``id`` via ``patch_device_db``.

    Returns a failed ``BaseResponse`` ("device does not exist") when
    ``patch_device_db`` reports 0, otherwise ``BaseResponse(data=True)``.
    """
    if not user_id:
        return {"error": "userId is required"}

    affected = patch_device_db(id, data)
    if affected != 0:
        return BaseResponse(data=True)

    # Nothing matched the given id.
    return BaseResponse(status=False, message="设备不存在", data=None)
|
|
|
|
|
|
@iot_router.delete("/common/device/{id}")
async def delete_device(
    id: str,
    user_id: UUID = Depends(get_user_id_from_token),
):
    """Delete the device identified by ``id``.

    Returns a failed ``BaseResponse`` ("device does not exist") when
    ``delete_device_db`` reports 0, otherwise ``BaseResponse(data=True)``.
    """
    if not user_id:
        return {"error": "userId is required"}

    removed = delete_device_db(id)
    if removed != 0:
        return BaseResponse(data=True)

    # Nothing matched the given id.
    return BaseResponse(status=False, message="设备不存在", data=None)
|
|
|
|
|
|
@iot_router.get("/common/update/list")
async def get_update_list(
    page: int = 1,
    pageSize: int = 10,
    id: str | None = None,
    code: str | None = None,
    dept_id: str | None = None,
    startTime: str | None = None,
    endTime: str | None = None,
    user_id: UUID = Depends(get_user_id_from_token),
):
    """Return one page of update records matching the given filters.

    ``code`` arrives as a string so that an empty query value can mean
    "no filter"; any other value is converted to ``int`` before being passed
    to ``get_update_list_db_page``.

    Returns:
        ``BaseResponse`` whose data is ``{"list": updates, "total": total}``;
        a failed ``BaseResponse`` when ``code`` is not a valid integer; or a
        plain ``{"error": ...}`` dict when ``user_id`` is falsy.
    """
    if not user_id:
        return {"error": "userId is required"}

    # Empty string and None both mean "do not filter by version code".
    code_filter = None
    if code:
        try:
            code_filter = int(code)
        except ValueError:
            # Report a malformed code the same way create_update does instead
            # of letting the ValueError surface as an HTTP 500.
            return BaseResponse(
                status=False,
                message="无效的版本号",
                data=None,
            )

    updates, total = get_update_list_db_page(
        page, pageSize, id, code_filter, dept_id, startTime, endTime
    )

    return BaseResponse(data={"list": updates, "total": total})
|
|
|
|
|
|
@iot_router.post("/common/update")
async def create_update(data: dict, user_id: UUID = Depends(get_user_id_from_token)):
    """Register a new update record, enforcing a strictly increasing version code.

    The body must carry a truthy ``dept_id``. Its ``code`` (default 0) must
    convert to ``int`` and be greater than the value returned by
    ``getMaxCodeByDeptId`` for that ``dept_id``.

    Returns:
        ``BaseResponse`` with ``{"id": new_id}`` on success, a failed
        ``BaseResponse`` for an invalid or non-increasing code, or a plain
        ``{"error": ...}`` dict when ``user_id`` or ``dept_id`` is missing.
    """
    if not user_id:
        return {"error": "userId is required"}

    dept_id = data.get("dept_id")
    if not dept_id:
        return {"error": "dept_id is required"}

    # Version code supplied by the front end.
    try:
        requested_code = int(data.get("code", 0))
    except (TypeError, ValueError):
        return BaseResponse(status=False, message="无效的版本号", data=None)

    # Current highest version code recorded for this organisation.
    current_max = getMaxCodeByDeptId(dept_id)
    if requested_code <= current_max:
        return BaseResponse(
            status=False,
            message=f"新版本号必须大于当前最大版本号 {current_max}",
            data=None,
        )

    # NOTE(review): the original body is stored as received; the validated
    # integer is not written back into ``data`` — confirm this is intended.
    return BaseResponse(data={"id": insert_update(data)})
|
|
|
|
|
|
@iot_router.delete("/common/update/{id}")
async def delete_update(
    id: str,
    user_id: UUID = Depends(get_user_id_from_token),
):
    """Delete the update record identified by ``id``.

    Returns a failed ``BaseResponse`` ("update record does not exist") when
    ``delete_update_db`` reports 0, otherwise ``BaseResponse(data=True)``.
    """
    if not user_id:
        return {"error": "userId is required"}

    removed = delete_update_db(id)
    if removed != 0:
        return BaseResponse(data=True)

    # Nothing matched the given id.
    return BaseResponse(status=False, message="更新记录不存在", data=None)
|
|
|
|
|
|
@iot_router.get("/common/update/getUploadUrl")
def getUploadUrl(
    filename: str | None = None,
    user_id: UUID = Depends(get_user_id_from_token),
):
    """Issue an upload token/URL for a freshly named object in "iot-update".

    The object name is a random UUID followed by the suffix of ``filename``
    (if one was given), so uploads never overwrite each other. The response
    carries the value returned by ``get_upload_token`` as ``uploadUrl`` and
    the generated object name as ``id``.
    """
    if not user_id:
        return {"error": "userId is required"}

    # Keep only the extension of the client-supplied name.
    suffix = ""
    if filename:
        suffix = pathlib.Path(filename).suffix

    object_name = str(uuid.uuid4()) + suffix
    upload_url = get_upload_token("iot-update", object_name)
    return BaseResponse(data={"uploadUrl": upload_url, "id": object_name})
|
|
|
|
|
|
@iot_router.get("/common/update/getMaxCodeByDeptId")
def updateGetMaxCodeByDeptId(
    user_id: UUID = Depends(get_user_id_from_token),
    dept_id: str | None = None,
):
    """Return the result of ``getMaxCodeByDeptId(dept_id)`` in a ``BaseResponse``.

    A falsy ``user_id`` yields a plain ``{"error": ...}`` dict instead.
    """
    if not user_id:
        return {"error": "userId is required"}

    max_code = getMaxCodeByDeptId(dept_id)
    return BaseResponse(data=max_code)
|
|
|
|
|
|
@iot_router.get("/common/update/check")
def getUploadUrl(
    deviceID: str | None = None,
):
    """Return the result of ``get_update_package(deviceID)`` in a ``BaseResponse``.

    Unlike the other handlers in this file, this endpoint has no token
    dependency.

    NOTE(review): this function reuses the name ``getUploadUrl`` from the
    ``/common/update/getUploadUrl`` handler and therefore rebinds that
    module-level name. Both routes stay registered through their decorators;
    consider renaming once callers and operation ids have been checked.
    """
    package = get_update_package(deviceID)
    return BaseResponse(data=package)
|
|
|
|
|
|
# request_id -> asyncio.Future
|
|
# Entries are registered and removed by the `command` handler; whatever receives
# the device's reply is expected to complete the future — TODO confirm where.
pending_commands: dict[str, asyncio.Future] = {}
|
|
|
|
|
|
@iot_router.post("/common/device/command")
async def command(
    data: IotDeviceCommandRequest, user_id: UUID = Depends(get_user_id_from_token)
):
    """Publish a command to a device over MQTT and wait up to 5s for its reply.

    A fresh ``request_id`` is generated, a future is registered under it in
    ``pending_commands``, and the command is published with the request id as
    JSON payload. The handler then waits for that future to be completed.

    Returns:
        ``BaseResponse`` carrying the ``"massage"`` entry of the reply, a
        ``BaseResponse`` with ``data=None`` and a timeout message when no
        reply arrives within 5 seconds, or a plain ``{"error": ...}`` dict
        when ``user_id`` is falsy.
    """
    if not user_id:
        return {"error": "userId is required"}

    # Imported at call time, as in the original (presumably to avoid a
    # circular import — TODO confirm). Done before the future is registered
    # so an import failure cannot leave a stale entry behind.
    from config.emqx import mqtt_publish

    request_id = str(uuid.uuid4())
    payload = {"request_id": request_id}

    future = asyncio.get_running_loop().create_future()
    pending_commands[request_id] = future

    # Everything after registration runs under one ``finally`` so the entry is
    # removed on every exit path: success, timeout, cancellation, and — unlike
    # before — a failure inside ``mqtt_publish``.
    try:
        await mqtt_publish(
            data.dept_id,
            "cmd",
            data.device_type,
            data.id,
            data.command,
            json.dumps(payload),
        )

        try:
            result = await asyncio.wait_for(future, timeout=5)
        except asyncio.TimeoutError:
            return BaseResponse(data=None, message="Device did not respond in time")

        # NOTE(review): the key is spelled "massage"; it presumably has to
        # match whatever completes the future — verify before renaming.
        return BaseResponse(data=result.get("massage"))
    finally:
        pending_commands.pop(request_id, None)
|