Files
AILab/bbit_ai/app/db/postgres/system.py
T
2026-02-04 13:58:18 +08:00

1021 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
from uuid import UUID
from config.pgDb import pg_pool
from utils import MyUtils
from utils.MyUtils import format_datetime, is_valid_uuid
def get_all_depts():
    """Return every ``sys_dept`` row, oldest first.

    Returns:
        list[dict]: One dict per department, keyed by the selected column
        names (``id``, ``parent_id``, ``name``, ``comment``, ``created_at``).
    """
    query = """
        SELECT id, parent_id, name, comment, created_at
        FROM sys_dept
        ORDER BY created_at ASC;
        """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query)
        records = cur.fetchall()
        # cursor.description supplies the column names used as dict keys.
        field_names = [col[0] for col in cur.description]
        return [dict(zip(field_names, record)) for record in records]
def insert_dept(parent_id, name, comment):
    """Insert a department and return its generated id.

    Args:
        parent_id: Id of the parent department, passed to SQL unchanged.
        name: Department name.
        comment: Free-text remark.

    Returns:
        The value produced by ``RETURNING id``.
    """
    statement = """
        INSERT INTO sys_dept (parent_id, name, comment, created_at)
        VALUES (%s, %s, %s, NOW())
        RETURNING id;
        """
    values = (parent_id, name, comment)
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(statement, values)
        created_id = cur.fetchone()[0]
        conn.commit()
        return created_id
def update_dept(id, parent_id, name, comment):
    """Overwrite parent, name and comment of the department with the given id.

    Returns:
        int: ``cursor.rowcount`` of the UPDATE; greater than zero means a row
        was matched.
    """
    statement = """
        UPDATE sys_dept
        SET parent_id = %s,
        name = %s,
        comment = %s
        WHERE id = %s;
        """
    values = (parent_id, name, comment, id)
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(statement, values)
        conn.commit()
        affected = cur.rowcount
        return affected
def has_child_dept(id):
    """Return ``True`` when at least one department has ``parent_id`` equal to ``id``."""
    query = """
        SELECT COUNT(*) FROM sys_dept WHERE parent_id = %s;
        """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (id,))
        child_count = cur.fetchone()[0]
        return child_count > 0
def delete_dept(id):
    """Delete the department with the given id.

    Returns:
        int: ``cursor.rowcount`` of the DELETE statement.
    """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute("""DELETE FROM sys_dept WHERE id = %s;""", (id,))
        conn.commit()
        removed = cur.rowcount
        return removed
def get_menus(plat_id: int):
    """Return all ``sys_menu`` rows of one platform, oldest first.

    Args:
        plat_id: Value matched against ``sys_menu.plat_id``.

    Returns:
        list[dict]: One dict per menu keyed by the selected column names.
    """
    query = """
        SELECT id, pid, name, path, component, redirect,
        auth_code, type, meta, created_at, updated_at
        FROM sys_menu
        WHERE plat_id = %s
        ORDER BY created_at ASC;
        """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (plat_id,))
        records = cur.fetchall()
        field_names = [col[0] for col in cur.description]
        return [dict(zip(field_names, record)) for record in records]
def get_all_menus():
    """Return every ``sys_menu`` row regardless of platform, oldest first.

    Returns:
        list[dict]: One dict per menu keyed by the selected column names.
    """
    query = """
        SELECT id, pid, name, path, component, redirect,
        auth_code, type, meta, created_at, updated_at
        FROM sys_menu
        ORDER BY created_at ASC;
        """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query)
        records = cur.fetchall()
        field_names = [col[0] for col in cur.description]
        return [dict(zip(field_names, record)) for record in records]
def insert_menu(data: dict, plat_id: int):
    """Insert a menu row for ``plat_id`` and return its generated id.

    Args:
        data: Mapping read with ``.get`` for the keys ``pid``, ``name``,
            ``path``, ``component``, ``redirect``, ``auth_code``, ``type`` and
            ``meta``. Missing keys are stored as NULL; a falsy ``meta`` is
            stored as the JSON text ``{}``.
        plat_id: Platform the menu belongs to.

    Returns:
        The value produced by ``RETURNING id``.
    """
    statement = """
        INSERT INTO sys_menu
        (pid, name, path, component, redirect,
        auth_code, type, meta, created_at, updated_at, plat_id)
        VALUES (%s, %s, %s, %s, %s,
        %s, %s, %s, NOW(), NOW(), %s)
        RETURNING id;
        """
    plain_keys = ("pid", "name", "path", "component", "redirect", "auth_code", "type")
    values = tuple(data.get(key) for key in plain_keys)
    # meta is serialised to JSON text before being handed to the driver.
    values += (json.dumps(data.get("meta") or {}), plat_id)
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(statement, values)
        created_id = cur.fetchone()[0]
        conn.commit()
        return created_id
