diff --git a/bbit_ai/app/app.py b/bbit_ai/app/app.py index 7affc65..8e0ff00 100644 --- a/bbit_ai/app/app.py +++ b/bbit_ai/app/app.py @@ -8,13 +8,16 @@ from config.yolo import YOLOSingleton from routers.Bot import botRouter from routers.Chat import chatRouter from routers.Datasource import reportDataRouter +from routers.Iot import iot_router from routers.Knowledge import knowledgeRouter from routers.Public import publicRouter from routers.RabbitMQ import rqRouter from routers.Report import reportRouter +from routers.Sentinel import sentinel_router from routers.Service import serviceRouter from routers.System import systemRouter from routers.Vision import visionRouter +from routers.WS import iot_ws_router async def ai_lab(): @@ -46,6 +49,9 @@ async def ai_lab(): app.include_router(r, prefix="/llm", tags=["llm"]) app.include_router(visionRouter, prefix="/cv", tags=["cv"]) app.include_router(systemRouter, prefix="/system", tags=["system"]) + app.include_router(iot_router, prefix="/iot", tags=["iot"]) + app.include_router(sentinel_router, prefix="/iot/sentinel", tags=["iot_sentinel"]) + app.include_router(iot_ws_router, prefix="/iot/ws", tags=["iot_ws"]) app.include_router(publicRouter, prefix="/api/public", tags=["api"]) config = Config(app=app, host="0.0.0.0", port=13011, log_level="debug") server = Server(config) diff --git a/bbit_ai/app/config/minIO.py b/bbit_ai/app/config/minIO.py index cf8b25c..16e61d6 100644 --- a/bbit_ai/app/config/minIO.py +++ b/bbit_ai/app/config/minIO.py @@ -29,6 +29,25 @@ def get_upload_token(user_id, bucket_name, object_name, xpires=timedelta(minutes def get_temp_url(bucket_name, object_name): + # 如果 object_name 为 None 或空字符串,则返回默认图片 + if not object_name or not bucket_name: + bucket_name = "system" + object_name = "favicon.ico" + + # 使用 presigned_get_object 获取临时 URL return minio_client.presigned_get_object( bucket_name, object_name, expires=timedelta(seconds=3600) ) + + +def get_temp_url_dict(bucket_name, object_dict, object_name): + # 如果 object_name 为 None 或空字符串,则返回默认图片 + if not object_name: + bucket_name = "system" + object_dict = "default" + object_name = "favicon.ico" + + # 使用 presigned_get_object 获取临时 URL + return minio_client.presigned_get_object( + bucket_name, object_dict + "/" + object_name, expires=timedelta(seconds=3600) + ) diff --git a/bbit_ai/app/config/redis.py b/bbit_ai/app/config/redis.py new file mode 100644 index 0000000..1a5cc19 --- /dev/null +++ b/bbit_ai/app/config/redis.py @@ -0,0 +1,24 @@ +import redis + + +class RedisClient: + + def __init__(self, config_path="config.yaml"): + self.redis = redis.Redis( + "10.10.12.101", + 6379, + 0, + decode_responses=True, + ) + + def set_online(self, device_id: str): + key = f"device:online:{device_id}" + self.redis.set(key, 1) + + def set_offline(self, device_id: str): + key = f"device:online:{device_id}" + self.redis.delete(key) + + def is_device_online(self, device_id: str) -> bool: + key = f"device:online:{device_id}" + return self.redis.exists(key) == 1 diff --git a/bbit_ai/app/db/postgres/__init__.py b/bbit_ai/app/db/postgres/__init__.py index 681f831..b13eb1c 100644 --- a/bbit_ai/app/db/postgres/__init__.py +++ b/bbit_ai/app/db/postgres/__init__.py @@ -1,6 +1,7 @@ from .aimessage import * from .aiprofile import * from .aisession import * +from .iot import * from .knowledge import * from .license import * from .report import * diff --git a/bbit_ai/app/db/postgres/iot.py b/bbit_ai/app/db/postgres/iot.py new file mode 100644 index 0000000..2d723ed --- /dev/null +++ b/bbit_ai/app/db/postgres/iot.py @@ -0,0 +1,203 @@ +from hashlib import sha256 + +from config.pgDb import pg_pool +from utils.MyUtils import format_datetime, is_valid_uuid + + +def get_device_list_db_page( + page: int, + page_size: int, + device_id=None, + name=None, + status=None, + is_superuser=None, + dept_id=None, + startTime=None, + endTime=None, +): + offset = (page - 1) * page_size + + conditions = [] + params = [] + + # ---- 设备 ID(uuid)---- + if device_id: + conditions.append("d.id::text LIKE %s") + params.append(f"%{device_id}%") + + # ---- 设备名模糊搜索 ---- + if name: + conditions.append("d.name LIKE %s") + params.append(f"%{name}%") + + # ---- 状态 ---- + if status is not None: + conditions.append("d.is_active = %s") + params.append(status == 1) + + # ---- 状态 ---- + if is_superuser is not None: + conditions.append("d.is_superuser = %s") + params.append(is_superuser == 1) + + # ---- 部门 ---- + if dept_id and is_valid_uuid(dept_id): + conditions.append("d.dept_id = %s") + params.append(dept_id) + + # ---- 时间过滤 ---- + if startTime: + conditions.append("d.created_at >= %s") + params.append(startTime) + + if endTime: + conditions.append("d.created_at <= %s") + params.append(endTime) + + where_clause = " WHERE " + " AND ".join(conditions) if conditions else "" + + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + + # ---- 统计总数 ---- + count_sql = f""" + SELECT COUNT(*) + FROM iot_users d + {where_clause}; + """ + cursor.execute(count_sql, params) + total = cursor.fetchone()[0] + + # ---- 分页查询 ---- + list_sql = f""" + SELECT + d.id, + d.name, + d.remark, + d.is_active, + d.is_superuser, + d.dept_id, + sd.name AS dept_name, + d.created_at + FROM iot_users d + LEFT JOIN sys_dept sd ON d.dept_id = sd.id + {where_clause} + ORDER BY d.created_at DESC + LIMIT %s OFFSET %s; + """ + + cursor.execute(list_sql, params + [page_size, offset]) + rows = cursor.fetchall() + + result = [] + for r in rows: + ( + device_id, + name, + remark, + is_active, + is_superuser, + dept_id, + dept_name, + created_at, + ) = r + + result.append( + { + "id": device_id, + "name": name, + "remark": remark, + "status": 1 if is_active else 0, + "is_superuser": 1 if is_superuser else 0, + "dept_id": dept_id, + "dept_name": dept_name, + "created_at": format_datetime(created_at), + } + ) + + return result, total + + +def insert_device(data: dict) -> str: + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + name = data.get("name") + salt = name[-4:] if name else "" + password = data.get("password", "123456") + password_hash = sha256((salt + password).encode("utf-8")).hexdigest() + + cursor.execute( + """ + INSERT INTO iot_users (name, password_hash, salt, remark, dept_id, is_active, is_superuser) + VALUES (%s, %s, %s, %s, %s, %s, %s) + RETURNING id; + """, + ( + name, + password_hash, + salt, + data.get("remark"), + data.get("dept_id"), + bool(data.get("status", 1)), + bool(data.get("is_superuser", 0)), + ), + ) + device_id = cursor.fetchone()[0] + conn.commit() + return device_id + + +def update_device_db(id: str, data: dict) -> int: + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + cursor.execute( + """ + UPDATE iot_users + SET remark=%s, is_active=%s, dept_id=%s, is_superuser=%s + WHERE id=%s; + """, + ( + data.get("remark"), + bool(data.get("status", 1)), + data.get("dept_id"), + bool(data.get("is_superuser", 0)), + id, + ), + ) + conn.commit() + return cursor.rowcount + + +def patch_device_db(id: str, data: dict) -> int: + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + fields = [] + params = [] + mapping = { + "remark": "remark", + "status": "is_active", + "is_superuser": "is_superuser", + "dept_id": "dept_id", + } + for k, column in mapping.items(): + if k in data: + value = ( + bool(data[k]) if k in ["status", "is_superuser"] else data[k] + ) + fields.append(f"{column} = %s") + params.append(value) + + if fields: + sql = f"UPDATE iot_users SET {', '.join(fields)} WHERE id = %s" + params.append(id) + cursor.execute(sql, tuple(params)) + conn.commit() + return cursor.rowcount + + +def delete_device_db(id: str) -> int: + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + cursor.execute("DELETE FROM iot_users WHERE id=%s;", (id,)) + conn.commit() + return cursor.rowcount diff --git a/bbit_ai/app/db/postgres/sentinel.py b/bbit_ai/app/db/postgres/sentinel.py new file mode 100644 index 0000000..67e7f19 --- /dev/null +++ b/bbit_ai/app/db/postgres/sentinel.py @@ -0,0 +1,261 @@ +from config.minIO import get_temp_url_dict +from config.pgDb import pg_pool +from utils.MyUtils import format_datetime + + +def get_sentinel_record_list_db_page( + page: int, + page_size: int, + record_id=None, + license_plate=None, + vehicle_type=None, + is_inspected=None, + livestock_source=None, + livestock_type=None, + dept_ids=None, + start_time=None, + end_time=None, +): + offset = (page - 1) * page_size + + conditions = [] + params = [] + + # ---- 记录 ID ---- + if record_id: + conditions.append("r.id::text LIKE %s") + params.append(f"%{record_id}%") + + # ---- 车牌号 ---- + if license_plate: + conditions.append("r.license_plate LIKE %s") + params.append(f"%{license_plate}%") + + # ---- 车型 ---- + if vehicle_type: + conditions.append("r.vehicle_type LIKE %s") + params.append(f"%{vehicle_type}%") + + # ---- 是否检查 ---- + if is_inspected is not None: + conditions.append("r.is_inspected = %s") + params.append(bool(is_inspected)) + + # ---- 来源 ---- + if livestock_source is not None: + conditions.append("r.livestock_source LIKE %s") + params.append(f"%{livestock_source}%") + + # ---- 种类 ---- + if livestock_type is not None: + conditions.append("r.livestock_type LIKE %s") + params.append(livestock_type) + + # ---- 部门 ---- + if dept_ids: + conditions.append("r.dept_id = ANY(%s)") + params.append(dept_ids) + + # ---- 时间过滤 ---- + if start_time: + conditions.append("r.created_at >= %s") + params.append(start_time) + + if end_time: + conditions.append("r.created_at <= %s") + params.append(end_time) + + where_clause = " WHERE " + " AND ".join(conditions) if conditions else "" + + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + + # ---- 统计总数 ---- + count_sql = f""" + SELECT COUNT(*) + FROM sentinel_records r + {where_clause}; + """ + cursor.execute(count_sql, params) + total = cursor.fetchone()[0] + + # ---- 分页查询 ---- + list_sql = f""" + SELECT + r.id, + r.license_plate, + r.vehicle_type, + r.license_plate_image, + r.vehicle_image, + r.livestock_type, + r.livestock_source, + r.is_inspected, + r.dept_id, + sd.name AS dept_name, + r.created_at, + r.updated_at, + r.remark + FROM sentinel_records r + LEFT JOIN sys_dept sd ON r.dept_id = sd.id + {where_clause} + ORDER BY r.created_at DESC + LIMIT %s OFFSET %s; + """ + + cursor.execute(list_sql, params + [page_size, offset]) + rows = cursor.fetchall() + + result = [] + for r in rows: + ( + record_id, + license_plate, + vehicle_type, + license_plate_image, + vehicle_image, + livestock_type, + livestock_source, + is_inspected, + dept_id, + dept_name, + created_at, + updated_at, + remark, + ) = r + + result.append( + { + "id": record_id, + "license_plate": license_plate, + "vehicle_type": vehicle_type, + "license_plate_image": get_temp_url_dict( + "sentinel", "license_plate", license_plate_image + ), + "vehicle_image": get_temp_url_dict( + "sentinel", "vehicle_image", vehicle_image + ), + "livestock_type": livestock_type, + "livestock_source": livestock_source, + "is_inspected": 1 if is_inspected else 0, + "dept_id": dept_id, + "dept_name": dept_name, + "created_at": format_datetime(created_at), + "updated_at": format_datetime(updated_at), + "remark": remark, + } + ) + + return result, total + + +def insert_sentinel_record(data: dict, dept_id) -> str: + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + cursor.execute( + """ + INSERT INTO sentinel_records ( + license_plate, + vehicle_type, + license_plate_image, + vehicle_image, + livestock_type, + livestock_source, + is_inspected, + dept_id, + remark, + created_by + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + RETURNING id; + """, + ( + data.get("license_plate"), + data.get("vehicle_type"), + data.get("license_plate_image"), + data.get("vehicle_image"), + data.get("livestock_type"), + data.get("livestock_source"), + bool(data.get("is_inspected", False)), + dept_id, + data.get("remark"), + data.get("created_by"), + ), + ) + record_id = cursor.fetchone()[0] + conn.commit() + return record_id + + +def update_sentinel_record_db(id: str, data: dict) -> int: + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + cursor.execute( + """ + UPDATE sentinel_records + SET + license_plate = %s, + vehicle_type = %s, + license_plate_image = %s, + vehicle_image = %s, + livestock_type = %s, + livestock_source = %s, + is_inspected = %s, + remark = %s, + updated_at = now() + WHERE id = %s; + """, + ( + data.get("license_plate"), + data.get("vehicle_type"), + data.get("license_plate_image"), + data.get("vehicle_image"), + data.get("livestock_type"), + data.get("livestock_source"), + bool(data.get("is_inspected", False)), + data.get("remark"), + id, + ), + ) + conn.commit() + return cursor.rowcount + + +def patch_sentinel_record_db(id: str, data: dict) -> int: + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + fields = [] + params = [] + mapping = { + "license_plate": "license_plate", + "vehicle_type": "vehicle_type", + "license_plate_image": "license_plate_image", + "vehicle_image": "vehicle_image", + "livestock_type": "livestock_type", + "livestock_source": "livestock_source", + "is_inspected": "is_inspected", + "remark": "remark", + } + + for k, column in mapping.items(): + if k in data: + value = data[k] + # 如果字段是 "is_inspected",将其转换为布尔类型 + if k == "is_inspected": + value = bool(value) # 转换为布尔值 + fields.append(f"{column} = %s") + params.append(value) + + if fields: + sql = f"UPDATE sentinel_records SET {', '.join(fields)} WHERE id = %s" + params.append(id) + cursor.execute(sql, tuple(params)) + conn.commit() + return cursor.rowcount + + +def delete_sentinel_record_db(id: str) -> int: + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + cursor.execute("DELETE FROM sentinel_records WHERE id=%s;", (id,)) + conn.commit() + return cursor.rowcount diff --git a/bbit_ai/app/db/postgres/system.py b/bbit_ai/app/db/postgres/system.py index 5c1ec81..5901580 100644 --- a/bbit_ai/app/db/postgres/system.py +++ b/bbit_ai/app/db/postgres/system.py @@ -2,6 +2,7 @@ import json from uuid import UUID from config.pgDb import pg_pool +from utils import MyUtils from utils.MyUtils import format_datetime, is_valid_uuid @@ -387,7 +388,7 @@ def get_role_list_db_page(page: int, page_size: int, rid=None, name=None, remark "id": role_id, "name": name, "remark": remark, - "created_at": created_at, + "created_at": format_datetime(created_at), "permissions": menu_ids or [], } ) @@ -411,9 +412,9 @@ def get_user_list_db_page( params = [] # ---- 用户 ID,必须是 UUID,否则忽略 ---- - if uid and is_valid_uuid(uid): - conditions.append("u.id = %s") - params.append(uid) + if uid: + conditions.append("u.id::text LIKE %s") + params.append(f"%{uid}%") # ---- 用户名模糊搜索 ---- if username: @@ -506,19 +507,11 @@ def get_user_list_db_page( return result, total -# 检查用户名是否存在 -def db_user_name_exists(username: str, id: str | None = None) -> bool: +# 检查登录账号是否存在 +def db_user_name_exists(username: str) -> bool: with pg_pool.getConn() as conn: with conn.cursor() as cursor: - if id: - cursor.execute( - "SELECT COUNT(*) FROM users WHERE username = %s AND id <> %s;", - (username, id), - ) - else: - cursor.execute( - "SELECT COUNT(*) FROM users WHERE username = %s;", (username,) - ) + cursor.execute("SELECT COUNT(*) FROM users WHERE email = %s;", (username,)) return cursor.fetchone()[0] > 0 @@ -554,40 +547,6 @@ def insert_user(data: dict) -> str: return user_id -# 更新用户 -def update_user_db(id: str, data: dict) -> int: - with pg_pool.getConn() as conn: - with conn.cursor() as cursor: - cursor.execute( - """ - UPDATE users - SET username=%s, email=%s, phone=%s, is_active=%s, dept_id=%s - WHERE id=%s; - """, - ( - data.get("name"), - data.get("email"), - data.get("phone"), - bool(data.get("status", 1)), - data.get("dept_id"), - id, - ), - ) - - # 更新用户角色关系:先删除旧的,再插入新的 - cursor.execute("DELETE FROM sys_user_role WHERE user_id=%s;", (id,)) - role_ids = data.get("roles", []) - if role_ids: - for role_id in role_ids: - cursor.execute( - "INSERT INTO sys_user_role (user_id, role_id) VALUES (%s, %s);", - (id, role_id), - ) - - conn.commit() - return cursor.rowcount - - # 局部更新用户(PATCH) def patch_user_db(id: str, data: dict) -> int: with pg_pool.getConn() as conn: @@ -603,6 +562,7 @@ def patch_user_db(id: str, data: dict) -> int: "name": "username", "email": "email", "phone": "phone", + "password_hash": "password_hash", "status": "is_active", "dept_id": "dept_id", } @@ -676,7 +636,7 @@ def get_menus_by_ids(menu_ids: list, plat_id: int): SELECT id, pid, name, path, component, redirect, auth_code, type, meta, created_at, updated_at FROM sys_menu - WHERE id = ANY(%s::varchar[]) AND plat_id = %s + WHERE id = ANY(%s::varchar[]) AND plat_id = %s AND type != 'button' ORDER BY created_at ASC; """ # 转换 uuid 列表为 str 列表 @@ -697,3 +657,296 @@ def build_menu_tree(items): else: tree.append(item) return tree + + +def get_dict_list(keyword="", page=1, page_size=10): + """ + 获取系统字典列表,支持分页和关键字搜索(key / name) + """ + offset = (page - 1) * page_size + + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + # 1️⃣ 查询总条数 + cursor.execute( + """ + SELECT COUNT(*) + FROM sys_dict + WHERE ( + %s = '' + OR key ILIKE '%%' || %s || '%%' + OR name ILIKE '%%' || %s || '%%' + ) + """, + (keyword, keyword, keyword), + ) + total = cursor.fetchone()[0] + + # 2️⃣ 查询当前页数据 + cursor.execute( + """ + SELECT id, key, name, remark, created_at + FROM sys_dict + WHERE ( + %s = '' + OR key ILIKE '%%' || %s || '%%' + OR name ILIKE '%%' || %s || '%%' + ) + ORDER BY created_at DESC + LIMIT %s OFFSET %s + """, + (keyword, keyword, keyword, page_size, offset), + ) + + rows = cursor.fetchall() + + result = [] + for row in rows: + result.append( + { + "id": row[0], + "key": row[1], + "name": row[2], + "remark": row[3], + "created_at": MyUtils.format_datetime(row[4]), + } + ) + + return total, result + + +def db_create_dict(key: str, name: str, remark: str): + """ + 在数据库中创建字典 + """ + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + cursor.execute( + """ + INSERT INTO sys_dict (id, key, name, remark, created_at) + VALUES (gen_random_uuid(), %s, %s, %s, now()) + RETURNING id + """, + (key, name, remark), + ) + new_id = cursor.fetchone()[0] + return new_id + + +def db_update_dict(id: str, key: str, name: str, remark: str): + """ + 在数据库中更新字典 + """ + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + cursor.execute( + """ + UPDATE sys_dict + SET key=%s, name=%s, remark=%s + WHERE id=%s + """, + (key, name, remark, id), + ) + return id + + +def db_delete_dict(id: str): + """ + 在数据库中删除字典 + """ + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + cursor.execute("DELETE FROM sys_dict WHERE id=%s", (id,)) + return id + + +def db_get_dict_detail(dict_id: str): + """ + 获取字典详情列表 + """ + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + cursor.execute( + """ + SELECT id, value, sort, pid, dict_id, remark, created_at, updated_at + FROM sys_dict_detail + WHERE dict_id=%s + ORDER BY sort ASC, created_at ASC + """, + (dict_id,), + ) + rows = cursor.fetchall() + result = [] + for row in rows: + result.append( + { + "id": row[0], + "value": row[1], + "sort": row[2], + "pid": row[3], + "dict_id": row[4], + "remark": row[5], + "created_at": row[6].isoformat() if row[6] else None, + "updated_at": row[7].isoformat() if row[7] else None, + } + ) + return result + + +def db_create_dict_detail( + value: str, dict_id: str, sort: int = 0, pid: str = None, remark: str = None +): + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + cursor.execute( + """ + INSERT INTO sys_dict_detail (id, value, dict_id, sort, pid, remark, created_at) + VALUES (gen_random_uuid(), %s, %s, %s, %s, %s, now()) + RETURNING id + """, + (value, dict_id, sort, pid, remark), + ) + new_id = cursor.fetchone()[0] + return new_id + + +def db_update_dict_detail( + id: str, value: str = None, sort: int = None, pid: str = None, remark: str = None +): + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + cursor.execute( + """ + UPDATE sys_dict_detail + SET value=COALESCE(%s, value), + sort=COALESCE(%s, sort), + pid=COALESCE(%s, pid), + remark=COALESCE(%s, remark), + updated_at=now() + WHERE id=%s + """, + (value, sort, pid, remark, id), + ) + return id + + +def db_delete_dict_detail(id: str): + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + cursor.execute("DELETE FROM sys_dict_detail WHERE id=%s", (id,)) + return id + + +def get_dict_detail_list_by_key(dict_key: str): + """ + 通过字典 key 获取字典明细列表 + """ + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + cursor.execute( + """ + SELECT + d.id, + d.value, + d.sort, + d.pid, + d.remark, + d.created_at + FROM sys_dict_detail d + JOIN sys_dict s ON d.dict_id = s.id + WHERE s.key = %s + ORDER BY d.sort ASC, d.created_at ASC + """, + (dict_key,), + ) + + rows = cursor.fetchall() + + result = [] + for row in rows: + result.append( + { + "id": row[0], + "value": row[1], + "sort": row[2], + "pid": row[3], + "remark": row[4], + "created_at": MyUtils.format_datetime(row[5]), + } + ) + + return result + + +def get_dept_ids_by_user_id(user_id: UUID) -> list: + # 第一步:通过 user_id 查找其所属的 dept_id + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + cursor.execute("SELECT dept_id FROM users WHERE id = %s", (user_id,)) + dept_id = cursor.fetchone() + + if not dept_id: + raise ValueError( + f"User with id {user_id} not found or has no department." + ) + + dept_id = dept_id[0] + + # 第二步:通过 dept_id 获取所有下级部门的 dept_id + cursor.execute( + """ + WITH RECURSIVE dept_hierarchy AS ( + SELECT id FROM sys_dept WHERE id = %s + UNION + SELECT d.id FROM sys_dept d + INNER JOIN dept_hierarchy dh ON d.parent_id = dh.id + ) + SELECT id FROM dept_hierarchy; + """, + (dept_id,), + ) + dept_ids = [row[0] for row in cursor.fetchall()] + + return dept_ids + + +def get_dept_ids_by_user_id(user_id: UUID) -> list: + # 第一步:通过 user_id 查找其所属的 dept_id + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + cursor.execute("SELECT dept_id FROM users WHERE id = %s", (user_id,)) + dept_id = cursor.fetchone() + + if not dept_id: + raise ValueError( + f"User with id {user_id} not found or has no department." + ) + + dept_id = dept_id[0] + + # 第二步:通过 dept_id 获取所有下级部门的 dept_id + cursor.execute( + """ + WITH RECURSIVE dept_hierarchy AS ( + SELECT id FROM sys_dept WHERE id = %s + UNION + SELECT d.id FROM sys_dept d + INNER JOIN dept_hierarchy dh ON d.parent_id = dh.id + ) + SELECT id FROM dept_hierarchy; + """, + (dept_id,), + ) + dept_ids = [row[0] for row in cursor.fetchall()] + + return dept_ids + + +def get_dept_id_by_user_id(user_id: UUID) -> list: + # 第一步:通过 user_id 查找其所属的 dept_id + with pg_pool.getConn() as conn: + with conn.cursor() as cursor: + cursor.execute("SELECT dept_id FROM users WHERE id = %s", (user_id,)) + dept_id = cursor.fetchone() + dept_id = dept_id[0] + return dept_id diff --git a/bbit_ai/app/db/postgres/ws_manager.py b/bbit_ai/app/db/postgres/ws_manager.py new file mode 100644 index 0000000..9d5bc5c --- /dev/null +++ b/bbit_ai/app/db/postgres/ws_manager.py @@ -0,0 +1,25 @@ +import asyncio +from typing import List + +from fastapi import WebSocket + + +class ConnectionManager: + def __init__(self): + self.active_connections: List[WebSocket] = [] + self.lock = asyncio.Lock() + + async def connect(self, websocket: WebSocket): + await websocket.accept() + async with self.lock: + self.active_connections.append(websocket) + + async def disconnect(self, websocket: WebSocket): + async with self.lock: + if websocket in self.active_connections: + self.active_connections.remove(websocket) + + async def broadcast(self, message: dict): + async with self.lock: + for ws in self.active_connections: + await ws.send_json(message) diff --git a/bbit_ai/app/models/EMQXWebhook.py b/bbit_ai/app/models/EMQXWebhook.py new file mode 100644 index 0000000..dd61dde --- /dev/null +++ b/bbit_ai/app/models/EMQXWebhook.py @@ -0,0 +1,10 @@ +from typing import Optional + +from pydantic import BaseModel + + +class EMQXWebhook(BaseModel): + event: str + clientid: str + username: Optional[str] = None + timestamp: Optional[int] = None diff --git a/bbit_ai/app/requirements.txt b/bbit_ai/app/requirements.txt index 090e396..da159e8 100644 --- a/bbit_ai/app/requirements.txt +++ b/bbit_ai/app/requirements.txt @@ -22,7 +22,7 @@ Pillow==11.3.0 python-multipart==0.0.20 aio_pika==9.5.7 ultralytics==8.3.227 - +redis==7.1.0 # MCP服务 python-dotenv>=1.0.0 websockets>=11.0.3 diff --git a/bbit_ai/app/routers/Iot.py b/bbit_ai/app/routers/Iot.py new file mode 100644 index 0000000..e7d5142 --- /dev/null +++ b/bbit_ai/app/routers/Iot.py @@ -0,0 +1,123 @@ +from uuid import UUID + +from fastapi import APIRouter +from fastapi import Depends + +from config.redis import RedisClient +from db.postgres.iot import * +from models.BaseResponse import BaseResponse +from models.EMQXWebhook import EMQXWebhook +from routers.WS import ws_manager + +iot_router = APIRouter() +redis_client = RedisClient() + +from config.security import get_user_id_from_token + +# -------------------- 设备接口 -------------------- + + +@iot_router.post("/common/webhook") +async def emqx_webhook(data: EMQXWebhook): + device_id = data.clientid + event = data.event + + if event == "client.connected": + redis_client.set_online(device_id) + + await ws_manager.broadcast({"deviceId": device_id, "online": True}) + + print(f"[ONLINE] {device_id}") + + elif event == "client.disconnected": + redis_client.set_offline(device_id) + + await ws_manager.broadcast({"deviceId": device_id, "online": False}) + + print(f"[OFFLINE] {device_id}") + + else: + # 其他事件直接忽略 + print(f"[IGNORE] {event}") + + return {"ok": True} + + +@iot_router.get("/common/device/list") +async def get_device_list( + page: int = 1, + pageSize: int = 10, + id: str | None = None, + name: str | None = None, + status: int | None = None, + is_superuser: int | None = None, + dept_id: str | None = None, + startTime: str | None = None, + endTime: str | None = None, + user_id: UUID = Depends(get_user_id_from_token), +): + if not user_id: + return {"error": "userId is required"} + + devices, total = get_device_list_db_page( + page, pageSize, id, name, status, is_superuser, dept_id, startTime, endTime + ) + + # ===== 👇 核心:补在线状态 ===== + for d in devices: + device_id = d["name"] # 账号 + d["online"] = redis_client.is_device_online(device_id) == 1 + + return BaseResponse(data={"list": devices, "total": total}) + + +@iot_router.post("/common/device") +async def create_device(data: dict, user_id: UUID = Depends(get_user_id_from_token)): + if not user_id: + return {"error": "userId is required"} + new_id = insert_device(data) + return BaseResponse(data={"id": new_id}) + + +@iot_router.put("/common/device/{id}") +async def update_device( + id: str, + data: dict, + user_id: UUID = Depends(get_user_id_from_token), +): + if not user_id: + return {"error": "userId is required"} + + count = update_device_db(id, data) + if count == 0: + return BaseResponse(status=False, message="设备不存在", data=None) + return BaseResponse(data=True) + + +@iot_router.patch("/common/device/{id}") +async def patch_device( + id: str, + data: dict, + user_id: UUID = Depends(get_user_id_from_token), +): + if not user_id: + return {"error": "userId is required"} + + count = patch_device_db(id, data) + if count == 0: + return BaseResponse(status=False, message="设备不存在", data=None) + return BaseResponse(data=True) + + +@iot_router.delete("/common/device/{id}") +async def delete_device( + id: str, + user_id: UUID = Depends(get_user_id_from_token), +): + if not user_id: + return {"error": "userId is required"} + + deleted = delete_device_db(id) + if deleted == 0: + return BaseResponse(status=False, message="设备不存在", data=None) + return BaseResponse(data=True) diff --git a/bbit_ai/app/routers/Sentinel.py b/bbit_ai/app/routers/Sentinel.py new file mode 100644 index 0000000..5328d3d --- /dev/null +++ b/bbit_ai/app/routers/Sentinel.py @@ -0,0 +1,98 @@ +from uuid import UUID + +from fastapi import Depends, APIRouter + +from config.security import get_user_id_from_token +from db.postgres import get_dept_ids_by_user_id, get_dept_id_by_user_id +from db.postgres.sentinel import * +from models.BaseResponse import BaseResponse + +# -------------------- 设备接口 -------------------- + +sentinel_router = APIRouter() + + +@sentinel_router.get("/record/list") +async def get_sentinel_record_list( + page: int = 1, + page_size: int = 10, + id: str | None = None, + license_plate: str | None = None, + vehicle_type: str | None = None, + is_inspected: int | None = None, + livestock_type: str = None, + livestock_source: str | None = None, + start_time: str | None = None, + end_time: str | None = None, + user_id: UUID = Depends(get_user_id_from_token), +): + if not user_id: + return {"error": "userId is required"} + dept_ids = get_dept_ids_by_user_id(user_id) + print(dept_ids) # 输出所有部门的 dept_id 列表 + + records, total = get_sentinel_record_list_db_page( + page, + page_size, + id, + license_plate, + vehicle_type, + is_inspected, + livestock_source, + livestock_type, + dept_ids, + start_time, + end_time, + ) + + return BaseResponse(data={"list": records, "total": total}) + + +@sentinel_router.post("/record") +async def create_sentinel_record( + data: dict, user_id: UUID = Depends(get_user_id_from_token) +): + if not user_id: + return {"error": "userId is required"} + dept_id = get_dept_id_by_user_id(user_id) + new_id = insert_sentinel_record(data, dept_id) + return BaseResponse(data={"id": new_id}) + + +@sentinel_router.put("/record/{id}") +async def update_sentinel_record( + id: str, data: dict, user_id: UUID = Depends(get_user_id_from_token) +): + if not user_id: + return {"error": "userId is required"} + + count = update_sentinel_record_db(id, data) + if count == 0: + return BaseResponse(status=False, message="记录不存在", data=None) + return BaseResponse(data=True) + + +@sentinel_router.patch("/record/{id}") +async def patch_sentinel_record( + id: str, data: dict, user_id: UUID = Depends(get_user_id_from_token) +): + if not user_id: + return {"error": "userId is required"} + + count = patch_sentinel_record_db(id, data) + if count == 0: + return BaseResponse(status=False, message="记录不存在", data=None) + return BaseResponse(data=True) + + +@sentinel_router.delete("/record/{id}") +async def delete_sentinel_record( + id: str, user_id: UUID = Depends(get_user_id_from_token) +): + if not user_id: + return {"error": "userId is required"} + + deleted = delete_sentinel_record_db(id) + if deleted == 0: + return BaseResponse(status=False, message="记录不存在", data=None) + return BaseResponse(data=True) diff --git a/bbit_ai/app/routers/System.py b/bbit_ai/app/routers/System.py index 23a4e5b..6c5dc57 100644 --- a/bbit_ai/app/routers/System.py +++ b/bbit_ai/app/routers/System.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, Query from config.security import get_user_id_from_token from db.postgres.system import * @@ -60,7 +60,7 @@ async def dept_update( parent_id = data.get("pid") name = data.get("name") - comment = data.get("comment") + comment = data.get("remark") rowcount = update_dept(id, parent_id, name, comment) if rowcount == 0: @@ -294,12 +294,11 @@ async def delete_role( @systemRouter.get("/user/name-exists") async def user_name_exists( username: str, - id: str | None = None, user_id: UUID = Depends(get_user_id_from_token), ): if not user_id: return {"error": "userId is required"} - exists = db_user_name_exists(username, id) + exists = db_user_name_exists(username) return BaseResponse(data=exists) @@ -333,21 +332,6 @@ async def create_user(data: dict, user_id: UUID = Depends(get_user_id_from_token return BaseResponse(data={"id": new_id}) -@systemRouter.put("/user/{id}") -async def update_user( - id: str, - data: dict, - user_id: UUID = Depends(get_user_id_from_token), -): - if not user_id: - return {"error": "userId is required"} - - count = update_user_db(id, data) - if count == 0: - return BaseResponse(status=False, message="用户不存在", data=None) - return BaseResponse(data=True) - - @systemRouter.patch("/user/{id}") async def patch_user( id: str, @@ -375,3 +359,118 @@ async def delete_user( if deleted == 0: return BaseResponse(status=False, message="用户不存在", data=None) return BaseResponse(data=True) + + +@systemRouter.get("/dict/list") +def getScVideoList( + user_id: UUID = Depends(get_user_id_from_token), + name: str = "", + page: int = Query(1, ge=1), + page_size: int = Query(10, ge=1, le=100), +): + if not user_id: + return {"error": "userId is required"} + total, items = get_dict_list(name, page=page, page_size=page_size) + return BaseResponse( + data={ + "total": total, + "items": items, + } + ) + + +@systemRouter.post("/dict") +def create_dict_api(data: dict, user_id: UUID = Depends(get_user_id_from_token)): + """ + 创建字典接口 + data = { "key": str, "name": str, "remark": str } + """ + if not user_id: + return {"error": "userId is required"} + + new_id = db_create_dict(data.get("key"), data.get("name"), data.get("remark")) + return BaseResponse(data={"id": new_id}) + + +@systemRouter.put("/dict/{id}") +def update_dict_api( + id: str, data: dict, user_id: UUID = Depends(get_user_id_from_token) +): + """ + 更新字典接口 + """ + if not user_id: + return {"error": "userId is required"} + + db_update_dict(id, data.get("key"), data.get("name"), data.get("remark")) + return BaseResponse(data={"id": id}) + + +@systemRouter.delete("/dict/{id}") +def delete_dict_api(id: str, user_id: UUID = Depends(get_user_id_from_token)): + """ + 删除字典接口 + """ + if not user_id: + return {"error": "userId is required"} + + db_delete_dict(id) + return BaseResponse(data={"id": id}) + + +@systemRouter.get("/dict/detail/list") +def get_dict_detail_api(dictId: str, user_id: UUID = Depends(get_user_id_from_token)): + if not user_id: + return {"error": "userId is required"} + items = db_get_dict_detail(dictId) + return BaseResponse(data={"list": items}) + + +@systemRouter.post("/dict/detail") +def create_dict_detail_api(data: dict, user_id: UUID = Depends(get_user_id_from_token)): + if not user_id: + return {"error": "userId is required"} + new_id = db_create_dict_detail( + value=data.get("value"), + dict_id=data.get("dict_id"), + sort=data.get("sort", 0), + pid=data.get("pid"), + remark=data.get("remark"), + ) + return BaseResponse(data={"id": new_id}) + + +@systemRouter.put("/dict/detail/{id}") +def update_dict_detail_api( + id: str, data: dict, user_id: UUID = Depends(get_user_id_from_token) +): + if not user_id: + return {"error": "userId is required"} + db_update_dict_detail( + id=id, + value=data.get("value"), + sort=data.get("sort"), + pid=data.get("pid"), + remark=data.get("remark"), + ) + return BaseResponse(data={"id": id}) + + +@systemRouter.delete("/dict/detail/{id}") +def delete_dict_detail_api(id: str, user_id: UUID = Depends(get_user_id_from_token)): + if not user_id: + return {"error": "userId is required"} + db_delete_dict_detail(id) + return BaseResponse(data={"id": id}) + + +@systemRouter.get("/dict/getValue") +def get_dict_detail_by_key( + key: str = Query(..., min_length=1), +): + if not key: + return {"error": "key is required"} + + items = get_dict_detail_list_by_key(key) + + return BaseResponse(data=items) diff --git a/bbit_ai/app/routers/WS.py b/bbit_ai/app/routers/WS.py new file mode 100644 index 0000000..e06cbdc --- /dev/null +++ b/bbit_ai/app/routers/WS.py @@ -0,0 +1,23 @@ +from fastapi import APIRouter +from starlette.websockets import WebSocket, WebSocketDisconnect + +from db.postgres.ws_manager import ConnectionManager + +ws_manager = ConnectionManager() + + +iot_ws_router = APIRouter() + + +@iot_ws_router.websocket("/device-status") +async def websocket_device_status(websocket: WebSocket): + await ws_manager.connect(websocket) + print("[WS] client connected") + + try: + while True: + # 这里不需要接收任何消息 + await websocket.receive_text() + except WebSocketDisconnect: + await ws_manager.disconnect(websocket) + print("[WS] client disconnected") diff --git a/bbit_ai/app_mcp/core/connection_manager.py b/bbit_ai/app_mcp/core/connection_manager.py index b82f139..0e8fe47 100644 --- a/bbit_ai/app_mcp/core/connection_manager.py +++ b/bbit_ai/app_mcp/core/connection_manager.py @@ -8,11 +8,10 @@ import json import time import uuid from typing import Dict, Any, List, Optional, Tuple -from websockets.server import WebSocketServerProtocol -from websockets.exceptions import ConnectionClosed -from utils.logger import get_logger -logger = get_logger() +from fastapi import logger +from websockets.exceptions import ConnectionClosed +from websockets.legacy.server import WebSocketServerProtocol class RobotConnection: diff --git a/ktor/src/main/kotlin/ink/snowflake/server/controller/User.kt b/ktor/src/main/kotlin/ink/snowflake/server/controller/User.kt index 344f381..c566e47 100644 --- a/ktor/src/main/kotlin/ink/snowflake/server/controller/User.kt +++ b/ktor/src/main/kotlin/ink/snowflake/server/controller/User.kt @@ -76,7 +76,7 @@ fun Application.User(config: AppConfig) { } else { // 账号密码不匹配 BaseResponse(status = false, message = "账号密码不匹配,请重新登录", data = null) - } + } } else { BaseResponse(status = false, message = "账号已被禁用,请联系管理员", data = null) }