"""Database access helpers for departments, menus, roles, users and dictionaries.

Every function borrows a connection from ``pg_pool``, runs parameterized SQL
and, for writes, commits explicitly before returning.
"""

import json
from typing import List
from uuid import UUID

from config.pgDb import pg_pool
from utils import MyUtils
from utils.MyUtils import format_datetime, is_valid_uuid


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------


def _rows_as_dicts(cursor) -> list[dict]:
    """Fetch all remaining rows of *cursor* as dicts keyed by column name."""
    rows = cursor.fetchall()
    # cursor.description carries the column names of the last query.
    columns = [desc[0] for desc in cursor.description]
    return [dict(zip(columns, row)) for row in rows]


def _where_clause(conditions: list) -> str:
    """Join SQL conditions with AND, or return "" when there are none."""
    if not conditions:
        return ""
    return " WHERE " + " AND ".join(conditions)


def _menu_values(data: dict) -> tuple:
    """Return the sys_menu column values shared by insert and update."""
    return (
        data.get("pid"),
        data.get("name"),
        data.get("path"),
        data.get("component"),
        data.get("redirect"),
        data.get("auth_code"),
        data.get("type"),
        # meta is stored as JSON text; a missing/empty value becomes "{}".
        json.dumps(data.get("meta") or {}),
    )


def _insert_role_menus(cursor, role_id, menu_ids) -> None:
    """Insert one sys_role_menu row per menu id (no-op for None/empty)."""
    for menu_id in menu_ids or []:
        cursor.execute(
            "INSERT INTO sys_role_menu (role_id, menu_id) VALUES (%s, %s);",
            (role_id, menu_id),
        )


# ---------------------------------------------------------------------------
# Departments
# ---------------------------------------------------------------------------


def get_all_depts():
    """Return every sys_dept row as a dict, oldest first."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT id, parent_id, name, comment, created_at
                FROM sys_dept
                ORDER BY created_at ASC;
                """
            )
            return _rows_as_dicts(cursor)


def insert_dept(parent_id, name, comment):
    """Insert a department and return the id generated by the database."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO sys_dept (parent_id, name, comment, created_at)
                VALUES (%s, %s, %s, NOW())
                RETURNING id;
                """,
                (parent_id, name, comment),
            )
            new_id = cursor.fetchone()[0]
            conn.commit()
            return new_id


def update_dept(id, parent_id, name, comment):
    """Update a department; return the affected row count (>0 means success)."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                UPDATE sys_dept
                SET parent_id = %s, name = %s, comment = %s
                WHERE id = %s;
                """,
                (parent_id, name, comment, id),
            )
            conn.commit()
            return cursor.rowcount


def has_child_dept(id):
    """Return True when at least one department has *id* as its parent."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT COUNT(*) FROM sys_dept WHERE parent_id = %s;
                """,
                (id,),
            )
            return cursor.fetchone()[0] > 0