def update_menu_db(id: str, data: dict):
    """Overwrite all editable columns of one menu and bump ``updated_at``.

    Args:
        id: Id of the menu row to update.
        data: Mapping read with ``.get`` for ``pid``, ``name``, ``path``,
            ``component``, ``redirect``, ``auth_code``, ``type`` and ``meta``.
            Keys that are absent overwrite the column with NULL; a falsy
            ``meta`` is stored as the JSON text ``{}``.

    Returns:
        int: ``cursor.rowcount`` of the UPDATE.
    """
    statement = """
        UPDATE sys_menu
        SET pid = %s,
        name = %s,
        path = %s,
        component = %s,
        redirect = %s,
        auth_code = %s,
        type = %s,
        meta = %s,
        updated_at = NOW()
        WHERE id = %s;
        """
    plain_keys = ("pid", "name", "path", "component", "redirect", "auth_code", "type")
    values = tuple(data.get(key) for key in plain_keys)
    values += (json.dumps(data.get("meta") or {}), id)
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(statement, values)
        conn.commit()
        affected = cur.rowcount
        return affected
def db_menu_name_exists(name: str, id: str | None, plat_id: int):
    """Tell whether a menu called ``name`` already exists on platform ``plat_id``.

    Both the create path (``id`` falsy) and the edit path (``id`` given) are
    scoped to ``plat_id`` so the two checks agree with each other.

    Args:
        name: Menu name to look for.
        id: Id of the menu being edited; when truthy, that row is excluded so
            a menu never collides with itself. Pass ``None`` when creating.
        plat_id: Platform the uniqueness check is limited to.

    Returns:
        bool: ``True`` if at least one matching row exists.
    """
    if id:
        sql = """
            SELECT COUNT(*) FROM sys_menu
            WHERE name = %s AND id <> %s AND plat_id = %s;
            """
        params = (name, id, plat_id)
    else:
        sql = """
            SELECT COUNT(*) FROM sys_menu
            WHERE name = %s AND plat_id = %s;
            """
        params = (name, plat_id)
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, params)
            return cursor.fetchone()[0] > 0
def db_menu_path_exists(path: str, id: str | None, plat_id: int):
    """Tell whether a menu with route ``path`` already exists on ``plat_id``.

    Both the create path (``id`` falsy) and the edit path (``id`` given) are
    scoped to ``plat_id`` so the two checks agree with each other.

    Args:
        path: Menu path to look for.
        id: Id of the menu being edited; when truthy, that row is excluded so
            a menu never collides with itself. Pass ``None`` when creating.
        plat_id: Platform the uniqueness check is limited to.

    Returns:
        bool: ``True`` if at least one matching row exists.
    """
    if id:
        sql = """
            SELECT COUNT(*) FROM sys_menu
            WHERE path = %s AND id <> %s AND plat_id = %s;
            """
        params = (path, id, plat_id)
    else:
        sql = """
            SELECT COUNT(*) FROM sys_menu
            WHERE path = %s AND plat_id = %s;
            """
        params = (path, plat_id)
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, params)
            return cursor.fetchone()[0] > 0
def menu_has_children(id: str):
    """Return ``True`` when at least one ``sys_menu`` row has ``pid`` equal to ``id``."""
    query = """
        SELECT COUNT(*) FROM sys_menu WHERE pid = %s;
        """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (id,))
        child_count = cur.fetchone()[0]
        return child_count > 0
def delete_menu_db(id):
    """Delete the menu with the given id.

    Returns:
        int: ``cursor.rowcount`` of the DELETE statement.
    """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute("""DELETE FROM sys_menu WHERE id = %s;""", (id,))
        conn.commit()
        removed = cur.rowcount
        return removed
# Check whether a role name is already taken.
def db_role_name_exists(name: str, id: str | None = None) -> bool:
    """Return ``True`` if a ``sys_role`` row named ``name`` exists.

    Args:
        name: Role name to look for.
        id: When truthy, the row with this id is excluded from the check
            (used when renaming an existing role).
    """
    if id:
        query = "SELECT COUNT(*) FROM sys_role WHERE name = %s AND id <> %s;"
        args = (name, id)
    else:
        query = "SELECT COUNT(*) FROM sys_role WHERE name = %s;"
        args = (name,)
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, args)
        hits = cur.fetchone()[0]
        return hits > 0
