Files
AILab/bbit_ai/app/db/postgres/system.py
T
2025-12-08 18:11:48 +08:00

700 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
from uuid import UUID
from config.pgDb import pg_pool
from utils.MyUtils import format_datetime, is_valid_uuid
def get_all_depts():
    """Fetch every row of sys_dept, ordered by creation time ascending.

    Returns:
        A list of dicts, one per row, keyed by the selected column names
        (id, parent_id, name, comment, created_at).
    """
    query = """
        SELECT id, parent_id, name, comment, created_at
        FROM sys_dept
        ORDER BY created_at ASC;
        """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query)
        records = cur.fetchall()
        # cursor.description yields the column names in select order.
        field_names = [col[0] for col in cur.description]
    return [dict(zip(field_names, record)) for record in records]
def insert_dept(parent_id, name, comment):
    """Insert a department row and commit.

    Args:
        parent_id: Value stored in sys_dept.parent_id.
        name: Department name.
        comment: Free-form comment.

    Returns:
        The id produced by the INSERT's RETURNING clause.
    """
    statement = """
        INSERT INTO sys_dept (parent_id, name, comment, created_at)
        VALUES (%s, %s, %s, NOW())
        RETURNING id;
        """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(statement, (parent_id, name, comment))
        dept_id = cur.fetchone()[0]
        conn.commit()
    return dept_id
def update_dept(id, parent_id, name, comment):
    """Overwrite parent_id, name and comment of one department and commit.

    Args:
        id: Primary key of the sys_dept row to update.
        parent_id: New parent department id.
        name: New department name.
        comment: New comment.

    Returns:
        The UPDATE's rowcount; a value greater than 0 means success.
    """
    statement = """
        UPDATE sys_dept
        SET parent_id = %s,
        name = %s,
        comment = %s
        WHERE id = %s;
        """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(statement, (parent_id, name, comment, id))
        affected = cur.rowcount
        conn.commit()
    return affected
def has_child_dept(id):
    """Report whether any sys_dept row has ``id`` as its parent_id.

    Returns:
        True when at least one child department exists, otherwise False.
    """
    query = """
        SELECT COUNT(*) FROM sys_dept WHERE parent_id = %s;
        """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (id,))
        child_count = cur.fetchone()[0]
    return child_count > 0
def delete_dept(id):
    """Delete the sys_dept row with the given id and commit.

    Returns:
        The DELETE's rowcount.
    """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute("""DELETE FROM sys_dept WHERE id = %s;""", (id,))
        removed = cur.rowcount
        conn.commit()
    return removed
def get_menus(plat_id: int):
    """Fetch the sys_menu rows of one platform, oldest first.

    Args:
        plat_id: Value matched against sys_menu.plat_id.

    Returns:
        A list of dicts keyed by the selected column names.
    """
    query = """
        SELECT id, pid, name, path, component, redirect,
        auth_code, type, meta, created_at, updated_at
        FROM sys_menu
        WHERE plat_id = %s
        ORDER BY created_at ASC;
        """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (plat_id,))
        records = cur.fetchall()
        field_names = [col[0] for col in cur.description]
    return [dict(zip(field_names, record)) for record in records]
def get_all_menus():
    """Fetch every sys_menu row regardless of platform, oldest first.

    Returns:
        A list of dicts keyed by the selected column names.
    """
    query = """
        SELECT id, pid, name, path, component, redirect,
        auth_code, type, meta, created_at, updated_at
        FROM sys_menu
        ORDER BY created_at ASC;
        """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query)
        records = cur.fetchall()
        field_names = [col[0] for col in cur.description]
    return [dict(zip(field_names, record)) for record in records]
def insert_menu(data: dict, plat_id: int):
    """Insert a menu row for a platform and commit.

    Args:
        data: Menu fields; the keys read are pid, name, path, component,
            redirect, auth_code, type and meta. Missing keys become NULL,
            and a missing or falsy meta is stored as the JSON text "{}".
        plat_id: Platform the menu belongs to.

    Returns:
        The id produced by the INSERT's RETURNING clause.
    """
    plain_keys = ("pid", "name", "path", "component", "redirect", "auth_code", "type")
    # meta is serialised to JSON text before being bound as a parameter.
    values = tuple(data.get(key) for key in plain_keys) + (
        json.dumps(data.get("meta") or {}),
        plat_id,
    )
    statement = """
        INSERT INTO sys_menu
        (pid, name, path, component, redirect,
        auth_code, type, meta, created_at, updated_at, plat_id)
        VALUES (%s, %s, %s, %s, %s,
        %s, %s, %s, NOW(), NOW(), %s)
        RETURNING id;
        """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(statement, values)
        menu_id = cur.fetchone()[0]
        conn.commit()
    return menu_id