def delete_dept(id):
    """Delete a department by id and return the number of rows removed."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute("""DELETE FROM sys_dept WHERE id = %s;""", (id,))
            conn.commit()
            return cursor.rowcount


# ---------------------------------------------------------------------------
# Menus
# ---------------------------------------------------------------------------


def get_menus(plat_id: int):
    """Return the menus of one platform as dicts, oldest first."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT id, pid, name, path, component, redirect, auth_code,
                       type, meta, created_at, updated_at
                FROM sys_menu
                WHERE plat_id = %s
                ORDER BY created_at ASC;
                """,
                (plat_id,),
            )
            return _rows_as_dicts(cursor)


def get_all_menus():
    """Return the menus of all platforms as dicts, oldest first."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT id, pid, name, path, component, redirect, auth_code,
                       type, meta, created_at, updated_at
                FROM sys_menu
                ORDER BY created_at ASC;
                """,
            )
            return _rows_as_dicts(cursor)


def insert_menu(data: dict, plat_id: int):
    """Insert a menu for *plat_id* from *data* and return the new id.

    Keys read from *data*: pid, name, path, component, redirect, auth_code,
    type and meta (serialized to JSON).
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO sys_menu
                    (pid, name, path, component, redirect, auth_code, type,
                     meta, created_at, updated_at, plat_id)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), %s)
                RETURNING id;
                """,
                _menu_values(data) + (plat_id,),
            )
            new_id = cursor.fetchone()[0]
            conn.commit()
            return new_id


def update_menu_db(id: str, data: dict):
    """Overwrite all editable columns of a menu; return affected row count."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                UPDATE sys_menu
                SET pid = %s, name = %s, path = %s, component = %s,
                    redirect = %s, auth_code = %s, type = %s, meta = %s,
                    updated_at = NOW()
                WHERE id = %s;
                """,
                _menu_values(data) + (id,),
            )
            conn.commit()
            return cursor.rowcount


def db_menu_name_exists(name: str, id: str | None, plat_id: int):
    """Return True when a menu named *name* already exists.

    With *id* given, the menu itself is excluded and the search is limited to
    *plat_id*. Without *id*, the search spans every platform.
    """
    # NOTE(review): plat_id is ignored when id is falsy, so the two branches
    # use different scopes - confirm whether that asymmetry is intended.
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            if id:
                cursor.execute(
                    """
                    SELECT COUNT(*) FROM sys_menu
                    WHERE name = %s AND id <> %s AND plat_id = %s;
                    """,
                    (name, id, plat_id),
                )
            else:
                cursor.execute(
                    """
                    SELECT COUNT(*) FROM sys_menu WHERE name = %s;
                    """,
                    (name,),
                )
            return cursor.fetchone()[0] > 0


def db_menu_path_exists(path: str, id: str | None, plat_id: int):
    """Return True when a menu with route *path* already exists.

    With *id* given, the menu itself is excluded and the search is limited to
    *plat_id*. Without *id*, the search spans every platform.
    """
    # NOTE(review): plat_id is ignored when id is falsy, so the two branches
    # use different scopes - confirm whether that asymmetry is intended.
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            if id:
                cursor.execute(
                    """
                    SELECT COUNT(*) FROM sys_menu
                    WHERE path = %s AND id <> %s AND plat_id = %s;
                    """,
                    (path, id, plat_id),
                )
            else:
                cursor.execute(
                    """
                    SELECT COUNT(*) FROM sys_menu WHERE path = %s;
                    """,
                    (path,),
                )
            return cursor.fetchone()[0] > 0


def menu_has_children(id: str):
    """Return True when at least one menu has *id* as its parent (pid)."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT COUNT(*) FROM sys_menu WHERE pid = %s;
                """,
                (id,),
            )
            return cursor.fetchone()[0] > 0


def delete_menu_db(id):
    """Delete a menu by id and return the number of rows removed."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute("""DELETE FROM sys_menu WHERE id = %s;""", (id,))
            conn.commit()
            return cursor.rowcount


# ---------------------------------------------------------------------------
# Roles
# ---------------------------------------------------------------------------


def db_role_name_exists(name: str, id: str | None = None) -> bool:
    """Return True when a role named *name* exists (excluding role *id*)."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            if id:
                cursor.execute(
                    "SELECT COUNT(*) FROM sys_role WHERE name = %s AND id <> %s;",
                    (name, id),
                )
            else:
                cursor.execute(
                    "SELECT COUNT(*) FROM sys_role WHERE name = %s;", (name,)
                )
            return cursor.fetchone()[0] > 0


def insert_role(data: dict) -> str:
    """Insert a role plus its role-menu links and return the new role id.

    Keys read from *data*: name, remark, permissions (iterable of menu ids).
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                "INSERT INTO sys_role (name, remark) VALUES (%s, %s) RETURNING id;",
                (data.get("name"), data.get("remark")),
            )
            role_id = cursor.fetchone()[0]
            _insert_role_menus(cursor, role_id, data.get("permissions", []))
            conn.commit()
            return role_id


def update_role_db(id: str, data: dict) -> int:
    """Update a role and replace its role-menu links.

    Returns:
        The number of sys_role rows changed by the UPDATE (0 when the role
        does not exist), independent of how many link rows were rewritten.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                "UPDATE sys_role SET name = %s, remark = %s WHERE id = %s;",
                (data.get("name"), data.get("remark"), id),
            )
            # Capture now: the link statements below overwrite cursor.rowcount.
            updated = cursor.rowcount

            # Replace the links: drop the old set, then insert the new one.
            cursor.execute("DELETE FROM sys_role_menu WHERE role_id = %s;", (id,))
            _insert_role_menus(cursor, id, data.get("permissions", []))
            conn.commit()
            return updated