# Insert a role.
def insert_role(data: dict) -> str:
    """Create a role together with its role-to-menu links.

    Args:
        data: Mapping with ``name`` and ``remark`` for the role, plus an
            optional ``permissions`` iterable of menu ids.

    Returns:
        The id of the new ``sys_role`` row (from ``RETURNING id``).
    """
    link_sql = "INSERT INTO sys_role_menu (role_id, menu_id) VALUES (%s, %s);"
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(
            "INSERT INTO sys_role (name, remark) VALUES (%s, %s) RETURNING id;",
            (data.get("name"), data.get("remark")),
        )
        new_role_id = cur.fetchone()[0]
        # Link the role to each granted menu; a falsy value means no links.
        granted = data.get("permissions", [])
        for granted_menu in granted or ():
            cur.execute(link_sql, (new_role_id, granted_menu))
        conn.commit()
        return new_role_id
# Update a role.
def update_role_db(id: str, data: dict) -> int:
    """Update a role's name/remark and replace its menu links.

    Args:
        id: Id of the ``sys_role`` row to update.
        data: Mapping with ``name`` and ``remark``, plus an optional
            ``permissions`` iterable of menu ids. Existing links in
            ``sys_role_menu`` are deleted and re-created from this list.

    Returns:
        int: Number of ``sys_role`` rows changed by the UPDATE (0 when no role
        has this id).
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                "UPDATE sys_role SET name = %s, remark = %s WHERE id = %s;",
                (data.get("name"), data.get("remark"), id),
            )
            # Capture the count now: the statements below overwrite
            # cursor.rowcount, which would otherwise report the number of
            # link rows deleted/inserted instead of roles updated.
            updated_count = cursor.rowcount
            # Replace the role-menu links: drop the old ones, insert the new.
            cursor.execute("DELETE FROM sys_role_menu WHERE role_id = %s;", (id,))
            for menu_id in data.get("permissions") or []:
                cursor.execute(
                    "INSERT INTO sys_role_menu (role_id, menu_id) VALUES (%s, %s);",
                    (id, menu_id),
                )
            conn.commit()
            return updated_count
# Delete a role.
def delete_role_db(id: str) -> int:
    """Remove a role and its ``sys_role_menu`` links.

    Returns:
        int: ``cursor.rowcount`` of the final statement, i.e. the DELETE on
        ``sys_role``.
    """
    cleanup_steps = (
        # Links first, then the role itself.
        "DELETE FROM sys_role_menu WHERE role_id = %s;",
        "DELETE FROM sys_role WHERE id = %s;",
    )
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        for step in cleanup_steps:
            cur.execute(step, (id,))
        conn.commit()
        removed = cur.rowcount
        return removed
# List all roles.
def get_role_list_db() -> list[dict]:
    """Return every role as an ``{"id", "title"}`` pair.

    Only the id and name are selected; the name is exposed under the key
    ``title``. No per-role menu lookup is performed because its result was
    never part of the returned data.

    Returns:
        list[dict]: One ``{"id": <role id>, "title": <role name>}`` per role.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT id, name FROM sys_role;")
            return [
                {"id": role_id, "title": name}
                for role_id, name in cursor.fetchall()
            ]
def get_role_list_db_page(page: int, page_size: int, rid=None, name=None, remark=None):
    """Return one page of roles with their menu ids, plus the total match count.

    Args:
        page: 1-based page number; the offset is ``(page - 1) * page_size``.
        page_size: Maximum number of rows returned.
        rid: Exact role id filter; ignored unless ``is_valid_uuid(rid)`` is true.
        name: Substring filter on the role name (``LIKE %name%``).
        remark: Substring filter on the remark (``LIKE %remark%``).

    Returns:
        tuple[list[dict], int]: ``(roles, total)``. Each role dict has ``id``,
        ``name``, ``remark``, ``created_at`` (via ``format_datetime``) and
        ``permissions`` (list of menu ids as text, empty when none).
    """
    filters = []
    args = []

    # Exact id match, only when rid looks like a UUID.
    if rid and is_valid_uuid(rid):
        filters.append("r.id = %s")
        args.append(rid)

    # Fuzzy matches on name and remark.
    for column, needle in (("r.name", name), ("r.remark", remark)):
        if needle:
            filters.append(f"{column} LIKE %s")
            args.append(f"%{needle}%")

    where_clause = (" WHERE " + " AND ".join(filters)) if filters else ""
    skip = (page - 1) * page_size

    with pg_pool.getConn() as conn, conn.cursor() as cur:
        # Total number of roles matching the filters.
        cur.execute(f"SELECT COUNT(*) FROM sys_role r {where_clause};", args)
        total = cur.fetchone()[0]

        # Requested page, with each role's menu ids aggregated into an array.
        page_sql = f"""
            SELECT
            r.id,
            r.name,
            r.remark,
            r.created_at,
            array_remove(array_agg(rm.menu_id::varchar), NULL) AS menu_ids
            FROM sys_role r
            LEFT JOIN sys_role_menu rm ON r.id = rm.role_id
            {where_clause}
            GROUP BY r.id
            ORDER BY r.created_at DESC
            LIMIT %s OFFSET %s;
            """
        cur.execute(page_sql, args + [page_size, skip])
        roles = [
            {
                "id": row_id,
                "name": row_name,
                "remark": row_remark,
                "created_at": format_datetime(row_created),
                "permissions": row_menus or [],
            }
            for row_id, row_name, row_remark, row_created, row_menus in cur.fetchall()
        ]
        return roles, total
