完善权限系统

This commit is contained in:
BBIT-Kai
2025-12-08 18:11:48 +08:00
parent c53926afd6
commit dbdc222541
1503 changed files with 132197 additions and 885 deletions
+699
View File
@@ -0,0 +1,699 @@
import json
from uuid import UUID
from config.pgDb import pg_pool
from utils.MyUtils import format_datetime, is_valid_uuid
def get_all_depts():
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
SELECT id, parent_id, name, comment, created_at
FROM sys_dept
ORDER BY created_at ASC;
"""
)
rows = cursor.fetchall()
# cursor.description 可用于映射字段名
columns = [desc[0] for desc in cursor.description]
return [dict(zip(columns, row)) for row in rows]
def insert_dept(parent_id, name, comment):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO sys_dept (parent_id, name, comment, created_at)
VALUES (%s, %s, %s, NOW())
RETURNING id;
""",
(parent_id, name, comment),
)
new_id = cursor.fetchone()[0]
conn.commit()
return new_id
def update_dept(id, parent_id, name, comment):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
UPDATE sys_dept
SET parent_id = %s,
name = %s,
comment = %s
WHERE id = %s;
""",
(parent_id, name, comment, id),
)
conn.commit()
return cursor.rowcount # >0表示成功
def has_child_dept(id):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
SELECT COUNT(*) FROM sys_dept WHERE parent_id = %s;
""",
(id,),
)
return cursor.fetchone()[0] > 0
def delete_dept(id):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute("""DELETE FROM sys_dept WHERE id = %s;""", (id,))
conn.commit()
return cursor.rowcount
def get_menus(plat_id: int):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
SELECT id, pid, name, path, component, redirect,
auth_code, type, meta, created_at, updated_at
FROM sys_menu
WHERE plat_id = %s
ORDER BY created_at ASC;
""",
(plat_id,),
)
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
return [dict(zip(columns, row)) for row in rows]
def get_all_menus():
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
SELECT id, pid, name, path, component, redirect,
auth_code, type, meta, created_at, updated_at
FROM sys_menu
ORDER BY created_at ASC;
""",
)
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
return [dict(zip(columns, row)) for row in rows]
def insert_menu(data: dict, plat_id: int):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO sys_menu
(pid, name, path, component, redirect,
auth_code, type, meta, created_at, updated_at, plat_id)
VALUES (%s, %s, %s, %s, %s,
%s, %s, %s, NOW(), NOW(), %s)
RETURNING id;
""",
(
data.get("pid"),
data.get("name"),
data.get("path"),
data.get("component"),
data.get("redirect"),
data.get("auth_code"),
data.get("type"),
json.dumps(data.get("meta") or {}),
plat_id,
),
)
new_id = cursor.fetchone()[0]
conn.commit()
return new_id
def update_menu_db(id: str, data: dict):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
UPDATE sys_menu
SET pid = %s,
name = %s,
path = %s,
component = %s,
redirect = %s,
auth_code = %s,
type = %s,
meta = %s,
updated_at = NOW()
WHERE id = %s;
""",
(
data.get("pid"),
data.get("name"),
data.get("path"),
data.get("component"),
data.get("redirect"),
data.get("auth_code"),
data.get("type"),
json.dumps(data.get("meta") or {}),
id,
),
)
conn.commit()
return cursor.rowcount
def db_menu_name_exists(name: str, id: str | None, plat_id: int):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
if id:
cursor.execute(
"""
SELECT COUNT(*) FROM sys_menu
WHERE name = %s AND id <> %s AND plat_id = %s;
""",
(name, id, plat_id),
)
else:
cursor.execute(
"""
SELECT COUNT(*) FROM sys_menu WHERE name = %s;
""",
(name,),
)
return cursor.fetchone()[0] > 0
def db_menu_path_exists(path: str, id: str | None, plat_id: int):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
if id:
cursor.execute(
"""
SELECT COUNT(*) FROM sys_menu
WHERE path = %s AND id <> %s AND plat_id = %s;
""",
(path, id, plat_id),
)
else:
cursor.execute(
"""
SELECT COUNT(*) FROM sys_menu WHERE path = %s;
""",
(path,),
)
return cursor.fetchone()[0] > 0
def menu_has_children(id: str):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
SELECT COUNT(*) FROM sys_menu WHERE pid = %s;
""",
(id,),
)
return cursor.fetchone()[0] > 0
def delete_menu_db(id):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute("""DELETE FROM sys_menu WHERE id = %s;""", (id,))
conn.commit()
return cursor.rowcount
# 检查角色名是否存在
def db_role_name_exists(name: str, id: str | None = None) -> bool:
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
if id:
cursor.execute(
"SELECT COUNT(*) FROM sys_role WHERE name = %s AND id <> %s;",
(name, id),
)
else:
cursor.execute(
"SELECT COUNT(*) FROM sys_role WHERE name = %s;", (name,)
)
return cursor.fetchone()[0] > 0
# 插入角色
def insert_role(data: dict) -> str:
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"INSERT INTO sys_role (name, remark) VALUES (%s, %s) RETURNING id;",
(data.get("name"), data.get("remark")),
)
role_id = cursor.fetchone()[0]
# 插入角色对应菜单关系
menu_ids = data.get("permissions", [])
if menu_ids:
for menu_id in menu_ids:
cursor.execute(
"INSERT INTO sys_role_menu (role_id, menu_id) VALUES (%s, %s);",
(role_id, menu_id),
)
conn.commit()
return role_id
# 更新角色
def update_role_db(id: str, data: dict) -> int:
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"UPDATE sys_role SET name = %s, remark = %s WHERE id = %s;",
(data.get("name"), data.get("remark"), id),
)
# 更新角色菜单关系:先删除旧的,再插入新的
cursor.execute("DELETE FROM sys_role_menu WHERE role_id = %s;", (id,))
menu_ids = data.get("permissions", [])
if menu_ids:
for menu_id in menu_ids:
cursor.execute(
"INSERT INTO sys_role_menu (role_id, menu_id) VALUES (%s, %s);",
(id, menu_id),
)
conn.commit()
return cursor.rowcount
# 删除角色
def delete_role_db(id: str) -> int:
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
# 删除角色菜单关联
cursor.execute("DELETE FROM sys_role_menu WHERE role_id = %s;", (id,))
# 删除角色
cursor.execute("DELETE FROM sys_role WHERE id = %s;", (id,))
conn.commit()
return cursor.rowcount
# 查询角色列表
def get_role_list_db() -> list[dict]:
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT id, name, remark, created_at FROM sys_role;")
roles = cursor.fetchall()
result = []
for r in roles:
role_id, name, remark, created_at = r
# 查询角色对应菜单
cursor.execute(
"SELECT menu_id FROM sys_role_menu WHERE role_id = %s;", (role_id,)
)
result.append(
{
"id": role_id,
"title": name,
}
)
return result
def get_role_list_db_page(page: int, page_size: int, rid=None, name=None, remark=None):
offset = (page - 1) * page_size
conditions = []
params = []
# ---- id(必须是 UUID,否则不作为条件)----
if rid and is_valid_uuid(rid):
conditions.append("r.id = %s")
params.append(rid)
# ---- name(模糊搜索)----
if name:
conditions.append("r.name LIKE %s")
params.append(f"%{name}%")
# ---- remark(模糊搜索)----
if remark:
conditions.append("r.remark LIKE %s")
params.append(f"%{remark}%")
where_clause = " WHERE " + " AND ".join(conditions) if conditions else ""
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
# ---- 总数统计 ----
count_sql = f"SELECT COUNT(*) FROM sys_role r {where_clause};"
cursor.execute(count_sql, params)
total = cursor.fetchone()[0]
# ---- 分页数据 ----
list_sql = f"""
SELECT
r.id,
r.name,
r.remark,
r.created_at,
array_remove(array_agg(rm.menu_id::varchar), NULL) AS menu_ids
FROM sys_role r
LEFT JOIN sys_role_menu rm ON r.id = rm.role_id
{where_clause}
GROUP BY r.id
ORDER BY r.created_at DESC
LIMIT %s OFFSET %s;
"""
cursor.execute(list_sql, params + [page_size, offset])
rows = cursor.fetchall()
roles = []
for r in rows:
role_id, name, remark, created_at, menu_ids = r
roles.append(
{
"id": role_id,
"name": name,
"remark": remark,
"created_at": created_at,
"permissions": menu_ids or [],
}
)
return roles, total
def get_user_list_db_page(
page: int,
page_size: int,
uid=None,
username=None,
status=None,
dept_id=None,
startTime=None,
endTime=None,
):
offset = (page - 1) * page_size
conditions = []
params = []
# ---- 用户 ID,必须是 UUID,否则忽略 ----
if uid and is_valid_uuid(uid):
conditions.append("u.id = %s")
params.append(uid)
# ---- 用户名模糊搜索 ----
if username:
conditions.append("u.username LIKE %s")
params.append(f"%{username}%")
# ---- 状态 0/1 ----
if status is not None:
conditions.append("u.is_active = %s")
params.append(status == 1)
# ---- 部门 ID(也是 uuid----
if dept_id and is_valid_uuid(dept_id):
conditions.append("u.dept_id = %s")
params.append(dept_id)
# ---- 时间过滤 ----
if startTime:
conditions.append("u.created_at >= %s")
params.append(startTime)
if endTime:
conditions.append("u.created_at <= %s")
params.append(endTime)
where_clause = " WHERE " + " AND ".join(conditions) if conditions else ""
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
# ---- 统计总数 ----
count_sql = f"SELECT COUNT(*) FROM users u {where_clause};"
cursor.execute(count_sql, params)
total = cursor.fetchone()[0]
# ---- 分页查询 ----
list_sql = f"""
SELECT
u.id,
u.username,
u.email,
u.phone,
u.is_active,
u.dept_id,
d.name AS dept_name,
u.created_at
FROM users u
LEFT JOIN sys_dept d ON u.dept_id = d.id
{where_clause}
ORDER BY u.created_at DESC
LIMIT %s OFFSET %s;
"""
cursor.execute(list_sql, params + [page_size, offset])
rows = cursor.fetchall()
result = []
for r in rows:
(
user_id,
username,
email,
phone,
is_active,
d_id,
d_name,
created_at,
) = r
cursor.execute(
"SELECT role_id FROM sys_user_role WHERE user_id = %s;",
(user_id,),
)
role_ids = [rr[0] for rr in cursor.fetchall()]
result.append(
{
"id": user_id,
"name": username,
"email": email,
"phone": phone,
"status": 1 if is_active else 0,
"dept_id": d_id,
"dept_name": d_name,
"roles": role_ids,
"created_at": format_datetime(created_at),
}
)
return result, total
# 检查用户名是否存在
def db_user_name_exists(username: str, id: str | None = None) -> bool:
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
if id:
cursor.execute(
"SELECT COUNT(*) FROM users WHERE username = %s AND id <> %s;",
(username, id),
)
else:
cursor.execute(
"SELECT COUNT(*) FROM users WHERE username = %s;", (username,)
)
return cursor.fetchone()[0] > 0
# 插入用户
def insert_user(data: dict) -> str:
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO users (username, email, phone, is_active, dept_id, password_hash)
VALUES (%s, %s, %s, %s, %s, %s) RETURNING id;
""",
(
data.get("name"),
data.get("email"),
data.get("phone"),
bool(data.get("status", 1)),
data.get("dept_id"),
data.get("password_hash", ""),
),
)
user_id = cursor.fetchone()[0]
# 插入用户角色
role_ids = data.get("roles", [])
if role_ids:
for role_id in role_ids:
cursor.execute(
"INSERT INTO sys_user_role (user_id, role_id) VALUES (%s, %s);",
(user_id, role_id),
)
conn.commit()
return user_id
# 更新用户
def update_user_db(id: str, data: dict) -> int:
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
UPDATE users
SET username=%s, email=%s, phone=%s, is_active=%s, dept_id=%s
WHERE id=%s;
""",
(
data.get("name"),
data.get("email"),
data.get("phone"),
bool(data.get("status", 1)),
data.get("dept_id"),
id,
),
)
# 更新用户角色关系:先删除旧的,再插入新的
cursor.execute("DELETE FROM sys_user_role WHERE user_id=%s;", (id,))
role_ids = data.get("roles", [])
if role_ids:
for role_id in role_ids:
cursor.execute(
"INSERT INTO sys_user_role (user_id, role_id) VALUES (%s, %s);",
(id, role_id),
)
conn.commit()
return cursor.rowcount
# 局部更新用户(PATCH
def patch_user_db(id: str, data: dict) -> int:
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
# ------------------------
# 1. 构建动态 SQL
# ------------------------
fields = []
params = []
mapping = {
"name": "username",
"email": "email",
"phone": "phone",
"status": "is_active",
"dept_id": "dept_id",
}
for k, column in mapping.items():
if k in data:
if k == "status":
fields.append(f"{column} = %s")
params.append(bool(data[k]))
else:
fields.append(f"{column} = %s")
params.append(data[k])
# 如果有需要更新的字段
if fields:
sql = f"UPDATE users SET {', '.join(fields)} WHERE id = %s"
params.append(id)
cursor.execute(sql, tuple(params))
# ------------------------
# 2. roles 单独处理
# ------------------------
if "roles" in data:
cursor.execute("DELETE FROM sys_user_role WHERE user_id=%s;", (id,))
roles = data.get("roles") or []
for role_id in roles:
cursor.execute(
"INSERT INTO sys_user_role (user_id, role_id) VALUES (%s, %s)",
(id, role_id),
)
conn.commit()
return cursor.rowcount
# 删除用户
def delete_user_db(id: str) -> int:
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute("DELETE FROM sys_user_role WHERE user_id=%s;", (id,))
cursor.execute("DELETE FROM users WHERE id=%s;", (id,))
conn.commit()
return cursor.rowcount
def get_role_ids_by_user(user_id: UUID):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"SELECT role_id FROM sys_user_role WHERE user_id = %s;", (str(user_id),)
)
rows = cursor.fetchall()
return [row[0] for row in rows]
def get_menu_ids_by_roles(role_ids: list):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
sql = f"SELECT DISTINCT menu_id FROM sys_role_menu WHERE role_id = ANY(%s);"
cursor.execute(sql, (role_ids,))
rows = cursor.fetchall()
return [row[0] for row in rows]
def get_menus_by_ids(menu_ids: list, plat_id: int):
if not menu_ids:
return []
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
sql = """
SELECT id, pid, name, path, component, redirect,
auth_code, type, meta, created_at, updated_at
FROM sys_menu
WHERE id = ANY(%s::varchar[]) AND plat_id = %s
ORDER BY created_at ASC;
"""
# 转换 uuid 列表为 str 列表
menu_ids_str = [str(mid) for mid in menu_ids]
cursor.execute(sql, (menu_ids_str, plat_id))
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
return [dict(zip(columns, row)) for row in rows]
def build_menu_tree(items):
item_map = {item["id"]: item for item in items}
tree = []
for item in items:
pid = item["pid"]
if pid and pid in item_map:
item_map[pid]["children"].append(item)
else:
tree.append(item)
return tree