def update_menu_db(id: str, data: dict):
    """Overwrite all editable columns of one menu row and commit.

    Args:
        id: Primary key of the sys_menu row.
        data: Menu fields; the keys read are pid, name, path, component,
            redirect, auth_code, type and meta. Missing keys are written as
            NULL, and a missing or falsy meta is written as the JSON text "{}".

    Returns:
        The UPDATE's rowcount.
    """
    plain_keys = ("pid", "name", "path", "component", "redirect", "auth_code", "type")
    values = tuple(data.get(key) for key in plain_keys) + (
        json.dumps(data.get("meta") or {}),
        id,
    )
    statement = """
        UPDATE sys_menu
        SET pid = %s,
        name = %s,
        path = %s,
        component = %s,
        redirect = %s,
        auth_code = %s,
        type = %s,
        meta = %s,
        updated_at = NOW()
        WHERE id = %s;
        """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(statement, values)
        affected = cur.rowcount
        conn.commit()
    return affected
def db_menu_name_exists(name: str, id: str | None, plat_id: int):
    """Check whether a menu name is already used on the given platform.

    The lookup is always scoped to ``plat_id``. Previously the create path
    (no ``id``) ignored ``plat_id`` and matched names on every platform, which
    disagreed with the update path.

    Args:
        name: Menu name to look for.
        id: When truthy, the row with this id is excluded from the check
            (used when validating an update of that row).
        plat_id: Platform whose menus are searched.

    Returns:
        True if another matching menu exists, otherwise False.
    """
    sql = "SELECT COUNT(*) FROM sys_menu WHERE name = %s AND plat_id = %s"
    params = [name, plat_id]
    if id:
        # Exclude the row being edited so it does not collide with itself.
        sql += " AND id <> %s"
        params.append(id)
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(sql + ";", params)
        return cursor.fetchone()[0] > 0
def db_menu_path_exists(path: str, id: str | None, plat_id: int):
    """Check whether a menu path is already used on the given platform.

    The lookup is always scoped to ``plat_id``. Previously the create path
    (no ``id``) ignored ``plat_id`` and matched paths on every platform, which
    disagreed with the update path.

    Args:
        path: Menu path to look for.
        id: When truthy, the row with this id is excluded from the check
            (used when validating an update of that row).
        plat_id: Platform whose menus are searched.

    Returns:
        True if another matching menu exists, otherwise False.
    """
    sql = "SELECT COUNT(*) FROM sys_menu WHERE path = %s AND plat_id = %s"
    params = [path, plat_id]
    if id:
        # Exclude the row being edited so it does not collide with itself.
        sql += " AND id <> %s"
        params.append(id)
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(sql + ";", params)
        return cursor.fetchone()[0] > 0
def menu_has_children(id: str):
    """Report whether any sys_menu row has ``id`` as its pid.

    Returns:
        True when at least one child menu exists, otherwise False.
    """
    query = """
        SELECT COUNT(*) FROM sys_menu WHERE pid = %s;
        """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (id,))
        child_count = cur.fetchone()[0]
    return child_count > 0
def delete_menu_db(id):
    """Delete the sys_menu row with the given id and commit.

    Returns:
        The DELETE's rowcount.
    """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute("""DELETE FROM sys_menu WHERE id = %s;""", (id,))
        removed = cur.rowcount
        conn.commit()
    return removed
# Check whether a role name already exists
def db_role_name_exists(name: str, id: str | None = None) -> bool:
    """Report whether a sys_role row with this name exists.

    Args:
        name: Role name to look for.
        id: When truthy, the row with this id is excluded from the check.

    Returns:
        True if a matching role exists, otherwise False.
    """
    if id:
        sql = "SELECT COUNT(*) FROM sys_role WHERE name = %s AND id <> %s;"
        args = (name, id)
    else:
        sql = "SELECT COUNT(*) FROM sys_role WHERE name = %s;"
        args = (name,)
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(sql, args)
        matches = cur.fetchone()[0]
    return matches > 0