def get_user_list_db_page(
    page: int,
    page_size: int,
    uid=None,
    username=None,
    status=None,
    dept_id=None,
    startTime=None,
    endTime=None,
):
    """Return one page of users with their role ids, plus the total match count.

    Args:
        page: 1-based page number; the offset is ``(page - 1) * page_size``.
        page_size: Maximum number of rows returned.
        uid: Substring matched against the textual form of the user id.
        username: Substring matched against ``username``.
        status: When not ``None``, filters ``is_active`` by ``status == 1``.
        dept_id: Exact department filter; ignored unless
            ``is_valid_uuid(dept_id)`` is true.
        startTime: Inclusive lower bound on ``created_at``.
        endTime: Inclusive upper bound on ``created_at``.

    Returns:
        tuple[list[dict], int]: ``(users, total)``. Each user dict has ``id``,
        ``name``, ``email``, ``phone``, ``status`` (1/0), ``dept_id``,
        ``dept_name``, ``roles`` and ``created_at``.
    """
    filters = []
    args = []

    def add_filter(fragment, value):
        # Keep SQL fragments and their bound values in lock-step.
        filters.append(fragment)
        args.append(value)

    if uid:
        add_filter("u.id::text LIKE %s", f"%{uid}%")
    if username:
        add_filter("u.username LIKE %s", f"%{username}%")
    if status is not None:
        add_filter("u.is_active = %s", status == 1)
    if dept_id and is_valid_uuid(dept_id):
        add_filter("u.dept_id = %s", dept_id)
    if startTime:
        add_filter("u.created_at >= %s", startTime)
    if endTime:
        add_filter("u.created_at <= %s", endTime)

    where_clause = (" WHERE " + " AND ".join(filters)) if filters else ""
    skip = (page - 1) * page_size

    with pg_pool.getConn() as conn, conn.cursor() as cur:
        # Total number of users matching the filters.
        cur.execute(f"SELECT COUNT(*) FROM sys_users u {where_clause};", args)
        total = cur.fetchone()[0]

        # Requested page, joined with the department name.
        page_sql = f"""
            SELECT
            u.id,
            u.username,
            u.email,
            u.phone,
            u.is_active,
            u.dept_id,
            d.name AS dept_name,
            u.created_at
            FROM sys_users u
            LEFT JOIN sys_dept d ON u.dept_id = d.id
            {where_clause}
            ORDER BY u.created_at DESC
            LIMIT %s OFFSET %s;
            """
        cur.execute(page_sql, args + [page_size, skip])
        page_rows = cur.fetchall()

        users = []
        for record in page_rows:
            user_id = record[0]
            # One extra lookup per user for the assigned role ids.
            cur.execute(
                "SELECT role_id FROM sys_user_role WHERE user_id = %s;",
                (user_id,),
            )
            assigned_roles = [link[0] for link in cur.fetchall()]
            users.append(
                {
                    "id": user_id,
                    "name": record[1],
                    "email": record[2],
                    "phone": record[3],
                    "status": 1 if record[4] else 0,
                    "dept_id": record[5],
                    "dept_name": record[6],
                    "roles": assigned_roles,
                    "created_at": format_datetime(record[7]),
                }
            )
        return users, total
# Check whether a login account already exists.
def db_user_name_exists(username: str) -> bool:
    """Return ``True`` if a ``sys_users`` row has ``email`` equal to ``username``.

    Note that the lookup is made against the ``email`` column, not
    ``username``.
    """
    query = "SELECT COUNT(*) FROM sys_users WHERE email = %s;"
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (username,))
        hits = cur.fetchone()[0]
        return hits > 0
