AI实验室后端

This commit is contained in:
BBIT-Kai
2026-02-04 13:58:18 +08:00
parent f9536dd0b4
commit 646e312a4c
24 changed files with 962 additions and 86 deletions
+106 -4
View File
@@ -1,15 +1,20 @@
import asyncio
import json
import os
import socket
import ssl
import sys
import uuid
from datetime import timezone
from aiomqtt import Client
from aiomqtt import Client, Will
import utils
from config.redis import redis_client
from db.postgres import get_dept_id_by_iot_user_name, get_device_type_by_iot_user_name
from models.MqttTopic import MqttTopic
from routers.Iot import pending_commands
from routers.WS import ws_manager
# ================= 配置区域 =================
MQTT_BROKER = "ai.ronsunny.cn"
@@ -19,13 +24,22 @@ TLS_CONTEXT = ssl.create_default_context()
# 默认连接后要订阅的 topic 配置
DEFAULT_SUBSCRIPTIONS = [
# 状态信息回执
MqttTopic.from_parts(
dept_id=None,
domain="status",
device_type=None,
device_id=None,
resource="info",
)
),
# 其他信息回执(指令)
MqttTopic.from_parts(
dept_id=None,
domain="receipt",
device_type=None,
device_id=None,
resource="info",
),
]
# ===========================================
@@ -50,7 +64,6 @@ def get_device_id_simple():
return hostname
# todo 这里需要订阅状态信息 设备发送信息 这里回复 vue前端发送指令 后端发送指令 设备接收指令
# ------------------ MQTT 封装 ------------------
@@ -93,11 +106,11 @@ async def _mqtt_handle_messages():
async for message in MQTT_CLIENT.messages:
topic = MqttTopic(message.topic)
print("收到消息:" + str(topic))
payload = json.loads(message.payload.decode())
# 处理基础状态信息
if topic.is_status():
# 这里收到的数据是这样的:"x/status/x/deviceID/info"
payload = json.loads(message.payload.decode())
redis_client.set_device_info(topic.device_id, payload)
dept_id = get_dept_id_by_iot_user_name(topic.device_id)
dept_edge = get_device_type_by_iot_user_name(topic.device_id)
@@ -112,12 +125,35 @@ async def _mqtt_handle_messages():
"receipt",
payload_str,
)
print("设备" + topic.device_id + "变化:" + str(payload["online"]))
# 通知vue更新在线状态
await ws_manager.noticeOnlineStatus(
{
"deviceId": topic.device_id,
"online": payload["online"],
"type": "status",
}
)
# 如果是设备回复
elif topic.is_response() and "request_id" in payload:
req_id = payload["request_id"]
future = pending_commands.get(req_id)
if future and not future.done():
future.set_result(payload) # 唤醒等待的 HTTP 接口
async def mqtt_client_async():
global DEVICE_ID, MQTT_CLIENT
DEVICE_ID = get_device_id_simple()
print("服务端EMQX账号:", DEVICE_ID)
lwt_topic, lwt_payload = get_status_info_topic_payload(False)
will = Will(
topic=lwt_topic,
payload=lwt_payload,
qos=1, # 通常 LWT 用 QoS 1
retain=False,
)
async with Client(
MQTT_BROKER,
port=MQTT_PORT,
@@ -125,8 +161,11 @@ async def mqtt_client_async():
password=MQTT_PASSWORD,
tls_context=TLS_CONTEXT,
identifier=DEVICE_ID,
will=will,
) as client:
MQTT_CLIENT = client # 保存全局客户端
client.on_connect = on_connect
client.on_disconnect = on_disconnect
print("MQTT client connected")
# 订阅默认 topic
@@ -134,10 +173,73 @@ async def mqtt_client_async():
await MQTT_CLIENT.subscribe(topic.to_topic())
print(f"Subscribed to default topic: {topic.to_topic()}")
# 发送基础消息:"x/status/x/deviceID/info"
await public_device_status()
# 启动消息处理循环
await _mqtt_handle_messages()
def on_connect(client, flags, rc, properties):
print("MQTT connected")
def on_disconnect(client, packet, exc=None):
print("MQTT disconnected:", exc)
INITIAL_RECONNECT_INTERVAL = 5
MAX_RECONNECT_INTERVAL = 60
async def mqtt_client_runner():
global MQTT_CLIENT
reconnect_interval = INITIAL_RECONNECT_INTERVAL
while True:
try:
await mqtt_client_async()
# 如果 mqtt_client_async 正常返回,说明是主动退出
reconnect_interval = INITIAL_RECONNECT_INTERVAL
except Exception as e:
print("MQTT 连接异常:", e)
print(f"等待 {reconnect_interval}s 后重连...")
await asyncio.sleep(reconnect_interval)
# ⭐ 指数退避放在这里
reconnect_interval = min(reconnect_interval * 2, MAX_RECONNECT_INTERVAL)
def get_status_info_topic_payload(is_online: bool):
info = {
"version": "1.0.0", # 替换成你的 APP 版本
"online": is_online,
"ip": utils.get_local_ip(),
"hostname": socket.gethostname(),
"mac": utils.get_mac_address(),
"os": utils.platform.platform(),
"cpu": utils.get_cpu_info(),
"memory_total": utils.get_memory_total(),
"disk_total": utils.get_disk_total(),
"last_seen": utils.datetime.now(timezone.utc).isoformat(),
}
topic = "x/status/x/" + get_device_id_simple() + "/info"
payload = json.dumps(info)
return topic, payload
async def public_device_status():
_, payload = get_status_info_topic_payload(True)
await mqtt_publish(
"x",
"status",
"server",
get_device_id_simple(),
"info",
payload,
)
# ------------------ 示例主程序 ------------------
+57
View File
@@ -0,0 +1,57 @@
import httpx
from fastapi import HTTPException
class HttpClient:
def __init__(self, timeout: int = 10):
self.timeout = timeout
self.client = httpx.AsyncClient(timeout=timeout)
async def request(
self,
method: str,
url: str,
params: dict = None,
data: dict = None,
json: dict = None,
headers: dict = None,
):
"""
通用请求方法
method: GET / POST / PUT / DELETE 等
url: 请求地址
params: URL 参数
data: form 数据
json: JSON 数据
headers: 请求头
"""
try:
response = await self.client.request(
method=method.upper(),
url=url,
params=params,
data=data,
json=json,
headers=headers,
)
response.raise_for_status()
return response.json()
except httpx.RequestError as e:
raise HTTPException(status_code=500, detail=f"请求失败: {e}")
except httpx.HTTPStatusError as e:
raise HTTPException(
status_code=e.response.status_code, detail=f"外部接口返回错误: {e}"
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"解析失败: {e}")
async def get(self, url: str, params: dict = None, headers: dict = None):
return await self.request("GET", url, params=params, headers=headers)
async def post(
self, url: str, data: dict = None, json: dict = None, headers: dict = None
):
return await self.request("POST", url, data=data, json=json, headers=headers)
async def close(self):
await self.client.aclose()
+37 -13
View File
@@ -1,3 +1,5 @@
import json
import redis
@@ -5,7 +7,7 @@ import redis
class RedisClient:
def __init__(self, config_path="config.yaml"):
def __init__(self):
self.redis = redis.Redis(
"10.10.12.101",
6379,
@@ -13,17 +15,14 @@ class RedisClient:
decode_responses=True,
)
def set_online(self, device_id: str):
key = f"device:online:{device_id}"
self.redis.set(key, 1)
def set_offline(self, device_id: str):
key = f"device:online:{device_id}"
self.redis.delete(key)
def is_device_online(self, device_id: str) -> bool:
key = f"device:online:{device_id}"
return self.redis.exists(key) == 1
# 已废弃 之前使用webhookds的方式通知 但是因为通知不及时的原因,现在在线状态全盘交给device status
# def set_online(self, device_id: str):
# key = f"device:online:{device_id}"
# self.redis.set(key, 1)
#
# def set_offline(self, device_id: str):
# key = f"device:online:{device_id}"
# self.redis.delete(key)
def set_device_info(self, device_id: str, info: dict):
"""
@@ -41,7 +40,32 @@ class RedisClient:
def get_device_info(self, device_id: str) -> dict:
key = f"device:info:{device_id}"
return self.redis.hgetall(key)
raw_info = self.redis.hgetall(key)
return raw_info
def get_value(self, key: str):
"""
获取 Redis 中的值,如果是 JSON 字符串自动解析为 dict
"""
value = self.redis.get(key)
if not value:
return None
try:
return json.loads(value)
except json.JSONDecodeError:
return value
def set_value(self, key: str, value, expire: int = None):
"""
存储 key-valuevalue 可以是 dict,自动序列化为 JSON
expire 单位秒
"""
if isinstance(value, (dict, list)):
value = json.dumps(value)
if expire:
self.redis.set(key, value, ex=expire)
else:
self.redis.set(key, value)
redis_client = RedisClient()