# Insert a role
def insert_role(data: dict) -> str:
    """Create a role together with its role-menu links and commit.

    Args:
        data: Reads "name" and "remark" for the sys_role row and
            "permissions" (an iterable of menu ids, may be absent or empty)
            for the sys_role_menu rows.

    Returns:
        The id produced by the role INSERT's RETURNING clause.
    """
    link_sql = "INSERT INTO sys_role_menu (role_id, menu_id) VALUES (%s, %s);"
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(
            "INSERT INTO sys_role (name, remark) VALUES (%s, %s) RETURNING id;",
            (data.get("name"), data.get("remark")),
        )
        new_role_id = cur.fetchone()[0]
        # Insert the role-to-menu relations; a falsy value means no links.
        for menu_id in data.get("permissions", []) or ():
            cur.execute(link_sql, (new_role_id, menu_id))
        conn.commit()
    return new_role_id
# Update a role
def update_role_db(id: str, data: dict) -> int:
    """Update a role's name/remark and replace its role-menu links.

    The returned count is taken right after the sys_role UPDATE. Previously
    the function returned the rowcount of whichever relation statement ran
    last (the DELETE or the final INSERT), which says nothing about whether
    the role itself was updated.

    Args:
        id: Primary key of the sys_role row.
        data: Reads "name", "remark" and "permissions" (iterable of menu
            ids; absent or empty clears all links).

    Returns:
        Number of sys_role rows updated (0 when no role has this id).
    """
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(
            "UPDATE sys_role SET name = %s, remark = %s WHERE id = %s;",
            (data.get("name"), data.get("remark"), id),
        )
        # Capture now: later statements overwrite cursor.rowcount.
        updated = cursor.rowcount
        if updated > 0:
            # Replace the relations: drop the old links, then insert the new.
            cursor.execute("DELETE FROM sys_role_menu WHERE role_id = %s;", (id,))
            for menu_id in data.get("permissions") or []:
                cursor.execute(
                    "INSERT INTO sys_role_menu (role_id, menu_id) VALUES (%s, %s);",
                    (id, menu_id),
                )
        conn.commit()
    return updated
# Delete a role
def delete_role_db(id: str) -> int:
    """Delete a role and its role-menu links, then commit.

    Returns:
        The rowcount of the DELETE on sys_role.
    """
    statements = (
        # Role-menu links go first, then the role row itself.
        "DELETE FROM sys_role_menu WHERE role_id = %s;",
        "DELETE FROM sys_role WHERE id = %s;",
    )
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        for sql in statements:
            cur.execute(sql, (id,))
        # rowcount now reflects the last statement, i.e. the sys_role DELETE.
        removed = cur.rowcount
        conn.commit()
    return removed
# Query the role list
def get_role_list_db() -> list[dict]:
    """Return every role as an ``{"id", "title"}`` dict.

    The previous implementation also ran one sys_role_menu query per role
    but never read the result; those round trips are removed. The returned
    data is unchanged.

    Returns:
        A list of dicts with "id" (sys_role.id) and "title" (sys_role.name).
    """
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute("SELECT id, name FROM sys_role;")
        return [
            {"id": role_id, "title": name} for role_id, name in cursor.fetchall()
        ]
def get_role_list_db_page(page: int, page_size: int, rid=None, name=None, remark=None):
    """Return one page of roles plus the total number of matching roles.

    Args:
        page: 1-based page number. Values below 1 are treated as page 1 so
            that a negative OFFSET is never sent to the database.
        page_size: Maximum number of roles per page.
        rid: Optional role id filter; ignored unless ``is_valid_uuid`` accepts it.
        name: Optional substring filter on the role name (SQL LIKE).
        remark: Optional substring filter on the remark (SQL LIKE).

    Returns:
        A ``(roles, total)`` tuple. ``roles`` is a list of dicts with keys
        id, name, remark, created_at and permissions (list of menu ids cast
        to varchar, empty when the role has none). ``total`` is the count of
        roles matching the filters, independent of paging.
    """
    offset = max(page - 1, 0) * page_size
    conditions = []
    params = []
    # ---- id (must be a UUID, otherwise it is not used as a filter) ----
    if rid and is_valid_uuid(rid):
        conditions.append("r.id = %s")
        params.append(rid)
    # ---- name (fuzzy search) ----
    if name:
        conditions.append("r.name LIKE %s")
        params.append(f"%{name}%")
    # ---- remark (fuzzy search) ----
    if remark:
        conditions.append("r.remark LIKE %s")
        params.append(f"%{remark}%")
    # Only fixed SQL fragments are interpolated; all values stay parameters.
    where_clause = " WHERE " + " AND ".join(conditions) if conditions else ""
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        # ---- total count ----
        count_sql = f"SELECT COUNT(*) FROM sys_role r {where_clause};"
        cursor.execute(count_sql, params)
        total = cursor.fetchone()[0]
        # ---- page data ----
        list_sql = f"""
            SELECT
            r.id,
            r.name,
            r.remark,
            r.created_at,
            array_remove(array_agg(rm.menu_id::varchar), NULL) AS menu_ids
            FROM sys_role r
            LEFT JOIN sys_role_menu rm ON r.id = rm.role_id
            {where_clause}
            GROUP BY r.id
            ORDER BY r.created_at DESC
            LIMIT %s OFFSET %s;
            """
        cursor.execute(list_sql, params + [page_size, offset])
        rows = cursor.fetchall()
    roles = [
        {
            "id": role_id,
            "name": role_name,
            "remark": role_remark,
            "created_at": created_at,
            "permissions": menu_ids or [],
        }
        for role_id, role_name, role_remark, created_at, menu_ids in rows
    ]
    return roles, total