# Insert a user.
def insert_user(data: dict) -> str:
    """Create a user together with its user-to-role links.

    Args:
        data: Mapping read with ``.get`` for ``name`` (stored as
            ``username``), ``email``, ``phone``, ``status`` (default 1,
            stored as a boolean in ``is_active``), ``dept_id``,
            ``password_hash`` (default empty string) and an optional
            ``roles`` iterable of role ids.

    Returns:
        The id of the new ``sys_users`` row (from ``RETURNING id``).
    """
    user_sql = """
        INSERT INTO sys_users (username, email, phone, is_active, dept_id, password_hash)
        VALUES (%s, %s, %s, %s, %s, %s) RETURNING id;
        """
    user_values = (
        data.get("name"),
        data.get("email"),
        data.get("phone"),
        bool(data.get("status", 1)),
        data.get("dept_id"),
        data.get("password_hash", ""),
    )
    link_sql = "INSERT INTO sys_user_role (user_id, role_id) VALUES (%s, %s);"
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(user_sql, user_values)
        new_user_id = cur.fetchone()[0]
        # Attach the requested roles; a falsy value means no links.
        requested_roles = data.get("roles", [])
        for requested_role in requested_roles or ():
            cur.execute(link_sql, (new_user_id, requested_role))
        conn.commit()
        return new_user_id
# Partially update a user (PATCH).
def patch_user_db(id: str, data: dict) -> int:
    """Update only the user fields present in ``data`` and optionally its roles.

    Args:
        id: Id of the ``sys_users`` row to patch.
        data: Mapping that may contain ``name``, ``email``, ``phone``,
            ``password_hash``, ``status`` (coerced to bool for ``is_active``)
            and ``dept_id``. If the key ``roles`` is present, the user's
            ``sys_user_role`` links are deleted and re-created from it (a
            falsy value clears all roles).

    Returns:
        int: Number of ``sys_users`` rows matched by ``id``. When ``data``
        holds no column field (for example a roles-only patch) the user is
        looked up with a SELECT instead, so the result is always defined.
    """
    # Request key -> sys_users column.
    column_by_key = {
        "name": "username",
        "email": "email",
        "phone": "phone",
        "password_hash": "password_hash",
        "status": "is_active",
        "dept_id": "dept_id",
    }
    # Column names come from the fixed mapping above, never from user input,
    # so interpolating them into the statement is safe.
    assignments = []
    params = []
    for key, column in column_by_key.items():
        if key not in data:
            continue
        assignments.append(f"{column} = %s")
        params.append(bool(data[key]) if key == "status" else data[key])

    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            if assignments:
                sql = f"UPDATE sys_users SET {', '.join(assignments)} WHERE id = %s"
                cursor.execute(sql, (*params, id))
            else:
                # Nothing to update on sys_users; still report whether the
                # user exists instead of failing on an unset counter.
                cursor.execute("SELECT 1 FROM sys_users WHERE id = %s", (id,))
            # Read the count before the role statements overwrite rowcount.
            updated_count = cursor.rowcount

            # Roles are handled separately: replace the full set of links.
            if "roles" in data:
                cursor.execute("DELETE FROM sys_user_role WHERE user_id=%s;", (id,))
                for role_id in data.get("roles") or []:
                    cursor.execute(
                        "INSERT INTO sys_user_role (user_id, role_id) VALUES (%s, %s)",
                        (id, role_id),
                    )
            conn.commit()
            return updated_count
# Delete a user.
def delete_user_db(id: str) -> int:
    """Remove a user and its ``sys_user_role`` links.

    Returns:
        int: ``cursor.rowcount`` of the final statement, i.e. the DELETE on
        ``sys_users``.
    """
    cleanup_steps = (
        # Links first, then the user itself.
        "DELETE FROM sys_user_role WHERE user_id=%s;",
        "DELETE FROM sys_users WHERE id=%s;",
    )
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        for step in cleanup_steps:
            cur.execute(step, (id,))
        conn.commit()
        removed = cur.rowcount
        return removed
def get_role_ids_by_user(user_id: UUID):
    """Return the role ids linked to ``user_id`` in ``sys_user_role``.

    The id is converted with ``str()`` before being bound to the query.
    """
    query = "SELECT role_id FROM sys_user_role WHERE user_id = %s;"
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (str(user_id),))
        links = cur.fetchall()
        return [link[0] for link in links]
def get_menu_ids_by_roles(role_ids: list):
    """Return the distinct menu ids granted to any of the given roles.

    Args:
        role_ids: Role ids to look up. An empty or ``None`` value yields an
            empty list without touching the database.

    Returns:
        list: Distinct ``menu_id`` values from ``sys_role_menu``.
    """
    if not role_ids:
        return []
    sql = "SELECT DISTINCT menu_id FROM sys_role_menu WHERE role_id = ANY(%s);"
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, (role_ids,))
            return [row[0] for row in cursor.fetchall()]