def delete_role_db(id: str) -> int:
    """Delete a role and its menu links; return the sys_role rows removed."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            # Links first, so the role delete is the last statement and its
            # rowcount is the one returned.
            cursor.execute("DELETE FROM sys_role_menu WHERE role_id = %s;", (id,))
            cursor.execute("DELETE FROM sys_role WHERE id = %s;", (id,))
            conn.commit()
            return cursor.rowcount


def get_role_list_db() -> list[dict]:
    """Return every role as ``{"id": ..., "title": ...}``."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT id, name, remark, created_at FROM sys_role;")
            return [
                {"id": role_id, "title": role_name}
                for role_id, role_name, _remark, _created_at in cursor.fetchall()
            ]


def get_role_list_db_page(page: int, page_size: int, rid=None, name=None, remark=None):
    """Return one page of roles with their menu ids, plus the total count.

    Args:
        page: 1-based page number.
        page_size: Rows per page.
        rid: Exact role id filter; ignored unless it is a valid UUID.
        name: Substring filter on the role name.
        remark: Substring filter on the remark.

    Returns:
        ``(roles, total)`` where *roles* is a list of dicts and *total* is the
        number of roles matching the filters.
    """
    offset = (page - 1) * page_size
    conditions = []
    params = []

    if rid and is_valid_uuid(rid):
        conditions.append("r.id = %s")
        params.append(rid)
    if name:
        conditions.append("r.name LIKE %s")
        params.append(f"%{name}%")
    if remark:
        conditions.append("r.remark LIKE %s")
        params.append(f"%{remark}%")

    # Only fixed condition strings are interpolated; values stay parameters.
    where_clause = _where_clause(conditions)

    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            count_sql = f"SELECT COUNT(*) FROM sys_role r {where_clause};"
            cursor.execute(count_sql, params)
            total = cursor.fetchone()[0]

            list_sql = f"""
                SELECT r.id, r.name, r.remark, r.created_at,
                       array_remove(array_agg(rm.menu_id::varchar), NULL) AS menu_ids
                FROM sys_role r
                LEFT JOIN sys_role_menu rm ON r.id = rm.role_id
                {where_clause}
                GROUP BY r.id
                ORDER BY r.created_at DESC
                LIMIT %s OFFSET %s;
            """
            cursor.execute(list_sql, params + [page_size, offset])

            roles = [
                {
                    "id": role_id,
                    "name": role_name,
                    "remark": role_remark,
                    "created_at": format_datetime(created_at),
                    "permissions": menu_ids or [],
                }
                for role_id, role_name, role_remark, created_at, menu_ids
                in cursor.fetchall()
            ]
            return roles, total


# ---------------------------------------------------------------------------
# Users
# ---------------------------------------------------------------------------