def get_user_list_db_page(
    page: int,
    page_size: int,
    uid=None,
    username=None,
    status=None,
    dept_id=None,
    startTime=None,
    endTime=None,
):
    """Return one page of users plus the total number of matching users.

    Args:
        page: 1-based page number. Values below 1 are treated as page 1 so
            that a negative OFFSET is never sent to the database.
        page_size: Maximum number of users per page.
        uid: Optional user id filter; ignored unless ``is_valid_uuid`` accepts it.
        username: Optional substring filter on the username (SQL LIKE).
        status: Optional status filter; ``status == 1`` selects active users,
            any other non-None value selects inactive users.
        dept_id: Optional department id filter; ignored unless
            ``is_valid_uuid`` accepts it.
        startTime: Optional inclusive lower bound on created_at.
        endTime: Optional inclusive upper bound on created_at.

    Returns:
        A ``(users, total)`` tuple. ``users`` is a list of dicts with keys
        id, name, email, phone, status (1/0), dept_id, dept_name, roles
        (list of role ids) and created_at (passed through
        ``format_datetime``). ``total`` is the count of users matching the
        filters, independent of paging.
    """
    offset = max(page - 1, 0) * page_size
    conditions = []
    params = []
    # ---- user id: must be a UUID, otherwise ignored ----
    if uid and is_valid_uuid(uid):
        conditions.append("u.id = %s")
        params.append(uid)
    # ---- username fuzzy search ----
    if username:
        conditions.append("u.username LIKE %s")
        params.append(f"%{username}%")
    # ---- status 0/1 ----
    if status is not None:
        conditions.append("u.is_active = %s")
        params.append(status == 1)
    # ---- department id (also a uuid) ----
    if dept_id and is_valid_uuid(dept_id):
        conditions.append("u.dept_id = %s")
        params.append(dept_id)
    # ---- time range filter ----
    if startTime:
        conditions.append("u.created_at >= %s")
        params.append(startTime)
    if endTime:
        conditions.append("u.created_at <= %s")
        params.append(endTime)
    # Only fixed SQL fragments are interpolated; all values stay parameters.
    where_clause = " WHERE " + " AND ".join(conditions) if conditions else ""
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        # ---- total count ----
        count_sql = f"SELECT COUNT(*) FROM users u {where_clause};"
        cursor.execute(count_sql, params)
        total = cursor.fetchone()[0]
        # ---- page query ----
        list_sql = f"""
            SELECT
            u.id,
            u.username,
            u.email,
            u.phone,
            u.is_active,
            u.dept_id,
            d.name AS dept_name,
            u.created_at
            FROM users u
            LEFT JOIN sys_dept d ON u.dept_id = d.id
            {where_clause}
            ORDER BY u.created_at DESC
            LIMIT %s OFFSET %s;
            """
        cursor.execute(list_sql, params + [page_size, offset])
        rows = cursor.fetchall()
        result = []
        for (
            user_id,
            user_name,
            email,
            phone,
            is_active,
            d_id,
            d_name,
            created_at,
        ) in rows:
            # NOTE(review): one query per user; batching would need the
            # column types of sys_user_role, which are not visible here.
            cursor.execute(
                "SELECT role_id FROM sys_user_role WHERE user_id = %s;",
                (user_id,),
            )
            role_ids = [rr[0] for rr in cursor.fetchall()]
            result.append(
                {
                    "id": user_id,
                    "name": user_name,
                    "email": email,
                    "phone": phone,
                    "status": 1 if is_active else 0,
                    "dept_id": d_id,
                    "dept_name": d_name,
                    "roles": role_ids,
                    "created_at": format_datetime(created_at),
                }
            )
    return result, total