def get_menus_by_ids(menu_ids: list, plat_id: int):
    """Return the non-button menus of ``plat_id`` whose id is in ``menu_ids``.

    Args:
        menu_ids: Menu ids; each is converted with ``str()`` and the list is
            bound as a ``varchar[]``. An empty list short-circuits to ``[]``.
        plat_id: Platform filter.

    Returns:
        list[dict]: Matching rows keyed by column name, oldest first.
    """
    if not menu_ids:
        return []
    query = """
        SELECT id, pid, name, path, component, redirect,
        auth_code, type, meta, created_at, updated_at
        FROM sys_menu
        WHERE id = ANY(%s::varchar[]) AND plat_id = %s AND type != 'button'
        ORDER BY created_at ASC;
        """
    # Ids may arrive as UUID objects; the query expects text.
    id_texts = list(map(str, menu_ids))
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (id_texts, plat_id))
        records = cur.fetchall()
        field_names = [col[0] for col in cur.description]
        return [dict(zip(field_names, record)) for record in records]
def build_menu_tree(items):
    """Arrange flat menu dicts into a tree using their ``id`` / ``pid`` keys.

    Each item whose ``pid`` is truthy and refers to another item in ``items``
    is appended to that parent's ``children`` list (created on demand, so the
    input rows do not need a pre-existing ``children`` key). All other items
    become roots. The item dicts are mutated in place and reused in the result.

    Args:
        items: Iterable of dicts, each with an ``id`` key and optionally ``pid``.

    Returns:
        list[dict]: The root items, in input order.
    """
    items = list(items)
    item_map = {item["id"]: item for item in items}
    tree = []
    for item in items:
        pid = item.get("pid")
        if pid and pid in item_map:
            # setdefault keeps an existing children list and avoids a KeyError
            # for plain database rows that have none.
            item_map[pid].setdefault("children", []).append(item)
        else:
            tree.append(item)
    return tree
def get_dict_list(keyword="", page=1, page_size=10):
    """List system dictionaries with paging and an optional keyword search.

    The keyword is matched case-insensitively as a substring of ``key`` or
    ``name``; an empty keyword matches every row.

    Returns:
        tuple[int, list[dict]]: ``(total, rows)`` where each row has ``id``,
        ``key``, ``name``, ``remark`` and ``created_at`` (formatted with
        ``MyUtils.format_datetime``).
    """
    skip = (page - 1) * page_size
    search_args = (keyword, keyword, keyword)

    count_sql = """
        SELECT COUNT(*)
        FROM sys_dict
        WHERE (
        %s = ''
        OR key ILIKE '%%' || %s || '%%'
        OR name ILIKE '%%' || %s || '%%'
        )
        """
    page_sql = """
        SELECT id, key, name, remark, created_at
        FROM sys_dict
        WHERE (
        %s = ''
        OR key ILIKE '%%' || %s || '%%'
        OR name ILIKE '%%' || %s || '%%'
        )
        ORDER BY created_at DESC
        LIMIT %s OFFSET %s
        """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        # Step 1: total number of matching rows.
        cur.execute(count_sql, search_args)
        total = cur.fetchone()[0]

        # Step 2: rows of the requested page.
        cur.execute(page_sql, search_args + (page_size, skip))
        entries = [
            {
                "id": rec[0],
                "key": rec[1],
                "name": rec[2],
                "remark": rec[3],
                "created_at": MyUtils.format_datetime(rec[4]),
            }
            for rec in cur.fetchall()
        ]
        return total, entries
def db_create_dict(key: str, name: str, remark: str):
    """Create a ``sys_dict`` row and return its generated id.

    The transaction is committed explicitly, matching the other write helpers
    in this module.

    Args:
        key: Dictionary key.
        name: Display name.
        remark: Free-text remark.

    Returns:
        The value produced by ``RETURNING id``.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO sys_dict (id, key, name, remark, created_at)
                VALUES (gen_random_uuid(), %s, %s, %s, now())
                RETURNING id
                """,
                (key, name, remark),
            )
            new_id = cursor.fetchone()[0]
            conn.commit()
            return new_id
def db_update_dict(id: str, key: str, name: str, remark: str):
    """Overwrite key, name and remark of one ``sys_dict`` row.

    The transaction is committed explicitly, matching the other write helpers
    in this module.

    Returns:
        The ``id`` argument, unchanged (regardless of whether a row matched).
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                UPDATE sys_dict
                SET key=%s, name=%s, remark=%s
                WHERE id=%s
                """,
                (key, name, remark, id),
            )
            conn.commit()
    return id
