From 646e312a4c7cde682040a7a960873d4a6b5f3e35 Mon Sep 17 00:00:00 2001 From: BBIT-Kai <2911862937@qq.com> Date: Wed, 4 Feb 2026 13:58:18 +0800 Subject: [PATCH] =?UTF-8?q?AI=E5=AE=9E=E9=AA=8C=E5=AE=A4=E5=90=8E=E7=AB=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bbit_ai/app/app.py | 19 +- bbit_ai/app/config/emqx.py | 110 ++++++++- bbit_ai/app/config/httpClient.py | 57 +++++ bbit_ai/app/config/redis.py | 50 +++- bbit_ai/app/db/postgres/__init__.py | 1 + bbit_ai/app/db/postgres/annual_meeting.py | 224 ++++++++++++++++++ bbit_ai/app/db/postgres/iot.py | 3 +- bbit_ai/app/db/postgres/sentinel.py | 71 ++++++ bbit_ai/app/db/postgres/system.py | 26 +- bbit_ai/app/db/postgres/ws_manager.py | 31 ++- bbit_ai/app/models/LotteryCreateReq.py | 11 + bbit_ai/app/models/LotteryUpdateReq.py | 12 + bbit_ai/app/models/MqttTopic.py | 3 + bbit_ai/app/routers/AnnualMeeting.py | 122 ++++++++++ bbit_ai/app/routers/Iot.py | 114 +++++---- bbit_ai/app/routers/Sentinel.py | 94 ++++++++ bbit_ai/app/routers/System.py | 2 +- bbit_ai/app/routers/Vision.py | 4 +- bbit_ai/app/routers/WS.py | 20 ++ bbit_ai/app/service/vision.py | 28 ++- bbit_ai/app/utils/MyUtils.py | 35 +++ bbit_ai/app/utils/__init__.py | 1 + .../server/model/database/ScaImagesTable.kt | 2 +- vue2/pnpm-lock.yaml | 8 + 24 files changed, 962 insertions(+), 86 deletions(-) create mode 100644 bbit_ai/app/config/httpClient.py create mode 100644 bbit_ai/app/db/postgres/annual_meeting.py create mode 100644 bbit_ai/app/models/LotteryCreateReq.py create mode 100644 bbit_ai/app/models/LotteryUpdateReq.py create mode 100644 bbit_ai/app/routers/AnnualMeeting.py diff --git a/bbit_ai/app/app.py b/bbit_ai/app/app.py index 538ad5c..a63b018 100644 --- a/bbit_ai/app/app.py +++ b/bbit_ai/app/app.py @@ -1,11 +1,13 @@ import asyncio -from fastapi import FastAPI +from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from uvicorn import Config, Server -from config.emqx import mqtt_client_async +from config.emqx import mqtt_client_runner from config.yolo import YOLOSingleton +from models.BaseResponse import BaseResponse +from routers.AnnualMeeting import amRouter from routers.Bot import botRouter from routers.Chat import chatRouter from routers.Datasource import reportDataRouter @@ -51,16 +53,25 @@ async def ai_lab(): app.include_router(r, prefix="/llm", tags=["llm"]) app.include_router(visionRouter, prefix="/cv", tags=["cv"]) app.include_router(systemRouter, prefix="/system", tags=["system"]) + app.include_router(amRouter, prefix="/am", tags=["annual_meeting"]) app.include_router(iot_router, prefix="/iot", tags=["iot"]) app.include_router(sentinel_router, prefix="/iot/sentinel", tags=["iot_sentinel"]) app.include_router(iot_ws_router, prefix="/iot/ws", tags=["iot_ws"]) app.include_router(publicRouter, prefix="/api/public", tags=["api"]) - config = Config(app=app, host="0.0.0.0", port=13011, log_level="debug") + + # ----------- 全局异常捕获 --------- + @app.exception_handler(Exception) + async def global_exception_handler(request: Request, exc: Exception): + return BaseResponse(status=False, message=str(exc), data=None) + + config = Config(app=app, host="0.0.0.0", port=13011, log_level="warning") server = Server(config) + await server.serve() async def main(): + # 初始化模型 YOLOSingleton.init_model() # 主干AI实验室FastAPI服务 @@ -68,7 +79,7 @@ async def main(): # RabbitMQ服务 task_mq = asyncio.create_task(sentinel_pull_analysis_async()) # 等 HTTP 服务启动后再启动 MQTT - task_mqtt = asyncio.create_task(mqtt_client_async()) + task_mqtt = asyncio.create_task(mqtt_client_runner()) await asyncio.gather(task_api, task_mq, task_mqtt) # MCP服务-ailab diff --git a/bbit_ai/app/config/emqx.py b/bbit_ai/app/config/emqx.py index 83dda00..f38d407 100644 --- a/bbit_ai/app/config/emqx.py +++ b/bbit_ai/app/config/emqx.py @@ -1,15 +1,20 @@ +import asyncio import json import os import socket import ssl import sys import uuid +from datetime import timezone -from aiomqtt import Client +from aiomqtt import Client, Will +import utils from config.redis import redis_client from db.postgres import get_dept_id_by_iot_user_name, get_device_type_by_iot_user_name from models.MqttTopic import MqttTopic +from routers.Iot import pending_commands +from routers.WS import ws_manager # ================= 配置区域 ================= MQTT_BROKER = "ai.ronsunny.cn" @@ -19,13 +24,22 @@ TLS_CONTEXT = ssl.create_default_context() # 默认连接后要订阅的 topic 配置 DEFAULT_SUBSCRIPTIONS = [ + # 状态信息回执 MqttTopic.from_parts( dept_id=None, domain="status", device_type=None, device_id=None, resource="info", - ) + ), + # 其他信息回执(指令) + MqttTopic.from_parts( + dept_id=None, + domain="receipt", + device_type=None, + device_id=None, + resource="info", + ), ] # =========================================== @@ -50,7 +64,6 @@ def get_device_id_simple(): return hostname -# todo 这里需要订阅状态信息 设备发送信息 这里回复 vue前端发送指令 后端发送指令 设备接收指令 # ------------------ MQTT 封装 ------------------ @@ -93,11 +106,11 @@ async def _mqtt_handle_messages(): async for message in MQTT_CLIENT.messages: topic = MqttTopic(message.topic) print("收到消息:" + str(topic)) + payload = json.loads(message.payload.decode()) # 处理基础状态信息 if topic.is_status(): # 这里收到的数据是这样的:"x/status/x/deviceID/info" - payload = json.loads(message.payload.decode()) redis_client.set_device_info(topic.device_id, payload) dept_id = get_dept_id_by_iot_user_name(topic.device_id) dept_edge = get_device_type_by_iot_user_name(topic.device_id) @@ -112,12 +125,35 @@ async def _mqtt_handle_messages(): "receipt", payload_str, ) + print("设备" + topic.device_id + "变化:" + str(payload["online"])) + # 通知vue更新在线状态 + await ws_manager.noticeOnlineStatus( + { + "deviceId": topic.device_id, + "online": payload["online"], + "type": "status", + } + ) + # 如果是设备回复 + elif topic.is_response() and "request_id" in payload: + req_id = payload["request_id"] + future = pending_commands.get(req_id) + if future and not future.done(): + future.set_result(payload) # 唤醒等待的 HTTP 接口 async def mqtt_client_async(): global DEVICE_ID, MQTT_CLIENT DEVICE_ID = get_device_id_simple() print("服务端EMQX账号:", DEVICE_ID) + + lwt_topic, lwt_payload = get_status_info_topic_payload(False) + will = Will( + topic=lwt_topic, + payload=lwt_payload, + qos=1, # 通常 LWT 用 QoS 1 + retain=False, + ) async with Client( MQTT_BROKER, port=MQTT_PORT, @@ -125,8 +161,11 @@ async def mqtt_client_async(): password=MQTT_PASSWORD, tls_context=TLS_CONTEXT, identifier=DEVICE_ID, + will=will, ) as client: MQTT_CLIENT = client # 保存全局客户端 + client.on_connect = on_connect + client.on_disconnect = on_disconnect print("MQTT client connected") # 订阅默认 topic @@ -134,10 +173,73 @@ async def mqtt_client_async(): await MQTT_CLIENT.subscribe(topic.to_topic()) print(f"Subscribed to default topic: {topic.to_topic()}") + # 发送基础消息:"x/status/x/deviceID/info" + await public_device_status() # 启动消息处理循环 await _mqtt_handle_messages() +def on_connect(client, flags, rc, properties): + print("MQTT connected") + + +def on_disconnect(client, packet, exc=None): + print("MQTT disconnected:", exc) + + +INITIAL_RECONNECT_INTERVAL = 5 +MAX_RECONNECT_INTERVAL = 60 + + +async def mqtt_client_runner(): + global MQTT_CLIENT + reconnect_interval = INITIAL_RECONNECT_INTERVAL + + while True: + try: + await mqtt_client_async() + # 如果 mqtt_client_async 正常返回,说明是主动退出 + reconnect_interval = INITIAL_RECONNECT_INTERVAL + except Exception as e: + print("MQTT 连接异常:", e) + + print(f"等待 {reconnect_interval}s 后重连...") + await asyncio.sleep(reconnect_interval) + + # ⭐ 指数退避放在这里 + reconnect_interval = min(reconnect_interval * 2, MAX_RECONNECT_INTERVAL) + + +def get_status_info_topic_payload(is_online: bool): + info = { + "version": "1.0.0", # 替换成你的 APP 版本 + "online": is_online, + "ip": utils.get_local_ip(), + "hostname": socket.gethostname(), + "mac": utils.get_mac_address(), + "os": utils.platform.platform(), + "cpu": utils.get_cpu_info(), + "memory_total": utils.get_memory_total(), + "disk_total": utils.get_disk_total(), + "last_seen": utils.datetime.now(timezone.utc).isoformat(), + } + topic = "x/status/x/" + get_device_id_simple() + "/info" + payload = json.dumps(info) + return topic, payload + + +async def public_device_status(): + _, payload = get_status_info_topic_payload(True) + await mqtt_publish( + "x", + "status", + "server", + get_device_id_simple(), + "info", + payload, + ) + + # ------------------ 示例主程序 ------------------ diff --git a/bbit_ai/app/config/httpClient.py b/bbit_ai/app/config/httpClient.py new file mode 100644 index 0000000..24d4107 --- /dev/null +++ b/bbit_ai/app/config/httpClient.py @@ -0,0 +1,57 @@ +import httpx +from fastapi import HTTPException + + +class HttpClient: + def __init__(self, timeout: int = 10): + self.timeout = timeout + self.client = httpx.AsyncClient(timeout=timeout) + + async def request( + self, + method: str, + url: str, + params: dict = None, + data: dict = None, + json: dict = None, + headers: dict = None, + ): + """ + 通用请求方法 + method: GET / POST / PUT / DELETE 等 + url: 请求地址 + params: URL 参数 + data: form 数据 + json: JSON 数据 + headers: 请求头 + """ + try: + response = await self.client.request( + method=method.upper(), + url=url, + params=params, + data=data, + json=json, + headers=headers, + ) + response.raise_for_status() + return response.json() + except httpx.RequestError as e: + raise HTTPException(status_code=500, detail=f"请求失败: {e}") + except httpx.HTTPStatusError as e: + raise HTTPException( + status_code=e.response.status_code, detail=f"外部接口返回错误: {e}" + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"解析失败: {e}") + + async def get(self, url: str, params: dict = None, headers: dict = None): + return await self.request("GET", url, params=params, headers=headers) + + async def post( + self, url: str, data: dict = None, json: dict = None, headers: dict = None + ): + return await self.request("POST", url, data=data, json=json, headers=headers) + + async def close(self): + await self.client.aclose() diff --git a/bbit_ai/app/config/redis.py b/bbit_ai/app/config/redis.py index 11fa8d3..5f631fe 100644 --- a/bbit_ai/app/config/redis.py +++ b/bbit_ai/app/config/redis.py @@ -1,3 +1,5 @@ +import json + import redis @@ -5,7 +7,7 @@ import redis class RedisClient: - def __init__(self, config_path="config.yaml"): + def __init__(self): self.redis = redis.Redis( "10.10.12.101", 6379, @@ -13,17 +15,14 @@ class RedisClient: decode_responses=True, ) - def set_online(self, device_id: str): - key = f"device:online:{device_id}" - self.redis.set(key, 1) - - def set_offline(self, device_id: str): - key = f"device:online:{device_id}" - self.redis.delete(key) - - def is_device_online(self, device_id: str) -> bool: - key = f"device:online:{device_id}" - return self.redis.exists(key) == 1 + # 已废弃 之前使用webhookds的方式通知 但是因为通知不及时的原因,现在在线状态全盘交给device status + # def set_online(self, device_id: str): + # key = f"device:online:{device_id}" + # self.redis.set(key, 1) + # + # def set_offline(self, device_id: str): + # key = f"device:online:{device_id}" + # self.redis.delete(key) def set_device_info(self, device_id: str, info: dict): """ @@ -41,7 +40,32 @@ class RedisClient: def get_device_info(self, device_id: str) -> dict: key = f"device:info:{device_id}" - return self.redis.hgetall(key) + raw_info = self.redis.hgetall(key) + return raw_info + + def get_value(self, key: str): + """ + 获取 Redis 中的值,如果是 JSON 字符串自动解析为 dict + """ + value = self.redis.get(key) + if not value: + return None + try: + return json.loads(value) + except json.JSONDecodeError: + return value + + def set_value(self, key: str, value, expire: int = None): + """ + 存储 key-value,value 可以是 dict,自动序列化为 JSON + expire 单位秒 + """ + if isinstance(value, (dict, list)): + value = json.dumps(value) + if expire: + self.redis.set(key, value, ex=expire) + else: + self.redis.set(key, value) redis_client = RedisClient() diff --git a/bbit_ai/app/db/postgres/__init__.py b/bbit_ai/app/db/postgres/__init__.py index f93ef1e..4bd062b 100644 --- a/bbit_ai/app/db/postgres/__init__.py +++ b/bbit_ai/app/db/postgres/__init__.py @@ -1,6 +1,7 @@ from .aimessage import * from .aiprofile import * from .aisession import * +from .annual_meeting import * from .image_ysa import * from .iot import * from .knowledge import * diff --git a/bbit_ai/app/db/postgres/annual_meeting.py b/bbit_ai/app/db/postgres/annual_meeting.py new file mode 100644 index 0000000..b42c75e --- /dev/null +++ b/bbit_ai/app/db/postgres/annual_meeting.py @@ -0,0 +1,224 @@ +from uuid import UUID + +from config.minIO import get_temp_url +from config.pgDb import pg_pool +from utils import format_datetime + + +def get_all_exchange_records(): + """获取 annual_meeting_exchange 表所有记录""" + with pg_pool.getConn() as conn: + with conn.cursor() as cur: + cur.execute( + """ + SELECT id, gift_code, name, created_at, is_finished, sort + FROM annual_meeting_exchange + ORDER BY sort + """ + ) + rows = cur.fetchall() + return [ + { + "id": row[0], + "gift_code": row[1], + "name": row[2], + "created_at": row[3], + "is_finished": row[4], + "sort": row[5], + } + for row in rows + ] + + +import random +import time + + +def reset_all_exchange_status(): + """将所有记录 is_finished 置为 False,并随机 position(以当前时间作为随机种子)""" + with pg_pool.getConn() as conn: + with conn.cursor() as cur: + # 获取总记录数 + cur.execute("SELECT id FROM annual_meeting_exchange") + ids = [row[0] for row in cur.fetchall()] + + # 用当前时间戳作为随机种子 + seed = int(time.time() * 1000) # 毫秒级 + random.seed(seed) + + # 生成随机顺序的 position + positions = list(range(1, len(ids) + 1)) + random.shuffle(positions) + + # 更新每条记录 + for record_id, pos in zip(ids, positions): + cur.execute( + """ + UPDATE annual_meeting_exchange + SET is_finished = FALSE, sort = %s + WHERE id = %s + """, + (pos, record_id), + ) + + conn.commit() + return {"updated_count": len(ids), "seed_used": seed} + + +def reset_user_status(target_user_id: str): + """重置指定用户的 is_finished""" + with pg_pool.getConn() as conn: + with conn.cursor() as cur: + cur.execute( + """ + UPDATE annual_meeting_exchange + SET is_finished = True + WHERE id = %s + """, + (target_user_id,), + ) + conn.commit() + return {"updated": cur.rowcount} + + +def get_all_lottery(): + """获取 annual_meeting_lottery 全部数据""" + with pg_pool.getConn() as conn: + with conn.cursor() as cur: + cur.execute( + """ + SELECT + id, + name, + is_opened, + oss, + created_at, + remark, + sort + FROM annual_meeting_lottery + ORDER BY sort ASC + """ + ) + rows = cur.fetchall() + + return [ + { + "id": row[0], + "name": row[1], + "is_opened": 1 if row[2] else 0, + "oss_url": get_temp_url("image-annual-lottery", row[3]), + "created_at": format_datetime(row[4]), + "remark": row[5], + "sort": row[6], + } + for row in rows + ] + + +def add_lottery(name: str, sort: int, oss: str, is_opened: bool, remark: str): + """新增礼品""" + with pg_pool.getConn() as conn: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO annual_meeting_lottery + (id, name, sort, oss, is_opened, remark, created_at) + VALUES (gen_random_uuid(), %s, %s, %s, %s, %s, now()) + RETURNING id + """, + (name, sort, oss, is_opened, remark), + ) + new_id = cur.fetchone()[0] + conn.commit() + return {"id": new_id} + + +def update_lottery( + id: str, + name: str = None, + sort: int = None, + oss: str = None, + is_opened: bool = None, + remark: str = None, +): + """更新礼品信息""" + update_fields = [] + params = [] + + if name is not None: + update_fields.append("name = %s") + params.append(name) + if sort is not None: + update_fields.append("sort = %s") + params.append(sort) + if oss is not None: + update_fields.append("oss = %s") + params.append(oss) + if is_opened is not None: + update_fields.append("is_opened = %s") + params.append(is_opened) + if remark is not None: + update_fields.append("remark = %s") + params.append(remark) + + if not update_fields: + return {"updated": 0} + + params.append(id) + + sql = f""" + UPDATE annual_meeting_lottery + SET {", ".join(update_fields)} + WHERE id = %s + """ + + with pg_pool.getConn() as conn: + with conn.cursor() as cur: + cur.execute(sql, tuple(params)) + conn.commit() + return {"updated": cur.rowcount} + + +def delete_lottery(id: str): + """删除礼品""" + with pg_pool.getConn() as conn: + with conn.cursor() as cur: + cur.execute( + """ + DELETE FROM annual_meeting_lottery + WHERE id = %s + """, + (id,), + ) + conn.commit() + return {"deleted": cur.rowcount} + + +def reset_lottery_item(item_id: UUID): + """将单个奖品标记为已开启""" + with pg_pool.getConn() as conn: + with conn.cursor() as cur: + cur.execute( + """ + UPDATE annual_meeting_lottery + SET is_opened = TRUE + WHERE id = %s + """, + (item_id,), + ) + conn.commit() + return cur.rowcount + + +def reset_all_lottery_db(): + """将所有奖品 is_opened 置为 False""" + with pg_pool.getConn() as conn: + with conn.cursor() as cur: + cur.execute( + """ + UPDATE annual_meeting_lottery + SET is_opened = FALSE + """ + ) + conn.commit() + return cur.rowcount diff --git a/bbit_ai/app/db/postgres/iot.py b/bbit_ai/app/db/postgres/iot.py index 0a4a0c8..d89bb3c 100644 --- a/bbit_ai/app/db/postgres/iot.py +++ b/bbit_ai/app/db/postgres/iot.py @@ -157,10 +157,11 @@ def update_device_db(id: str, data: dict) -> int: cursor.execute( """ UPDATE iot_users - SET remark=%s, is_active=%s, dept_id=%s, is_superuser=%s + SET name=%s, remark=%s, is_active=%s, dept_id=%s, is_superuser=%s WHERE id=%s; """, ( + data.get("name"), data.get("remark"), bool(data.get("status", 1)), data.get("dept_id"), diff --git a/bbit_ai/app/db/postgres/sentinel.py b/bbit_ai/app/db/postgres/sentinel.py index 5074463..1bb4d30 100644 --- a/bbit_ai/app/db/postgres/sentinel.py +++ b/bbit_ai/app/db/postgres/sentinel.py @@ -149,6 +149,77 @@ def get_sentinel_record_list_db_page( return result, total +def get_sentinel_record_by_id(record_id): + if not record_id: + return None + + sql = """ + SELECT + r.id, + r.license_plate, + r.vehicle_type, + r.license_plate_image, + r.vehicle_image, + r.livestock_type, + r.livestock_source, + r.is_inspected, + r.dept_id, + sd.name AS dept_name, + r.created_at, + r.updated_at, + r.remark + FROM sentinel_records r + LEFT JOIN sys_dept sd ON r.dept_id = sd.id + WHERE r.id = %s; + """ + + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + cursor.execute(sql, [record_id]) + row = cursor.fetchone() + + if not row: + return None + + ( + record_id, + license_plate, + vehicle_type, + license_plate_image, + vehicle_image, + livestock_type, + livestock_source, + is_inspected, + dept_id, + dept_name, + created_at, + updated_at, + remark, + ) = row + + result = { + "id": str(record_id), + "license_plate": license_plate, + "vehicle_type": vehicle_type, + "license_plate_image": get_temp_url_dict( + "sentinel", "license_plate", license_plate_image + ), + "vehicle_image": get_temp_url_dict( + "sentinel", "vehicle_image", vehicle_image + ), + "livestock_type": livestock_type, + "livestock_source": livestock_source, + "is_inspected": 1 if is_inspected else 0, + "dept_id": str(dept_id), + "dept_name": dept_name, + "created_at": format_datetime(created_at), + "updated_at": format_datetime(updated_at), + "remark": remark, + } + + return result + + def insert_sentinel_record(data: dict, dept_id) -> str: with pg_pool.getConn() as conn: with conn.cursor() as cursor: diff --git a/bbit_ai/app/db/postgres/system.py b/bbit_ai/app/db/postgres/system.py index fa26707..75d8ce8 100644 --- a/bbit_ai/app/db/postgres/system.py +++ b/bbit_ai/app/db/postgres/system.py @@ -583,6 +583,7 @@ def patch_user_db(id: str, data: dict) -> int: sql = f"UPDATE sys_users SET {', '.join(fields)} WHERE id = %s" params.append(id) cursor.execute(sql, tuple(params)) + updated_count = cursor.rowcount # 保存这次的结果 # ------------------------ # 2. roles 单独处理 @@ -597,7 +598,7 @@ def patch_user_db(id: str, data: dict) -> int: ) conn.commit() - return cursor.rowcount + return updated_count # 删除用户 @@ -954,14 +955,24 @@ def get_dept_id_by_user_id(user_id: str) -> str: return str(dept_id) +def get_dept_name_by_dept_id(user_id: str) -> str: + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + cursor.execute("SELECT name FROM sys_dept WHERE id = %s", (user_id,)) + dept_id = cursor.fetchone() + dept_id = dept_id[0] + return str(dept_id) + + def get_dept_id_by_iot_user_name(user_id: str) -> str: # 通过 iot_user_id 查找其所属的 dept_id with pg_pool.getConn() as conn: with conn.cursor() as cursor: cursor.execute("SELECT dept_id FROM iot_users WHERE name = %s", (user_id,)) - dept_id = cursor.fetchone() - dept_id = str(dept_id[0]) - return dept_id + row = cursor.fetchone() + if row is None: + return None # 或者抛出自定义异常 + return str(row[0]) def get_device_type_by_iot_user_name(user_id: str) -> str: @@ -969,9 +980,10 @@ def get_device_type_by_iot_user_name(user_id: str) -> str: with pg_pool.getConn() as conn: with conn.cursor() as cursor: cursor.execute("SELECT type FROM iot_users WHERE name = %s", (user_id,)) - type = cursor.fetchone() - type = str(type[0]) - return type + row = cursor.fetchone() + if row is None: + return None # 或者抛出自定义异常 + return str(row[0]) from typing import List diff --git a/bbit_ai/app/db/postgres/ws_manager.py b/bbit_ai/app/db/postgres/ws_manager.py index 16d80b8..76a3be0 100644 --- a/bbit_ai/app/db/postgres/ws_manager.py +++ b/bbit_ai/app/db/postgres/ws_manager.py @@ -4,13 +4,17 @@ from uuid import UUID from fastapi import WebSocket +PROJ_IOT_ONLINE_STATE = 0 +PROJ_SENTINEL_VEHICLE_STATES = 1 +PROJ_SENTINEL_MONITOR_STATUS = 2 + class ConnectionManager: def __init__(self): self.active_connections: List[dict] = [] # 保存 websocket 和用户信息 self.lock = asyncio.Lock() - # proj_id:0:在线状态 1:畜牧车辆进入 + # proj_id:0:在线状态 1:畜牧车辆进入 2:畜牧监控大屏 async def connect( self, websocket: WebSocket, user_id: UUID, dept_id: str, proj_id: int ): @@ -24,6 +28,9 @@ class ConnectionManager: "proj_id": proj_id, } ) + print( + "Device" + str(user_id) + " in project " + str(proj_id) + " connected" + ) async def disconnect(self, websocket: WebSocket): async with self.lock: @@ -34,7 +41,7 @@ class ConnectionManager: async def noticeOnlineStatus(self, message: dict): async with self.lock: for conn in self.active_connections: - if conn["proj_id"] == 0: + if conn["proj_id"] == PROJ_IOT_ONLINE_STATE: await conn["ws"].send_json(message) async def noticeSentinel( @@ -46,5 +53,23 @@ class ConnectionManager: async with self.lock: for conn in self.active_connections: if target_departments: - if conn["proj_id"] == 1 and conn["dept_id"] in target_departments: + if ( + conn["proj_id"] == PROJ_SENTINEL_VEHICLE_STATES + and conn["dept_id"] in target_departments + ): + await conn["ws"].send_json(message) + + async def noticeSentinelMonitorStatus( + self, message: dict, target_departments: List[UUID] = None + ): + """ + target_departments: 指定哪些部门能收到消息 + """ + async with self.lock: + for conn in self.active_connections: + if target_departments: + if ( + conn["proj_id"] == PROJ_SENTINEL_MONITOR_STATUS + and conn["dept_id"] in target_departments + ): await conn["ws"].send_json(message) diff --git a/bbit_ai/app/models/LotteryCreateReq.py b/bbit_ai/app/models/LotteryCreateReq.py new file mode 100644 index 0000000..9adb5e1 --- /dev/null +++ b/bbit_ai/app/models/LotteryCreateReq.py @@ -0,0 +1,11 @@ +from typing import Optional + +from pydantic import BaseModel + + +class LotteryCreateReq(BaseModel): + name: str + sort: int + oss: Optional[str] = None + is_opened: bool = False + remark: Optional[str] = None diff --git a/bbit_ai/app/models/LotteryUpdateReq.py b/bbit_ai/app/models/LotteryUpdateReq.py new file mode 100644 index 0000000..e1badfd --- /dev/null +++ b/bbit_ai/app/models/LotteryUpdateReq.py @@ -0,0 +1,12 @@ +from typing import Optional + +from pydantic import BaseModel + + +class LotteryUpdateReq(BaseModel): + id: str # 必传,用来指定要更新哪条记录 + name: Optional[str] = None + sort: Optional[int] = None + oss: Optional[str] = None + is_opened: Optional[bool] = None + remark: Optional[str] = None diff --git a/bbit_ai/app/models/MqttTopic.py b/bbit_ai/app/models/MqttTopic.py index 0b728f9..fa1cf7a 100644 --- a/bbit_ai/app/models/MqttTopic.py +++ b/bbit_ai/app/models/MqttTopic.py @@ -103,5 +103,8 @@ class MqttTopic: def is_status(self) -> bool: return self.domain == "status" and self.resource == "info" + def is_response(self) -> bool: + return self.domain == "receipt" + def is_cmd(self) -> bool: return self.domain == "cmd" diff --git a/bbit_ai/app/routers/AnnualMeeting.py b/bbit_ai/app/routers/AnnualMeeting.py new file mode 100644 index 0000000..d39f7d5 --- /dev/null +++ b/bbit_ai/app/routers/AnnualMeeting.py @@ -0,0 +1,122 @@ +import pathlib +import uuid +from uuid import UUID + +from fastapi import Depends, APIRouter + +import db.postgres as pg +from config.minIO import get_upload_token +from config.security import get_user_id_from_token +from models.BaseResponse import BaseResponse +from models.LotteryCreateReq import LotteryCreateReq +from models.LotteryUpdateReq import LotteryUpdateReq + +amRouter = APIRouter() + + +@amRouter.get("/ExGetList") +def AmExGetList(user_id: UUID = Depends(get_user_id_from_token)): + if not user_id: + return {"error": "userId is required"} + data = pg.get_all_exchange_records() + return BaseResponse(data=data) + + +@amRouter.get("/ExReset") +def AMExReset(user_id: UUID = Depends(get_user_id_from_token)): + if not user_id: + return {"error": "userId is required"} + data = pg.reset_all_exchange_status() + return BaseResponse(data=data, message="已重新打乱顺序") + + +@amRouter.put("/ExResetTargetStatus") +def AMExResetTargetStatus( + target_user_id: str, user_id: UUID = Depends(get_user_id_from_token) +): + if not user_id: + return {"error": "userId is required"} + data = pg.reset_user_status(target_user_id) + return BaseResponse(data=data) + + +@amRouter.get("/Lottery/getUploadUrl") +def getUploadUrl( + filename: str | None = None, + user_id: UUID = Depends(get_user_id_from_token), +): + if not user_id: + return {"error": "userId is required"} + # 生成唯一文件名,避免覆盖 + ext = pathlib.Path(filename).suffix if filename else "" # 获取文件后缀 + object_name = f"{uuid.uuid4()}{ext}" # 拼接到 UUID 后面 + return BaseResponse( + data={ + "uploadUrl": get_upload_token("image-annual-lottery", object_name), + "id": object_name, + } + ) + + +@amRouter.get("/Lottery/List") +def AMGetLotteryList(user_id: UUID = Depends(get_user_id_from_token)): + if not user_id: + return {"error": "userId is required"} + data = pg.get_all_lottery() + return BaseResponse(data=data) + + +@amRouter.post("/Lottery/Add") +def AMAddLottery( + data: LotteryCreateReq, + user_id: UUID = Depends(get_user_id_from_token), +): + if not user_id: + return {"error": "userId is required"} + new_data = pg.add_lottery( + name=data.name, + sort=data.sort, + oss=data.oss, + is_opened=data.is_opened, + remark=data.remark, + ) + return BaseResponse(data=new_data) + + +@amRouter.put("/Lottery/Update") +def AMUpdateLottery( + data: LotteryUpdateReq, + user_id: UUID = Depends(get_user_id_from_token), +): + if not user_id: + return {"error": "userId is required"} + + updated = pg.update_lottery( + id=data.id, + name=data.name, + sort=data.sort, + oss=data.oss, + is_opened=data.is_opened, + remark=data.remark, + ) + return BaseResponse(data=updated) + + +@amRouter.delete("/Lottery/Delete") +def AMDeleteLottery(id: str, user_id: UUID = Depends(get_user_id_from_token)): + if not user_id: + return {"error": "userId is required"} + data = pg.delete_lottery(id) + return BaseResponse(data=data) + + +@amRouter.patch("/Lottery/open/{item_id}") +def open_lottery_item(item_id: UUID): + updated = pg.reset_lottery_item(item_id) + return BaseResponse(data=updated) + + +@amRouter.patch("/Lottery/resetAll") +def reset_all_lottery(): + count = pg.reset_all_lottery_db() + return BaseResponse(data=count) diff --git a/bbit_ai/app/routers/Iot.py b/bbit_ai/app/routers/Iot.py index 3804398..5e285e9 100644 --- a/bbit_ai/app/routers/Iot.py +++ b/bbit_ai/app/routers/Iot.py @@ -1,4 +1,5 @@ import asyncio +import json import pathlib import uuid from uuid import UUID @@ -6,14 +7,11 @@ from uuid import UUID from fastapi import APIRouter from fastapi import Depends -from config.emqx import mqtt_publish from config.minIO import get_upload_token from config.redis import redis_client from db.postgres.iot import * from models.BaseResponse import BaseResponse -from models.EMQXWebhook import EMQXWebhook from models.IotDeviceCommandRequest import IotDeviceCommandRequest -from routers.WS import ws_manager iot_router = APIRouter() from config.security import get_user_id_from_token @@ -21,43 +19,44 @@ from config.security import get_user_id_from_token # -------------------- 设备接口 -------------------- -@iot_router.post("/common/webhook") -async def emqx_webhook(data: EMQXWebhook): - device_id = data.clientid - event = data.event - - if event == "client.connected": - redis_client.set_online(device_id) - # 这里刻意等1s 是因为设备连接后这里首先接到通知,但是状态信息设备来没来得及通过mqtt发送来,所以在此等待 - # 没有直接在mqtt发送来的消息中获取在线状态是因为 这里是通过emqx的webhooks通知的,两种通知方式不同,一方面防止其中一种逻辑失效,另一方面在mqtt消息接收中设置在线状态会存在滞后性,同时也需要设置遗嘱消息,较为 - await asyncio.sleep(1) - await ws_manager.noticeOnlineStatus( - { - "deviceId": device_id, - "online": True, - "type": "status", - } - ) - - print(f"[新设备在线] {device_id}") - - elif event == "client.disconnected": - redis_client.set_offline(device_id) - await ws_manager.noticeOnlineStatus( - { - "deviceId": device_id, - "online": False, - "type": "status", - } - ) - - print(f"[设备离线] {device_id}") - - else: - # 其他事件直接忽略 - print(f"[其他事件] {event}") - - return {"ok": True} +# 已废弃 Webhooks的离线通知不及时(突然断电断网) +# @iot_router.post("/common/webhook") +# async def emqx_webhook(data: EMQXWebhook): +# device_id = data.clientid +# event = data.event +# +# if event == "client.connected": +# redis_client.set_online(device_id) +# # 这里刻意等1s 是因为设备连接后这里首先接到通知,但是状态信息设备来没来得及通过mqtt发送来,所以在此等待 +# # 没有直接在mqtt发送来的消息中获取在线状态是因为 这里是通过emqx的webhooks通知的,两种通知方式不同,一方面防止其中一种逻辑失效,另一方面在mqtt消息接收中设置在线状态会存在滞后性,同时也需要设置遗嘱消息,较为 +# await asyncio.sleep(1) +# await ws_manager.noticeOnlineStatus( +# { +# "deviceId": device_id, +# "online": True, +# "type": "status", +# } +# ) +# +# print(f"[新设备在线] {device_id}") +# +# elif event == "client.disconnected": +# redis_client.set_offline(device_id) +# await ws_manager.noticeOnlineStatus( +# { +# "deviceId": device_id, +# "online": False, +# "type": "status", +# } +# ) +# +# print(f"[设备离线] {device_id}") +# +# else: +# # 其他事件直接忽略 +# print(f"[其他事件] {event}") +# +# return {"ok": True} @iot_router.get("/common/device/list") @@ -83,9 +82,9 @@ async def get_device_list( # ===== 👇 核心:补在线状态 ===== for d in devices: device_id = d["name"] # 账号 - d["online"] = redis_client.is_device_online(device_id) == 1 info_json = redis_client.get_device_info(device_id) + d["online"] = info_json.get("online", "0") == "1" d["version"] = info_json.get("version", "") d["ip"] = info_json.get("ip", "") d["hostname"] = info_json.get("hostname", "") @@ -261,6 +260,10 @@ def getUploadUrl( return BaseResponse(data=get_update_package(deviceID)) +# request_id -> asyncio.Future +pending_commands: dict[str, asyncio.Future] = {} + + @iot_router.post("/common/device/command") async def command( data: IotDeviceCommandRequest, user_id: UUID = Depends(get_user_id_from_token) @@ -268,7 +271,32 @@ async def command( if not user_id: return {"error": "userId is required"} + request_id = str(uuid.uuid4()) + payload = {"request_id": request_id} + + loop = asyncio.get_running_loop() + future = loop.create_future() + pending_commands[request_id] = future + + from config.emqx import mqtt_publish + await mqtt_publish( - data.dept_id, "cmd", data.device_type, data.id, data.command, "{}" + data.dept_id, + "cmd", + data.device_type, + data.id, + data.command, + json.dumps(payload), ) - return BaseResponse(data=None) + + try: + result = await asyncio.wait_for(future, timeout=5) + return BaseResponse(data=result.get("massage")) + except asyncio.TimeoutError: + return BaseResponse(data=None, message="Device did not respond in time") + except asyncio.CancelledError: + # 请求被中断,必须清理,但不要吞 + pending_commands.pop(request_id, None) + raise + finally: + pending_commands.pop(request_id, None) diff --git a/bbit_ai/app/routers/Sentinel.py b/bbit_ai/app/routers/Sentinel.py index 5328d3d..3777ecb 100644 --- a/bbit_ai/app/routers/Sentinel.py +++ b/bbit_ai/app/routers/Sentinel.py @@ -2,6 +2,9 @@ from uuid import UUID from fastapi import Depends, APIRouter +from config.httpClient import HttpClient +from config.minIO import get_temp_url +from config.redis import redis_client from config.security import get_user_id_from_token from db.postgres import get_dept_ids_by_user_id, get_dept_id_by_user_id from db.postgres.sentinel import * @@ -96,3 +99,94 @@ async def delete_sentinel_record( if deleted == 0: return BaseResponse(status=False, message="记录不存在", data=None) return BaseResponse(data=True) + + +@sentinel_router.get("/monitor/promotional/list") +async def get_sentinel_monitor_promotional_list( + user_id: UUID = Depends(get_user_id_from_token), +): + if not user_id: + return {"error": "userId is required"} + return BaseResponse( + data=[ + { + "id": 1, + "remark": "人员公示及岗位职责", + "url": get_temp_url("sentinel", "promotional/promotional (2).jpg"), + }, + { + "id": 2, + "remark": "入川动物监督检查工作流程图", + "url": get_temp_url("sentinel", "promotional/promotional (1).jpg"), + }, + { + "id": 3, + "remark": "四川省人民政府关于设立人川动物运输指定通道的通告", + "url": get_temp_url("sentinel", "promotional/promotional (3).jpg"), + }, + ] + ) + + +http_client = HttpClient() + + +@sentinel_router.get("/monitor/list") +async def get_sentinel_monitor_list( + user_id: UUID = Depends(get_user_id_from_token), +): + if not user_id: + return {"error": "userId is required"} + + # 尝试从 Redis 获取 accessToken + access_token = redis_client.get_value("ys7:access_token") + if not access_token: + url = "https://open.ys7.com/api/lapp/token/get" + payload = { + "appKey": "c85e53559223457f90f06cd215513c3d", + "appSecret": "9424419da5292707eff2007e9ae37f0d", + } + result = await http_client.post(url, data=payload) + access_token = result["data"]["accessToken"] + redis_client.set_value( + "ys7:access_token", access_token, expire=7 * 24 * 60 * 60 # 7天过期 + ) + + url = "https://open.ys7.com/api/lapp/v2/live/address/get" + # device_serials = ["BG2493625"] + device_serials = ["BG2493625", "GH3713250", "GH3714496", "GH3714497"] + + video_expire_time = 25 * 24 * 60 * 60 # 25 天 + res = [] + for device_serial in device_serials: + live_key = f"ys7:live:{device_serial}" + cached_live = redis_client.get_value(live_key) + + if cached_live: + video_id = cached_live.get("id") + video_url = cached_live.get("url") + else: + payload = { + "accessToken": access_token, + "deviceSerial": device_serial, + "protocol": 4, # 流播放协议,1-ezopen、2-hls、3-rtmp、4-flv,默认为1 + "expireTime": video_expire_time, # 25天 + "supportH265": 0, + "quality": 2, + } + result = await http_client.post(url, data=payload) + video_id = result["data"]["id"] + video_url = result["data"]["url"] + # 存到 Redis,自动序列化为 JSON,过期 25天 + redis_client.set_value( + live_key, + {"id": video_id, "url": video_url}, + expire=video_expire_time, + ) + res.append( + { + "id": video_id, + "url": video_url, + } + ) + return BaseResponse(data=res) diff --git a/bbit_ai/app/routers/System.py b/bbit_ai/app/routers/System.py index 582aa09..cab72ea 100644 --- a/bbit_ai/app/routers/System.py +++ b/bbit_ai/app/routers/System.py @@ -41,7 +41,7 @@ async def dept_add(data: dict, user_id: UUID = Depends(get_user_id_from_token)): parent_id = data.get("pid") name = data.get("name") - comment = data.get("comment") + comment = data.get("remark") if not name: return BaseResponse(status=False, message="部门名不能为空", data=None) diff --git a/bbit_ai/app/routers/Vision.py b/bbit_ai/app/routers/Vision.py index a278932..c416da6 100644 --- a/bbit_ai/app/routers/Vision.py +++ b/bbit_ai/app/routers/Vision.py @@ -127,10 +127,10 @@ async def createSilkwormCocoonAnalysisTask( return {"error": "userId is required"} try: contents = await file.read() - await MyUtils.async_task( + res = await MyUtils.async_task( process_silkworm_cocoon_image, contents, file.filename, projectName, user_id ) - return BaseResponse(data=None) + return BaseResponse(data=res) except Exception as e: return BaseResponse(status=False, message=f"解析失败: {str(e)}", data=None) diff --git a/bbit_ai/app/routers/WS.py b/bbit_ai/app/routers/WS.py index d4b60bf..992ab0d 100644 --- a/bbit_ai/app/routers/WS.py +++ b/bbit_ai/app/routers/WS.py @@ -49,3 +49,23 @@ async def websocket_sentinel_record( await websocket.receive_text() except WebSocketDisconnect: await ws_manager.disconnect(websocket) + + +# Vue 牧安云哨 监控大屏 消息通知 +@iot_ws_router.websocket("/sentinel_record_notice") +async def websocket_sentinel_record( + websocket: WebSocket, + token: str = Query(...), +): + user_id = get_user_id_from_token_from_ws(token) + dept_id = get_dept_id_by_user_id(user_id) # 查数据库或缓存 + print("user_id:", user_id) + print("dept_id:", dept_id) + print("已接入") + await ws_manager.connect(websocket, user_id, dept_id, 2) + + try: + while True: + await websocket.receive_text() + except WebSocketDisconnect: + await ws_manager.disconnect(websocket) diff --git a/bbit_ai/app/service/vision.py b/bbit_ai/app/service/vision.py index e03099c..2f9f08c 100644 --- a/bbit_ai/app/service/vision.py +++ b/bbit_ai/app/service/vision.py @@ -5,10 +5,13 @@ import config.minIO as minIO import db.postgres as pg from agent.licenseImageAgent import get_license_response from agent.vehicleImageAgent import get_vehicle_response -from config.minIO import minio_client +from config.minIO import minio_client, get_temp_url from config.yolo import YOLOSingleton -from db.postgres import get_dept_id_by_iot_user_name, get_dept_ids_by_dept_id -from db.postgres.sentinel import update_sentinel_record +from db.postgres import ( + get_dept_id_by_iot_user_name, + get_dept_ids_by_dept_id, +) +from db.postgres.sentinel import update_sentinel_record, get_sentinel_record_by_id from llm.ticketLLM import * from llm.ticketLLMv2 import get_ticket_response_v2 from models.SentinelRecordRequest import SentinelRecordRequest @@ -173,7 +176,7 @@ def process_silkworm_cocoon_image( ) return { "resolution": resolution, - "size": size_kb, + "size": size_kb / 1024, "cocoon_count": results_json.get("total_objects"), "max_confidence": results_json.get("max_confidence"), "min_confidence": results_json.get("min_confidence"), @@ -182,6 +185,7 @@ def process_silkworm_cocoon_image( "inference_time_ms": speed_json.get("inference"), "postprocess_time_ms": speed_json.get("postprocess"), "details": results_json.get("class_counts"), + "imageUrl": get_temp_url(bucket_name, "ai/" + after_object_name), } @@ -190,14 +194,18 @@ async def process_vehicle_animal_image( ): # 通过设备id获得组织id dept_id = get_dept_id_by_iot_user_name(data.DeviceId) + # 得到动物类型 oss_url = minIO.get_temp_url("sentinel", "vehicle_image/" + data.VehicleImage) analysis_result = await get_vehicle_response(oss_url) livestock_type = analysis_result.get("livestock_type", "") remark = analysis_result.get("remark", "") - + # 保存到数据库 + update_sentinel_record(data.Id, livestock_type, remark, dept_id) + # 可以通知的部门ids available_departments = get_dept_ids_by_dept_id(dept_id) + # 通知控制界面 await ws_manager.noticeSentinel( { "content": f"载有{livestock_type}的车辆即将进入关卡,请准备检查", @@ -205,5 +213,11 @@ async def process_vehicle_animal_image( }, available_departments, ) - # 保存到数据库 - return update_sentinel_record(data.Id, livestock_type, remark, dept_id) + # 通知大屏界面 + await ws_manager.noticeSentinelMonitorStatus( + { + "content": get_sentinel_record_by_id(data.Id), + "type": "vehicle_alert", + }, + available_departments, + ) diff --git a/bbit_ai/app/utils/MyUtils.py b/bbit_ai/app/utils/MyUtils.py index 3485f2c..2f62940 100644 --- a/bbit_ai/app/utils/MyUtils.py +++ b/bbit_ai/app/utils/MyUtils.py @@ -1,7 +1,10 @@ import asyncio +import platform +import socket import threading from datetime import datetime +import psutil import pytz @@ -65,3 +68,35 @@ def is_valid_uuid(value: str): return True except: return False + + +def get_local_ip(): + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(("8.8.8.8", 80)) + ip = s.getsockname()[0] + s.close() + return ip + except Exception: + return "127.0.0.1" + + +def get_mac_address(): + # 获取第一个网卡的 MAC + for iface, addrs in psutil.net_if_addrs().items(): + for addr in addrs: + if addr.family == psutil.AF_LINK: + return addr.address + return "00:00:00:00:00:00" + + +def get_cpu_info(): + return platform.processor() + + +def get_memory_total(): + return psutil.virtual_memory().total + + +def get_disk_total(): + return psutil.disk_usage("/").total diff --git a/bbit_ai/app/utils/__init__.py b/bbit_ai/app/utils/__init__.py index e69de29..5cdc4e7 100644 --- a/bbit_ai/app/utils/__init__.py +++ b/bbit_ai/app/utils/__init__.py @@ -0,0 +1 @@ +from .MyUtils import * diff --git a/ktor/src/main/kotlin/ink/snowflake/server/model/database/ScaImagesTable.kt b/ktor/src/main/kotlin/ink/snowflake/server/model/database/ScaImagesTable.kt index a28d53d..9b73f68 100644 --- a/ktor/src/main/kotlin/ink/snowflake/server/model/database/ScaImagesTable.kt +++ b/ktor/src/main/kotlin/ink/snowflake/server/model/database/ScaImagesTable.kt @@ -6,7 +6,7 @@ import org.jetbrains.exposed.v1.core.dao.id.UUIDTable import org.jetbrains.exposed.v1.datetime.datetime import org.jetbrains.exposed.v1.json.json -object ScaImagesTable : UUIDTable("sca_images") { +object ScaImagesTable : UUIDTable("image_sca") { val name = varchar("name", 255) val upload_datetime = datetime("upload_datetime") val file_name = varchar("file_name", 255) diff --git a/vue2/pnpm-lock.yaml b/vue2/pnpm-lock.yaml index f75bf83..f1c5717 100644 --- a/vue2/pnpm-lock.yaml +++ b/vue2/pnpm-lock.yaml @@ -699,6 +699,9 @@ importers: echarts: specifier: 'catalog:' version: 6.0.0 + ezuikit-flv: + specifier: ^2.1.0 + version: 2.1.0 js-sha256: specifier: ^0.11.1 version: 0.11.1 @@ -6356,6 +6359,9 @@ packages: extendable-error@0.1.7: resolution: {integrity: sha512-UOiS2in6/Q0FK0R0q6UY9vYpQ21mr/Qn1KOnte7vsACuNJf514WvCCUHSRCPcgjPT2bAhNIJdlE6bVap1GKmeg==} + ezuikit-flv@2.1.0: + resolution: {integrity: sha512-BNFzbkXxmjRyHZfBgG7V9wS1eRKTpp39U9Xd4gS3vZLPEmeFrpPxd++kweZTLbb2Qa5NMb97Eaj/8SWduaSn9Q==} + fast-deep-equal@3.1.3: resolution: {integrity: sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==} @@ -15788,6 +15794,8 @@ snapshots: extendable-error@0.1.7: {} + ezuikit-flv@2.1.0: {} + fast-deep-equal@3.1.3: {} fast-diff@1.3.0: {}