# Check whether a username already exists
def db_user_name_exists(username: str, id: str | None = None) -> bool:
    """Report whether a users row with this username exists.

    Args:
        username: Username to look for.
        id: When truthy, the row with this id is excluded from the check.

    Returns:
        True if a matching user exists, otherwise False.
    """
    if id:
        sql = "SELECT COUNT(*) FROM users WHERE username = %s AND id <> %s;"
        args = (username, id)
    else:
        sql = "SELECT COUNT(*) FROM users WHERE username = %s;"
        args = (username,)
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(sql, args)
        matches = cur.fetchone()[0]
    return matches > 0
# Insert a user
def insert_user(data: dict) -> str:
    """Create a user together with its user-role links and commit.

    Args:
        data: Reads "name" (stored as username), "email", "phone",
            "status" (coerced with bool, defaults to 1), "dept_id",
            "password_hash" (defaults to "") and "roles" (iterable of role
            ids, may be absent or empty).

    Returns:
        The id produced by the user INSERT's RETURNING clause.
    """
    user_values = (
        data.get("name"),
        data.get("email"),
        data.get("phone"),
        bool(data.get("status", 1)),
        data.get("dept_id"),
        data.get("password_hash", ""),
    )
    user_sql = """
        INSERT INTO users (username, email, phone, is_active, dept_id, password_hash)
        VALUES (%s, %s, %s, %s, %s, %s) RETURNING id;
        """
    link_sql = "INSERT INTO sys_user_role (user_id, role_id) VALUES (%s, %s);"
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(user_sql, user_values)
        new_user_id = cur.fetchone()[0]
        # Insert the user's roles; a falsy value means no links.
        for role_id in data.get("roles", []) or ():
            cur.execute(link_sql, (new_user_id, role_id))
        conn.commit()
    return new_user_id
# Update a user
def update_user_db(id: str, data: dict) -> int:
    """Update a user's profile columns and replace its user-role links.

    The returned count is taken right after the users UPDATE. Previously the
    function returned the rowcount of whichever relation statement ran last
    (the DELETE or the final INSERT), which says nothing about whether the
    user itself was updated.

    Args:
        id: Primary key of the users row.
        data: Reads "name" (stored as username), "email", "phone", "status"
            (coerced with bool, defaults to 1), "dept_id" and "roles"
            (iterable of role ids; absent or empty clears all links).

    Returns:
        Number of users rows updated (0 when no user has this id).
    """
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(
            """
            UPDATE users
            SET username=%s, email=%s, phone=%s, is_active=%s, dept_id=%s
            WHERE id=%s;
            """,
            (
                data.get("name"),
                data.get("email"),
                data.get("phone"),
                bool(data.get("status", 1)),
                data.get("dept_id"),
                id,
            ),
        )
        # Capture now: later statements overwrite cursor.rowcount.
        updated = cursor.rowcount
        if updated > 0:
            # Replace the relations: drop the old links, then insert the new.
            cursor.execute("DELETE FROM sys_user_role WHERE user_id=%s;", (id,))
            for role_id in data.get("roles") or []:
                cursor.execute(
                    "INSERT INTO sys_user_role (user_id, role_id) VALUES (%s, %s);",
                    (id, role_id),
                )
        conn.commit()
    return updated
# Partially update a user (PATCH)
def patch_user_db(id: str, data: dict) -> int:
    """Update only the supplied user fields and, if given, replace the roles.

    The return value now always describes the users row. Previously it was
    the rowcount of whichever statement happened to run last (a role
    DELETE/INSERT, or nothing at all when ``data`` was empty).

    Args:
        id: Primary key of the users row.
        data: Any subset of "name", "email", "phone", "status", "dept_id"
            and "roles". Only keys that are present are applied; "status" is
            coerced with bool. A present "roles" key replaces all
            user-role links (None or empty clears them).

    Returns:
        Number of users rows matched by ``id``: the UPDATE's rowcount when
        columns were patched, otherwise the result of an existence check.
        0 means no such user, in which case the roles are left untouched.
    """
    # Request key -> users column. Column names come only from this fixed
    # mapping, so interpolating them into the SQL text is safe.
    mapping = {
        "name": "username",
        "email": "email",
        "phone": "phone",
        "status": "is_active",
        "dept_id": "dept_id",
    }
    assignments = []
    params = []
    for key, column in mapping.items():
        if key in data:
            assignments.append(f"{column} = %s")
            params.append(bool(data[key]) if key == "status" else data[key])
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        # ------------------------
        # 1. Column updates (dynamic SQL)
        # ------------------------
        if assignments:
            sql = f"UPDATE users SET {', '.join(assignments)} WHERE id = %s"
            cursor.execute(sql, (*params, id))
            matched = cursor.rowcount
        else:
            # Nothing to update; still find out whether the user exists.
            cursor.execute("SELECT COUNT(*) FROM users WHERE id = %s;", (id,))
            matched = cursor.fetchone()[0]
        # ------------------------
        # 2. Roles are handled separately
        # ------------------------
        if matched > 0 and "roles" in data:
            cursor.execute("DELETE FROM sys_user_role WHERE user_id=%s;", (id,))
            for role_id in data.get("roles") or []:
                cursor.execute(
                    "INSERT INTO sys_user_role (user_id, role_id) VALUES (%s, %s)",
                    (id, role_id),
                )
        conn.commit()
    return matched