def db_delete_dict(id: str):
    """Delete one ``sys_dict`` row.

    The transaction is committed explicitly, matching the other write helpers
    in this module.

    Returns:
        The ``id`` argument, unchanged (regardless of whether a row matched).
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute("DELETE FROM sys_dict WHERE id=%s", (id,))
            conn.commit()
    return id
def db_get_dict_detail(dict_id: str):
    """Return the detail entries of one dictionary.

    Rows are ordered by ``sort`` and then ``created_at``, both ascending.

    Returns:
        list[dict]: Entries with ``id``, ``value``, ``sort``, ``pid``,
        ``dict_id``, ``remark``, ``created_at`` and ``updated_at``; the two
        timestamps are ISO-8601 strings or ``None`` when NULL.
    """
    query = """
        SELECT id, value, sort, pid, dict_id, remark, created_at, updated_at
        FROM sys_dict_detail
        WHERE dict_id=%s
        ORDER BY sort ASC, created_at ASC
        """

    def iso_or_none(moment):
        return moment.isoformat() if moment else None

    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (dict_id,))
        return [
            {
                "id": rec[0],
                "value": rec[1],
                "sort": rec[2],
                "pid": rec[3],
                "dict_id": rec[4],
                "remark": rec[5],
                "created_at": iso_or_none(rec[6]),
                "updated_at": iso_or_none(rec[7]),
            }
            for rec in cur.fetchall()
        ]
def db_create_dict_detail(
    value: str, dict_id: str, sort: int = 0, pid: str = None, remark: str = None
):
    """Create a ``sys_dict_detail`` row and return its generated id.

    The transaction is committed explicitly, matching the other write helpers
    in this module.

    Args:
        value: Entry value.
        dict_id: Id of the owning ``sys_dict`` row.
        sort: Sort position (default 0).
        pid: Optional parent entry id.
        remark: Optional free-text remark.

    Returns:
        The value produced by ``RETURNING id``.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO sys_dict_detail (id, value, dict_id, sort, pid, remark, created_at)
                VALUES (gen_random_uuid(), %s, %s, %s, %s, %s, now())
                RETURNING id
                """,
                (value, dict_id, sort, pid, remark),
            )
            new_id = cursor.fetchone()[0]
            conn.commit()
            return new_id
def db_update_dict_detail(
    id: str, value: str = None, sort: int = None, pid: str = None, remark: str = None
):
    """Update the supplied fields of one ``sys_dict_detail`` row.

    Every column is written through ``COALESCE(%s, column)``, so an argument
    left as ``None`` keeps the stored value (which also means a column cannot
    be reset to NULL through this function). ``updated_at`` is always set to
    ``now()``. The transaction is committed explicitly, matching the other
    write helpers in this module.

    Returns:
        The ``id`` argument, unchanged (regardless of whether a row matched).
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                UPDATE sys_dict_detail
                SET value=COALESCE(%s, value),
                sort=COALESCE(%s, sort),
                pid=COALESCE(%s, pid),
                remark=COALESCE(%s, remark),
                updated_at=now()
                WHERE id=%s
                """,
                (value, sort, pid, remark, id),
            )
            conn.commit()
    return id
def db_delete_dict_detail(id: str):
    """Delete one ``sys_dict_detail`` row.

    The transaction is committed explicitly, matching the other write helpers
    in this module.

    Returns:
        The ``id`` argument, unchanged (regardless of whether a row matched).
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute("DELETE FROM sys_dict_detail WHERE id=%s", (id,))
            conn.commit()
    return id
def get_dict_detail_list_by_key(dict_key: str):
    """Return the detail entries of the dictionary whose ``key`` is ``dict_key``.

    Rows are ordered by ``sort`` and then ``created_at``, both ascending.

    Returns:
        list[dict]: Entries with ``id``, ``value``, ``sort``, ``pid``,
        ``remark`` and ``created_at`` (formatted with
        ``MyUtils.format_datetime``).
    """
    query = """
        SELECT
        d.id,
        d.value,
        d.sort,
        d.pid,
        d.remark,
        d.created_at
        FROM sys_dict_detail d
        JOIN sys_dict s ON d.dict_id = s.id
        WHERE s.key = %s
        ORDER BY d.sort ASC, d.created_at ASC
        """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (dict_key,))
        return [
            {
                "id": rec[0],
                "value": rec[1],
                "sort": rec[2],
                "pid": rec[3],
                "remark": rec[4],
                "created_at": MyUtils.format_datetime(rec[5]),
            }
            for rec in cur.fetchall()
        ]