def get_user_list_db_page(
    page: int,
    page_size: int,
    uid=None,
    username=None,
    status=None,
    dept_id=None,
    startTime=None,
    endTime=None,
):
    """Return one page of users with department and role ids, plus the total.

    Args:
        page: 1-based page number.
        page_size: Rows per page.
        uid: Substring filter on the textual user id.
        username: Substring filter on the username.
        status: When not None, filters on ``is_active = (status == 1)``.
        dept_id: Exact department filter; ignored unless a valid UUID.
        startTime: Inclusive lower bound on created_at.
        endTime: Inclusive upper bound on created_at.

    Returns:
        ``(users, total)`` where *users* is a list of dicts and *total* is the
        number of users matching the filters.
    """
    offset = (page - 1) * page_size
    conditions = []
    params = []

    # NOTE(review): the id is matched by substring on its text form, not
    # validated as a UUID - confirm this is the intended search behaviour.
    if uid:
        conditions.append("u.id::text LIKE %s")
        params.append(f"%{uid}%")
    if username:
        conditions.append("u.username LIKE %s")
        params.append(f"%{username}%")
    if status is not None:
        conditions.append("u.is_active = %s")
        params.append(status == 1)
    if dept_id and is_valid_uuid(dept_id):
        conditions.append("u.dept_id = %s")
        params.append(dept_id)
    if startTime:
        conditions.append("u.created_at >= %s")
        params.append(startTime)
    if endTime:
        conditions.append("u.created_at <= %s")
        params.append(endTime)

    # Only fixed condition strings are interpolated; values stay parameters.
    where_clause = _where_clause(conditions)

    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            count_sql = f"SELECT COUNT(*) FROM users u {where_clause};"
            cursor.execute(count_sql, params)
            total = cursor.fetchone()[0]

            list_sql = f"""
                SELECT u.id, u.username, u.email, u.phone, u.is_active,
                       u.dept_id, d.name AS dept_name, u.created_at
                FROM users u
                LEFT JOIN sys_dept d ON u.dept_id = d.id
                {where_clause}
                ORDER BY u.created_at DESC
                LIMIT %s OFFSET %s;
            """
            cursor.execute(list_sql, params + [page_size, offset])
            # Materialize the page first: the cursor is reused per user below.
            rows = cursor.fetchall()

            result = []
            for (
                user_id,
                user_name,
                email,
                phone,
                is_active,
                user_dept_id,
                dept_name,
                created_at,
            ) in rows:
                cursor.execute(
                    "SELECT role_id FROM sys_user_role WHERE user_id = %s;",
                    (user_id,),
                )
                role_ids = [role_row[0] for role_row in cursor.fetchall()]
                result.append(
                    {
                        "id": user_id,
                        "name": user_name,
                        "email": email,
                        "phone": phone,
                        "status": 1 if is_active else 0,
                        "dept_id": user_dept_id,
                        "dept_name": dept_name,
                        "roles": role_ids,
                        "created_at": format_datetime(created_at),
                    }
                )
            return result, total


def db_user_name_exists(username: str) -> bool:
    """Return True when a user whose email equals *username* exists."""
    # NOTE(review): the lookup is on the email column although the parameter
    # is called username - presumably the login account is the email; confirm.
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT COUNT(*) FROM users WHERE email = %s;", (username,))
            return cursor.fetchone()[0] > 0


def insert_user(data: dict) -> str:
    """Insert a user plus its user-role links and return the new user id.

    Keys read from *data*: name, email, phone, status (default 1), dept_id,
    password_hash (default "") and roles (iterable of role ids).
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO users
                    (username, email, phone, is_active, dept_id, password_hash)
                VALUES (%s, %s, %s, %s, %s, %s)
                RETURNING id;
                """,
                (
                    data.get("name"),
                    data.get("email"),
                    data.get("phone"),
                    bool(data.get("status", 1)),
                    data.get("dept_id"),
                    data.get("password_hash", ""),
                ),
            )
            user_id = cursor.fetchone()[0]

            for role_id in data.get("roles", []) or []:
                cursor.execute(
                    "INSERT INTO sys_user_role (user_id, role_id) VALUES (%s, %s);",
                    (user_id, role_id),
                )
            conn.commit()
            return user_id


# Maps PATCH payload keys to users columns; only these keys are ever written.
_USER_PATCH_COLUMNS = {
    "name": "username",
    "email": "email",
    "phone": "phone",
    "password_hash": "password_hash",
    "status": "is_active",
    "dept_id": "dept_id",
}