# Delete a user
def delete_user_db(id: str) -> int:
    """Delete a user and its user-role links, then commit.

    Returns:
        The rowcount of the DELETE on users.
    """
    statements = (
        # User-role links go first, then the user row itself.
        "DELETE FROM sys_user_role WHERE user_id=%s;",
        "DELETE FROM users WHERE id=%s;",
    )
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        for sql in statements:
            cur.execute(sql, (id,))
        # rowcount now reflects the last statement, i.e. the users DELETE.
        removed = cur.rowcount
        conn.commit()
    return removed
def get_role_ids_by_user(user_id: UUID):
    """Return the role ids linked to a user in sys_user_role.

    Args:
        user_id: User identifier; it is passed to the query as ``str(user_id)``.

    Returns:
        A list of role_id values, possibly empty.
    """
    query = "SELECT role_id FROM sys_user_role WHERE user_id = %s;"
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (str(user_id),))
        records = cur.fetchall()
    return [record[0] for record in records]
def get_menu_ids_by_roles(role_ids: list):
    """Return the distinct menu ids granted to any of the given roles.

    Args:
        role_ids: Role ids matched with ``role_id = ANY(...)``.

    Returns:
        A list of distinct menu_id values. An empty or None ``role_ids``
        yields ``[]`` without contacting the database.
    """
    if not role_ids:
        # No roles cannot match any row, so skip the round trip.
        return []
    sql = "SELECT DISTINCT menu_id FROM sys_role_menu WHERE role_id = ANY(%s);"
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(sql, (role_ids,))
        return [row[0] for row in cursor.fetchall()]
def get_menus_by_ids(menu_ids: list, plat_id: int):
    """Fetch the menus of one platform whose id is in ``menu_ids``.

    Args:
        menu_ids: Menu ids to load; each is converted with ``str`` before
            being bound as a varchar[] parameter.
        plat_id: Value matched against sys_menu.plat_id.

    Returns:
        A list of dicts keyed by the selected column names, ordered by
        created_at ascending. Empty when ``menu_ids`` is empty or None.
    """
    if not menu_ids:
        return []
    # Convert the uuid list to a list of strings.
    id_strings = [str(menu_id) for menu_id in menu_ids]
    query = """
        SELECT id, pid, name, path, component, redirect,
        auth_code, type, meta, created_at, updated_at
        FROM sys_menu
        WHERE id = ANY(%s::varchar[]) AND plat_id = %s
        ORDER BY created_at ASC;
        """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (id_strings, plat_id))
        records = cur.fetchall()
        field_names = [col[0] for col in cur.description]
    return [dict(zip(field_names, record)) for record in records]
def build_menu_tree(items):
    """Arrange flat menu dicts into a forest using their "id"/"pid" keys.

    An item becomes a child when its "pid" is truthy and matches the "id" of
    another item in ``items``; otherwise it is a root. Children are appended
    to the parent's "children" list, which is created on demand. Previously
    a parent without a pre-existing "children" key raised KeyError.

    Args:
        items: Iterable of dicts, each with at least "id" and "pid". The
            dicts are mutated in place (parents gain or extend "children").

    Returns:
        The list of root items, in input order.
    """
    item_map = {item["id"]: item for item in items}
    tree = []
    for item in items:
        pid = item["pid"]
        if pid and pid in item_map:
            # setdefault keeps a caller-provided list and creates one otherwise.
            item_map[pid].setdefault("children", []).append(item)
        else:
            tree.append(item)
    return tree