def get_dept_ids_by_user_id(user_id: UUID) -> list:
    """Return the id of the user's department plus all of its descendants.

    Args:
        user_id: Id of the ``sys_users`` row.

    Returns:
        list: Department ids reachable downwards from the user's department
        (the department itself included). Empty when the user's ``dept_id``
        is NULL or refers to no ``sys_dept`` row.

    Raises:
        ValueError: If no ``sys_users`` row matches ``user_id``.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            # Step 1: find the department the user belongs to.
            cursor.execute("SELECT dept_id FROM sys_users WHERE id = %s", (user_id,))
            row = cursor.fetchone()
            if not row:
                raise ValueError(
                    f"User with id {user_id} not found or has no department."
                )
            dept_id = row[0]
            if dept_id is None:
                # A NULL dept_id can never match a department, so the
                # recursive query would return nothing anyway.
                return []
            # Step 2: walk down the tree to collect all sub-departments.
            cursor.execute(
                """
                WITH RECURSIVE dept_hierarchy AS (
                SELECT id FROM sys_dept WHERE id = %s
                UNION
                SELECT d.id FROM sys_dept d
                INNER JOIN dept_hierarchy dh ON d.parent_id = dh.id
                )
                SELECT id FROM dept_hierarchy;
                """,
                (dept_id,),
            )
            return [r[0] for r in cursor.fetchall()]
def get_dept_id_by_user_id(user_id: str) -> str | None:
    """Return the ``dept_id`` of a user as a string.

    Args:
        user_id: Id of the ``sys_users`` row.

    Returns:
        ``str(dept_id)`` of the matching user, or ``None`` when no user has
        this id (mirroring the IoT lookup helpers in this module instead of
        failing on the missing row).
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT dept_id FROM sys_users WHERE id = %s", (user_id,))
            row = cursor.fetchone()
    if row is None:
        return None
    return str(row[0])
def get_dept_name_by_dept_id(user_id: str) -> str | None:
    """Return the name of a department as a string.

    Args:
        user_id: Despite its name, this value is matched against
            ``sys_dept.id`` (the parameter name is kept for compatibility
            with existing keyword callers).

    Returns:
        ``str(name)`` of the matching department, or ``None`` when no
        department has this id (mirroring the IoT lookup helpers in this
        module instead of failing on the missing row).
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT name FROM sys_dept WHERE id = %s", (user_id,))
            row = cursor.fetchone()
    if row is None:
        return None
    dept_name = row[0]
    return str(dept_name)
def get_dept_id_by_iot_user_name(user_id: str) -> str:
    """Look up ``dept_id`` in ``iot_users`` by the ``name`` column.

    Despite its name, ``user_id`` is compared against ``iot_users.name``.

    Returns:
        ``str(dept_id)`` of the first matching row, or ``None`` when no row
        matches.
    """
    query = "SELECT dept_id FROM iot_users WHERE name = %s"
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (user_id,))
        match = cur.fetchone()
        # No row: signal absence with None rather than raising.
        return None if match is None else str(match[0])
def get_device_type_by_iot_user_name(user_id: str) -> str:
    """Look up ``type`` in ``iot_users`` by the ``name`` column.

    Despite its name, ``user_id`` is compared against ``iot_users.name``.

    Returns:
        ``str(type)`` of the first matching row, or ``None`` when no row
        matches.
    """
    query = "SELECT type FROM iot_users WHERE name = %s"
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (user_id,))
        match = cur.fetchone()
        # No row: signal absence with None rather than raising.
        return None if match is None else str(match[0])
from typing import List
def get_dept_ids_by_dept_id(dept_id: str) -> List[str]:
    """Return the given department id together with all of its ancestor ids.

    The recursive query starts at ``dept_id`` and repeatedly follows
    ``parent_id`` upwards. Ids are returned as strings in the order the
    database yields them, which is intended to run from the current
    department to the top-level one. NOTE(review): the query has no ORDER BY,
    so that order is not enforced by the SQL itself.
    """
    ancestry_sql = """
        WITH RECURSIVE dept_tree AS (
        -- 起点:当前部门
        SELECT id, parent_id
        FROM sys_dept
        WHERE id = %s
        UNION ALL
        -- 向上递归找父部门
        SELECT d.id, d.parent_id
        FROM sys_dept d
        INNER JOIN dept_tree dt ON d.id = dt.parent_id
        )
        SELECT id FROM dept_tree;
        """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(ancestry_sql, (dept_id,))
        chain = cur.fetchall()
        return [str(link[0]) for link in chain]