def patch_user_db(id: str, data: dict) -> int:
    """Partially update a user (PATCH semantics).

    Only keys present in *data* are written. A ``roles`` key, when present,
    replaces the user's whole role set (None or empty clears it).

    Returns:
        The row count of the users UPDATE when any column was patched. When
        only roles (or nothing) were supplied, the row count of the last
        statement executed on the cursor, as before.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            # 1. Build the dynamic SET list. Column names come from the fixed
            #    mapping above, never from the payload, so interpolation is safe.
            assignments = []
            values = []
            for key, column in _USER_PATCH_COLUMNS.items():
                if key not in data:
                    continue
                assignments.append(f"{column} = %s")
                values.append(bool(data[key]) if key == "status" else data[key])

            updated = None
            if assignments:
                sql = f"UPDATE users SET {', '.join(assignments)} WHERE id = %s"
                values.append(id)
                cursor.execute(sql, tuple(values))
                # Capture now: the role statements overwrite cursor.rowcount.
                updated = cursor.rowcount

            # 2. Roles are handled separately: replace the whole set.
            if "roles" in data:
                cursor.execute("DELETE FROM sys_user_role WHERE user_id=%s;", (id,))
                for role_id in data.get("roles") or []:
                    cursor.execute(
                        "INSERT INTO sys_user_role (user_id, role_id) VALUES (%s, %s)",
                        (id, role_id),
                    )

            conn.commit()
            return cursor.rowcount if updated is None else updated


def delete_user_db(id: str) -> int:
    """Delete a user and its role links; return the users rows removed."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            # Links first, so the user delete is the last statement and its
            # rowcount is the one returned.
            cursor.execute("DELETE FROM sys_user_role WHERE user_id=%s;", (id,))
            cursor.execute("DELETE FROM users WHERE id=%s;", (id,))
            conn.commit()
            return cursor.rowcount


# ---------------------------------------------------------------------------
# Permission lookups
# ---------------------------------------------------------------------------


def get_role_ids_by_user(user_id: UUID):
    """Return the role ids linked to *user_id*."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT role_id FROM sys_user_role WHERE user_id = %s;",
                (str(user_id),),
            )
            return [row[0] for row in cursor.fetchall()]


def get_menu_ids_by_roles(role_ids: list):
    """Return the distinct menu ids granted to any role in *role_ids*."""
    if not role_ids:
        # Nothing can match; skip the round trip.
        return []
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            sql = "SELECT DISTINCT menu_id FROM sys_role_menu WHERE role_id = ANY(%s);"
            cursor.execute(sql, (role_ids,))
            return [row[0] for row in cursor.fetchall()]


def get_menus_by_ids(menu_ids: list, plat_id: int):
    """Return the non-button menus of *plat_id* whose id is in *menu_ids*."""
    if not menu_ids:
        return []
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            sql = """
                SELECT id, pid, name, path, component, redirect, auth_code,
                       type, meta, created_at, updated_at
                FROM sys_menu
                WHERE id = ANY(%s::varchar[])
                  AND plat_id = %s
                  AND type != 'button'
                ORDER BY created_at ASC;
            """
            # The query casts the parameter to varchar[], so send strings.
            menu_ids_str = [str(menu_id) for menu_id in menu_ids]
            cursor.execute(sql, (menu_ids_str, plat_id))
            return _rows_as_dicts(cursor)


def build_menu_tree(items):
    """Nest flat menu dicts into a tree via their ``id`` / ``pid`` keys.

    Each item whose ``pid`` matches another item's ``id`` is appended to that
    parent's ``children`` list (created on first use). All other items become
    roots. The input dicts are mutated in place.

    Returns:
        The list of root items, in input order.
    """
    item_map = {item["id"]: item for item in items}
    tree = []
    for item in items:
        pid = item["pid"]
        if pid and pid in item_map:
            # Rows straight from the database carry no "children" key yet.
            item_map[pid].setdefault("children", []).append(item)
        else:
            tree.append(item)
    return tree


# ---------------------------------------------------------------------------
# Dictionaries
# ---------------------------------------------------------------------------


def get_dict_list(keyword="", page=1, page_size=10):
    """Return one page of system dictionaries filtered by key or name.

    An empty *keyword* disables the filter.

    Returns:
        ``(total, rows)``.
    """
    # NOTE(review): the tuple order here is (total, rows), unlike the paged
    # role/user queries in this module, which return (rows, total).
    offset = (page - 1) * page_size

    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            # 1. Total number of matching rows.
            cursor.execute(
                """
                SELECT COUNT(*)
                FROM sys_dict
                WHERE (
                    %s = ''
                    OR key ILIKE '%%' || %s || '%%'
                    OR name ILIKE '%%' || %s || '%%'
                )
                """,
                (keyword, keyword, keyword),
            )
            total = cursor.fetchone()[0]

            # 2. Rows of the requested page.
            cursor.execute(
                """
                SELECT id, key, name, remark, created_at
                FROM sys_dict
                WHERE (
                    %s = ''
                    OR key ILIKE '%%' || %s || '%%'
                    OR name ILIKE '%%' || %s || '%%'
                )
                ORDER BY created_at DESC
                LIMIT %s OFFSET %s
                """,
                (keyword, keyword, keyword, page_size, offset),
            )
            result = [
                {
                    "id": row[0],
                    "key": row[1],
                    "name": row[2],
                    "remark": row[3],
                    "created_at": MyUtils.format_datetime(row[4]),
                }
                for row in cursor.fetchall()
            ]
            return total, result


def db_create_dict(key: str, name: str, remark: str):
    """Create a dictionary and return its generated id."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO sys_dict (id, key, name, remark, created_at)
                VALUES (gen_random_uuid(), %s, %s, %s, now())
                RETURNING id
                """,
                (key, name, remark),
            )
            new_id = cursor.fetchone()[0]
            # Commit explicitly, like every other write in this module.
            conn.commit()
            return new_id


def db_update_dict(id: str, key: str, name: str, remark: str):
    """Update a dictionary's key, name and remark; return *id*."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                UPDATE sys_dict
                SET key=%s, name=%s, remark=%s
                WHERE id=%s
                """,
                (key, name, remark, id),
            )
            conn.commit()
            return id


def db_delete_dict(id: str):
    """Delete a dictionary by id; return *id*."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute("DELETE FROM sys_dict WHERE id=%s", (id,))
            conn.commit()
            return id


def db_get_dict_detail(dict_id: str):
    """Return the detail rows of a dictionary, ordered by sort then age.

    Timestamps are returned as ISO-8601 strings (None when NULL).
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT id, value, sort, pid, dict_id, remark,
                       created_at, updated_at
                FROM sys_dict_detail
                WHERE dict_id=%s
                ORDER BY sort ASC, created_at ASC
                """,
                (dict_id,),
            )
            return [
                {
                    "id": row[0],
                    "value": row[1],
                    "sort": row[2],
                    "pid": row[3],
                    "dict_id": row[4],
                    "remark": row[5],
                    "created_at": row[6].isoformat() if row[6] else None,
                    "updated_at": row[7].isoformat() if row[7] else None,
                }
                for row in cursor.fetchall()
            ]


def db_create_dict_detail(
    value: str,
    dict_id: str,
    sort: int = 0,
    pid: str | None = None,
    remark: str | None = None,
):
    """Create a dictionary detail row and return its generated id."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO sys_dict_detail
                    (id, value, dict_id, sort, pid, remark, created_at)
                VALUES (gen_random_uuid(), %s, %s, %s, %s, %s, now())
                RETURNING id
                """,
                (value, dict_id, sort, pid, remark),
            )
            new_id = cursor.fetchone()[0]
            conn.commit()
            return new_id


def db_update_dict_detail(
    id: str,
    value: str | None = None,
    sort: int | None = None,
    pid: str | None = None,
    remark: str | None = None,
):
    """Update the given fields of a dictionary detail row; return *id*.

    Arguments left as None keep their stored value (via COALESCE), so this
    function cannot reset a column to NULL.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                UPDATE sys_dict_detail
                SET value=COALESCE(%s, value),
                    sort=COALESCE(%s, sort),
                    pid=COALESCE(%s, pid),
                    remark=COALESCE(%s, remark),
                    updated_at=now()
                WHERE id=%s
                """,
                (value, sort, pid, remark, id),
            )
            conn.commit()
            return id


def db_delete_dict_detail(id: str):
    """Delete a dictionary detail row by id; return *id*."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute("DELETE FROM sys_dict_detail WHERE id=%s", (id,))
            conn.commit()
            return id


def get_dict_detail_list_by_key(dict_key: str):
    """Return the detail rows of the dictionary whose key is *dict_key*."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT d.id, d.value, d.sort, d.pid, d.remark, d.created_at
                FROM sys_dict_detail d
                JOIN sys_dict s ON d.dict_id = s.id
                WHERE s.key = %s
                ORDER BY d.sort ASC, d.created_at ASC
                """,
                (dict_key,),
            )
            return [
                {
                    "id": row[0],
                    "value": row[1],
                    "sort": row[2],
                    "pid": row[3],
                    "remark": row[4],
                    "created_at": MyUtils.format_datetime(row[5]),
                }
                for row in cursor.fetchall()
            ]


# ---------------------------------------------------------------------------
# Department resolution
# ---------------------------------------------------------------------------


def get_dept_ids_by_user_id(user_id: UUID) -> list:
    """Return the user's department id plus all descendant department ids.

    Raises:
        ValueError: If no users row matches *user_id*.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            # Step 1: resolve the user's own department.
            cursor.execute("SELECT dept_id FROM users WHERE id = %s", (user_id,))
            row = cursor.fetchone()
            if not row:
                raise ValueError(
                    f"User with id {user_id} not found or has no department."
                )
            # NOTE(review): a user row with a NULL dept_id passes the check
            # above and yields an empty list rather than raising.
            dept_id = row[0]

            # Step 2: walk down the tree to collect every sub-department.
            cursor.execute(
                """
                WITH RECURSIVE dept_hierarchy AS (
                    SELECT id FROM sys_dept WHERE id = %s
                    UNION
                    SELECT d.id
                    FROM sys_dept d
                    INNER JOIN dept_hierarchy dh ON d.parent_id = dh.id
                )
                SELECT id FROM dept_hierarchy;
                """,
                (dept_id,),
            )
            return [dept_row[0] for dept_row in cursor.fetchall()]


def get_dept_id_by_user_id(user_id: str) -> str:
    """Return the department id of a user, as a string.

    Raises:
        ValueError: If no users row matches *user_id*.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT dept_id FROM users WHERE id = %s", (user_id,))
            row = cursor.fetchone()
            if not row:
                raise ValueError(f"User with id {user_id} not found.")
            return str(row[0])


def get_dept_id_by_iot_user_name(user_id: str) -> str:
    """Return the department id of the IoT user named *user_id*, as a string.

    Raises:
        ValueError: If no iot_users row has that name.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT dept_id FROM iot_users WHERE name = %s", (user_id,))
            row = cursor.fetchone()
            if not row:
                raise ValueError(f"IoT user with name {user_id} not found.")
            return str(row[0])


def get_device_type_by_iot_user_name(user_id: str) -> str:
    """Return the ``type`` of the IoT user named *user_id*, as a string.

    Raises:
        ValueError: If no iot_users row has that name.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT type FROM iot_users WHERE name = %s", (user_id,))
            row = cursor.fetchone()
            if not row:
                raise ValueError(f"IoT user with name {user_id} not found.")
            return str(row[0])


def get_dept_ids_by_dept_id(dept_id: str) -> List[str]:
    """Return *dept_id* and all of its ancestor department ids.

    The recursive query starts at the given department and follows parent_id
    upwards. The SQL has no ORDER BY, so the current-to-root order of the
    result is expected but not guaranteed by the query itself.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                WITH RECURSIVE dept_tree AS (
                    -- anchor: the department itself
                    SELECT id, parent_id
                    FROM sys_dept
                    WHERE id = %s

                    UNION ALL

                    -- recursive step: climb to the parent
                    SELECT d.id, d.parent_id
                    FROM sys_dept d
                    INNER JOIN dept_tree dt ON d.id = dt.parent_id
                )
                SELECT id FROM dept_tree;
                """,
                (dept_id,),
            )
            return [str(row[0]) for row in cursor.fetchall()]