1009 lines
31 KiB
Python
1009 lines
31 KiB
Python
import json
|
||
from uuid import UUID
|
||
|
||
from config.pgDb import pg_pool
|
||
from utils import MyUtils
|
||
from utils.MyUtils import format_datetime, is_valid_uuid
|
||
|
||
|
||
def get_all_depts():
    """Return every ``sys_dept`` row as a list of dicts, oldest first.

    Each dict is keyed by the column names reported by the cursor
    (``id``, ``parent_id``, ``name``, ``comment``, ``created_at``).
    """
    query = """
        SELECT id, parent_id, name, comment, created_at
        FROM sys_dept
        ORDER BY created_at ASC;
    """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query)
        records = cur.fetchall()
        # cursor.description carries the column names of the result set.
        field_names = [col[0] for col in cur.description]
        return [dict(zip(field_names, record)) for record in records]
|
||
|
||
|
||
def insert_dept(parent_id, name, comment):
    """Insert one department and return the id produced by ``RETURNING id``.

    ``created_at`` is filled in by the statement itself via ``NOW()``.
    """
    statement = """
        INSERT INTO sys_dept (parent_id, name, comment, created_at)
        VALUES (%s, %s, %s, NOW())
        RETURNING id;
    """
    values = (parent_id, name, comment)
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(statement, values)
        inserted_row = cur.fetchone()
        created_id = inserted_row[0]
        conn.commit()
        return created_id
|
||
|
||
|
||
def update_dept(id, parent_id, name, comment):
    """Overwrite parent, name and comment of the department with the given id.

    Returns the cursor's row count for the UPDATE; a value above zero means
    a row was matched.
    """
    statement = """
        UPDATE sys_dept
        SET parent_id = %s,
            name = %s,
            comment = %s
        WHERE id = %s;
    """
    values = (parent_id, name, comment, id)
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(statement, values)
        conn.commit()
        affected = cur.rowcount
        return affected
|
||
|
||
|
||
def has_child_dept(id):
    """Return True when at least one department has ``parent_id`` equal to *id*."""
    query = """
        SELECT COUNT(*) FROM sys_dept WHERE parent_id = %s;
    """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (id,))
        count_row = cur.fetchone()
        return count_row[0] > 0
|
||
|
||
|
||
def delete_dept(id):
    """Delete the department with the given id and return the DELETE row count."""
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute("""DELETE FROM sys_dept WHERE id = %s;""", (id,))
        conn.commit()
        removed = cur.rowcount
        return removed
|
||
|
||
|
||
def get_menus(plat_id: int):
    """Return the ``sys_menu`` rows of one platform as dicts, oldest first.

    Rows are filtered by ``plat_id`` and keyed by the selected column names.
    """
    query = """
        SELECT id, pid, name, path, component, redirect,
               auth_code, type, meta, created_at, updated_at
        FROM sys_menu
        WHERE plat_id = %s
        ORDER BY created_at ASC;
    """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (plat_id,))
        records = cur.fetchall()
        field_names = [col[0] for col in cur.description]
        return [dict(zip(field_names, record)) for record in records]
|
||
|
||
|
||
def get_all_menus():
    """Return every ``sys_menu`` row (all platforms) as dicts, oldest first."""
    query = """
        SELECT id, pid, name, path, component, redirect,
               auth_code, type, meta, created_at, updated_at
        FROM sys_menu
        ORDER BY created_at ASC;
    """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query)
        records = cur.fetchall()
        field_names = [col[0] for col in cur.description]
        return [dict(zip(field_names, record)) for record in records]
|
||
|
||
|
||
def insert_menu(data: dict, plat_id: int):
    """Insert a menu row for *plat_id* and return the id from ``RETURNING id``.

    Column values are read from *data* with ``dict.get``, so absent keys are
    stored as NULL. ``meta`` is serialised with ``json.dumps`` and falls back
    to an empty object when missing or falsy. Both timestamps are ``NOW()``.
    """
    statement = """
        INSERT INTO sys_menu
            (pid, name, path, component, redirect,
             auth_code, type, meta, created_at, updated_at, plat_id)
        VALUES (%s, %s, %s, %s, %s,
                %s, %s, %s, NOW(), NOW(), %s)
        RETURNING id;
    """
    plain_keys = ("pid", "name", "path", "component", "redirect", "auth_code", "type")
    meta_json = json.dumps(data.get("meta") or {})
    values = tuple(data.get(key) for key in plain_keys) + (meta_json, plat_id)

    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(statement, values)
        inserted_row = cur.fetchone()
        created_id = inserted_row[0]
        conn.commit()
        return created_id
|
||
|
||
|
||
def update_menu_db(id: str, data: dict):
    """Overwrite all editable columns of the menu with the given id.

    Values are read from *data* with ``dict.get`` (absent keys become NULL),
    ``meta`` is stored as JSON text (empty object when missing or falsy) and
    ``updated_at`` is set to ``NOW()``. Returns the UPDATE row count.
    """
    statement = """
        UPDATE sys_menu
        SET pid = %s,
            name = %s,
            path = %s,
            component = %s,
            redirect = %s,
            auth_code = %s,
            type = %s,
            meta = %s,
            updated_at = NOW()
        WHERE id = %s;
    """
    plain_keys = ("pid", "name", "path", "component", "redirect", "auth_code", "type")
    meta_json = json.dumps(data.get("meta") or {})
    values = tuple(data.get(key) for key in plain_keys) + (meta_json, id)

    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(statement, values)
        conn.commit()
        affected = cur.rowcount
        return affected
|
||
|
||
|
||
def db_menu_name_exists(name: str, id: str | None, plat_id: int):
    """Return True when another menu on the same platform already uses *name*.

    Args:
        name: Menu name to look for.
        id: Id of the menu being edited; when truthy that row is excluded
            from the check. Pass ``None`` when creating a menu.
        plat_id: Platform whose menus are searched.

    The platform filter is applied on both the create and the edit path.
    Previously the create path (no *id*) ignored ``plat_id`` and searched
    every platform, which disagreed with the edit path.
    """
    sql = "SELECT COUNT(*) FROM sys_menu WHERE name = %s AND plat_id = %s"
    params = [name, plat_id]
    if id:
        # Editing: the row itself must not count as a duplicate.
        sql += " AND id <> %s"
        params.append(id)

    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(sql + ";", params)
        return cursor.fetchone()[0] > 0
|
||
|
||
|
||
def db_menu_path_exists(path: str, id: str | None, plat_id: int):
    """Return True when another menu on the same platform already uses *path*.

    Args:
        path: Route path to look for.
        id: Id of the menu being edited; when truthy that row is excluded
            from the check. Pass ``None`` when creating a menu.
        plat_id: Platform whose menus are searched.

    The platform filter is applied on both the create and the edit path.
    Previously the create path (no *id*) ignored ``plat_id`` and searched
    every platform, which disagreed with the edit path.
    """
    sql = "SELECT COUNT(*) FROM sys_menu WHERE path = %s AND plat_id = %s"
    params = [path, plat_id]
    if id:
        # Editing: the row itself must not count as a duplicate.
        sql += " AND id <> %s"
        params.append(id)

    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(sql + ";", params)
        return cursor.fetchone()[0] > 0
|
||
|
||
|
||
def menu_has_children(id: str):
    """Return True when at least one ``sys_menu`` row has ``pid`` equal to *id*."""
    query = """
        SELECT COUNT(*) FROM sys_menu WHERE pid = %s;
    """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (id,))
        count_row = cur.fetchone()
        return count_row[0] > 0
|
||
|
||
|
||
def delete_menu_db(id):
    """Delete the menu with the given id and return the DELETE row count."""
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute("""DELETE FROM sys_menu WHERE id = %s;""", (id,))
        conn.commit()
        removed = cur.rowcount
        return removed
|
||
|
||
|
||
# Check whether a role name is already taken
def db_role_name_exists(name: str, id: str | None = None) -> bool:
    """Return True when a ``sys_role`` row with this name exists.

    When *id* is truthy the row with that id is excluded, so a role being
    edited does not collide with itself.
    """
    if id:
        query = "SELECT COUNT(*) FROM sys_role WHERE name = %s AND id <> %s;"
        args = (name, id)
    else:
        query = "SELECT COUNT(*) FROM sys_role WHERE name = %s;"
        args = (name,)

    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, args)
        count_row = cur.fetchone()
        return count_row[0] > 0
|
||
|
||
|
||
# Insert a role
def insert_role(data: dict) -> str:
    """Create a role and its role-to-menu links, then commit once.

    Reads ``name`` and ``remark`` from *data* for the role row and the
    optional ``permissions`` list for the menu ids to link. Returns the
    new role id produced by ``RETURNING id``.
    """
    role_sql = "INSERT INTO sys_role (name, remark) VALUES (%s, %s) RETURNING id;"
    link_sql = "INSERT INTO sys_role_menu (role_id, menu_id) VALUES (%s, %s);"

    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(role_sql, (data.get("name"), data.get("remark")))
        new_role_id = cur.fetchone()[0]

        # Link the role to each granted menu; a missing or empty list links nothing.
        granted = data.get("permissions", []) or []
        for granted_menu in granted:
            cur.execute(link_sql, (new_role_id, granted_menu))

        conn.commit()
        return new_role_id
|
||
|
||
|
||
# Update a role
def update_role_db(id: str, data: dict) -> int:
    """Update a role's name/remark and replace its role-to-menu links.

    Args:
        id: Id of the role to update.
        data: Payload; ``name`` and ``remark`` are written to ``sys_role``,
            ``permissions`` (optional list of menu ids) replaces all rows
            of ``sys_role_menu`` for this role.

    Returns:
        The number of ``sys_role`` rows changed by the UPDATE (0 when no
        role has this id).

    The row count is captured immediately after the UPDATE. Previously the
    function returned ``cursor.rowcount`` at the very end, which reflects
    the last relation DELETE/INSERT rather than the role update itself.
    """
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(
            "UPDATE sys_role SET name = %s, remark = %s WHERE id = %s;",
            (data.get("name"), data.get("remark"), id),
        )
        # Capture now: every later execute() overwrites cursor.rowcount.
        updated_rows = cursor.rowcount

        # Replace the role-menu links: drop the old set, insert the new one.
        cursor.execute("DELETE FROM sys_role_menu WHERE role_id = %s;", (id,))
        for menu_id in data.get("permissions") or []:
            cursor.execute(
                "INSERT INTO sys_role_menu (role_id, menu_id) VALUES (%s, %s);",
                (id, menu_id),
            )

        conn.commit()
        return updated_rows
|
||
|
||
|
||
# Delete a role
def delete_role_db(id: str) -> int:
    """Remove a role together with its role-to-menu links.

    The links in ``sys_role_menu`` are deleted first, then the role row.
    Returns the row count of the final ``sys_role`` DELETE.
    """
    cleanup_steps = (
        "DELETE FROM sys_role_menu WHERE role_id = %s;",  # role-menu links
        "DELETE FROM sys_role WHERE id = %s;",  # the role itself
    )
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        for step in cleanup_steps:
            cur.execute(step, (id,))
        conn.commit()
        return cur.rowcount
|
||
|
||
|
||
# List roles
def get_role_list_db() -> list[dict]:
    """Return all roles as ``{"id": ..., "title": ...}`` dicts.

    ``title`` is the role's ``name`` column.

    The earlier version also ran one ``sys_role_menu`` query per role and
    never read its result; those queries are removed, and only the two
    columns that are actually used are selected. The returned data is
    unchanged.
    """
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute("SELECT id, name FROM sys_role;")
        return [
            {"id": role_id, "title": name}
            for role_id, name in cursor.fetchall()
        ]
|
||
|
||
|
||
def get_role_list_db_page(page: int, page_size: int, rid=None, name=None, remark=None):
    """Return one page of roles plus the total number of matching roles.

    Optional filters, combined with AND:
      * ``rid``    - exact id match, applied only when ``is_valid_uuid``
                     accepts the value; otherwise it is ignored.
      * ``name``   - substring match (``LIKE %value%``).
      * ``remark`` - substring match (``LIKE %value%``).

    Returns ``(roles, total)``. Each role dict has ``id``, ``name``,
    ``remark``, a formatted ``created_at`` and ``permissions`` (the
    aggregated menu ids, or an empty list).
    """
    filters = []
    args = []

    # Id filter: must look like a UUID, otherwise it is not used at all.
    if rid and is_valid_uuid(rid):
        filters.append("r.id = %s")
        args.append(rid)

    # Fuzzy filters on name and remark.
    for column, needle in (("r.name", name), ("r.remark", remark)):
        if needle:
            filters.append(f"{column} LIKE %s")
            args.append(f"%{needle}%")

    if filters:
        where_clause = " WHERE " + " AND ".join(filters)
    else:
        where_clause = ""

    count_sql = f"SELECT COUNT(*) FROM sys_role r {where_clause};"
    list_sql = f"""
        SELECT
            r.id,
            r.name,
            r.remark,
            r.created_at,
            array_remove(array_agg(rm.menu_id::varchar), NULL) AS menu_ids
        FROM sys_role r
        LEFT JOIN sys_role_menu rm ON r.id = rm.role_id
        {where_clause}
        GROUP BY r.id
        ORDER BY r.created_at DESC
        LIMIT %s OFFSET %s;
    """
    skip = (page - 1) * page_size

    with pg_pool.getConn() as conn, conn.cursor() as cur:
        # Total count for the pager.
        cur.execute(count_sql, args)
        total = cur.fetchone()[0]

        # Rows of the requested page.
        cur.execute(list_sql, args + [page_size, skip])
        page_rows = cur.fetchall()

    roles = [
        {
            "id": role_id,
            "name": role_name,
            "remark": role_remark,
            "created_at": format_datetime(created),
            "permissions": linked_menus or [],
        }
        for role_id, role_name, role_remark, created, linked_menus in page_rows
    ]
    return roles, total
|
||
|
||
|
||
def get_user_list_db_page(
    page: int,
    page_size: int,
    uid=None,
    username=None,
    status=None,
    dept_id=None,
    startTime=None,
    endTime=None,
):
    """Return one page of users plus the total number of matching users.

    Optional filters, combined with AND:
      * ``uid``       - substring match against the text form of ``u.id``.
      * ``username``  - substring match on ``u.username``.
      * ``status``    - ``u.is_active`` is compared with ``status == 1``;
                        skipped only when ``status`` is ``None``.
      * ``dept_id``   - exact match, applied only when ``is_valid_uuid``
                        accepts the value.
      * ``startTime`` / ``endTime`` - inclusive bounds on ``u.created_at``.

    Returns ``(users, total)``. Role ids are loaded with one additional
    query per user on the page.
    """
    filters = []
    args = []

    def add_filter(clause, value):
        """Register one WHERE fragment together with its bound value."""
        filters.append(clause)
        args.append(value)

    # User id: matched as a substring of the id's text form.
    if uid:
        add_filter("u.id::text LIKE %s", f"%{uid}%")

    # User name: fuzzy search.
    if username:
        add_filter("u.username LIKE %s", f"%{username}%")

    # Status flag 0/1 mapped onto the boolean is_active column.
    if status is not None:
        add_filter("u.is_active = %s", status == 1)

    # Department id: only used when it is a valid UUID.
    if dept_id and is_valid_uuid(dept_id):
        add_filter("u.dept_id = %s", dept_id)

    # Creation-time window.
    if startTime:
        add_filter("u.created_at >= %s", startTime)
    if endTime:
        add_filter("u.created_at <= %s", endTime)

    if filters:
        where_clause = " WHERE " + " AND ".join(filters)
    else:
        where_clause = ""

    count_sql = f"SELECT COUNT(*) FROM sys_users u {where_clause};"
    list_sql = f"""
        SELECT
            u.id,
            u.username,
            u.email,
            u.phone,
            u.is_active,
            u.dept_id,
            d.name AS dept_name,
            u.created_at
        FROM sys_users u
        LEFT JOIN sys_dept d ON u.dept_id = d.id
        {where_clause}
        ORDER BY u.created_at DESC
        LIMIT %s OFFSET %s;
    """
    skip = (page - 1) * page_size

    with pg_pool.getConn() as conn, conn.cursor() as cur:
        # Total count for the pager.
        cur.execute(count_sql, args)
        total = cur.fetchone()[0]

        # Rows of the requested page.
        cur.execute(list_sql, args + [page_size, skip])
        page_rows = cur.fetchall()

        users = []
        for record in page_rows:
            user_id, login, email, phone, active, d_id, d_name, created = record

            # Roles are fetched per user on the same cursor.
            cur.execute(
                "SELECT role_id FROM sys_user_role WHERE user_id = %s;",
                (user_id,),
            )
            assigned_roles = [role_row[0] for role_row in cur.fetchall()]

            users.append(
                {
                    "id": user_id,
                    "name": login,
                    "email": email,
                    "phone": phone,
                    "status": 1 if active else 0,
                    "dept_id": d_id,
                    "dept_name": d_name,
                    "roles": assigned_roles,
                    "created_at": format_datetime(created),
                }
            )

    return users, total
|
||
|
||
|
||
# Check whether a login account already exists
def db_user_name_exists(username: str) -> bool:
    """Return True when a ``sys_users`` row has ``email`` equal to *username*.

    NOTE(review): the lookup is on the ``email`` column although the
    parameter is called ``username`` - presumably login accounts are
    e-mail addresses; confirm against the callers.
    """
    query = "SELECT COUNT(*) FROM sys_users WHERE email = %s;"
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (username,))
        count_row = cur.fetchone()
        return count_row[0] > 0
|
||
|
||
|
||
# Insert a user
def insert_user(data: dict) -> str:
    """Create a user and its user-to-role links, then commit once.

    Reads ``name``, ``email``, ``phone``, ``status`` (defaults to 1 and is
    stored as a boolean), ``dept_id`` and ``password_hash`` (defaults to an
    empty string) from *data*. Every id in the optional ``roles`` list is
    linked through ``sys_user_role``. Returns the new user id.
    """
    user_sql = """
        INSERT INTO sys_users (username, email, phone, is_active, dept_id, password_hash)
        VALUES (%s, %s, %s, %s, %s, %s) RETURNING id;
    """
    link_sql = "INSERT INTO sys_user_role (user_id, role_id) VALUES (%s, %s);"
    user_values = (
        data.get("name"),
        data.get("email"),
        data.get("phone"),
        bool(data.get("status", 1)),
        data.get("dept_id"),
        data.get("password_hash", ""),
    )

    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(user_sql, user_values)
        new_user_id = cur.fetchone()[0]

        # Attach the requested roles; a missing or empty list attaches nothing.
        requested_roles = data.get("roles", []) or []
        for requested_role in requested_roles:
            cur.execute(link_sql, (new_user_id, requested_role))

        conn.commit()
        return new_user_id
|
||
|
||
|
||
# Partially update a user (PATCH)
def patch_user_db(id: str, data: dict) -> int:
    """Apply a partial update to a user and, optionally, replace its roles.

    Args:
        id: Id of the user to patch.
        data: Only keys present in the payload are written. Recognised
            keys: ``name`` (-> ``username``), ``email``, ``phone``,
            ``password_hash``, ``status`` (-> ``is_active``, coerced to
            bool) and ``dept_id``. When ``roles`` is present, all
            ``sys_user_role`` rows for the user are replaced by that list.

    Returns:
        * The row count of the ``sys_users`` UPDATE when any column was
          patched (0 means no user has this id).
        * Otherwise, for a roles-only patch, the number of role links
          deleted plus inserted.
        * 0 when the payload contained nothing to apply.

    Previously the function returned ``cursor.rowcount`` at the very end,
    i.e. the count of whichever statement happened to run last (or -1 if
    none ran), which did not tell the caller whether the user was updated.
    """
    column_for_key = {
        "name": "username",
        "email": "email",
        "phone": "phone",
        "password_hash": "password_hash",
        "status": "is_active",
        "dept_id": "dept_id",
    }

    # 1. Build the dynamic SET clause from the keys actually supplied.
    assignments = []
    params = []
    for key, column in column_for_key.items():
        if key not in data:
            continue
        value = data[key]
        assignments.append(f"{column} = %s")
        # is_active is a boolean column; the API sends 0/1.
        params.append(bool(value) if key == "status" else value)

    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        affected = 0

        if assignments:
            sql = f"UPDATE sys_users SET {', '.join(assignments)} WHERE id = %s"
            cursor.execute(sql, (*params, id))
            # Capture now: later execute() calls overwrite cursor.rowcount.
            affected = cursor.rowcount

        # 2. Roles are handled separately: replace the whole set.
        if "roles" in data:
            cursor.execute("DELETE FROM sys_user_role WHERE user_id=%s;", (id,))
            role_changes = cursor.rowcount
            for role_id in data.get("roles") or []:
                cursor.execute(
                    "INSERT INTO sys_user_role (user_id, role_id) VALUES (%s, %s)",
                    (id, role_id),
                )
                role_changes += cursor.rowcount
            if not assignments:
                affected = role_changes

        conn.commit()
        return affected
|
||
|
||
|
||
# Delete a user
def delete_user_db(id: str) -> int:
    """Remove a user together with its user-to-role links.

    The links in ``sys_user_role`` are deleted first, then the user row.
    Returns the row count of the final ``sys_users`` DELETE.
    """
    cleanup_steps = (
        "DELETE FROM sys_user_role WHERE user_id=%s;",
        "DELETE FROM sys_users WHERE id=%s;",
    )
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        for step in cleanup_steps:
            cur.execute(step, (id,))
        conn.commit()
        return cur.rowcount
|
||
|
||
|
||
def get_role_ids_by_user(user_id: UUID):
    """Return the role ids linked to *user_id* in ``sys_user_role``.

    The id is passed to the database in its string form.
    """
    query = "SELECT role_id FROM sys_user_role WHERE user_id = %s;"
    user_key = str(user_id)
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (user_key,))
        return [link[0] for link in cur.fetchall()]
|
||
|
||
|
||
def get_menu_ids_by_roles(role_ids: list):
    """Return the distinct menu ids granted to any of the given roles.

    Args:
        role_ids: Role ids to look up; bound as a single array parameter
            for ``role_id = ANY(%s)``.

    Returns:
        A list of menu ids. An empty or ``None`` *role_ids* yields ``[]``
        without touching the database, as ``get_menus_by_ids`` does for
        empty input.
    """
    if not role_ids:
        return []

    # Plain string: the original f-prefix had no placeholders.
    sql = "SELECT DISTINCT menu_id FROM sys_role_menu WHERE role_id = ANY(%s);"
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(sql, (role_ids,))
        return [row[0] for row in cursor.fetchall()]
|
||
|
||
|
||
def get_menus_by_ids(menu_ids: list, plat_id: int):
    """Return the non-button menus of a platform whose ids are in *menu_ids*.

    An empty *menu_ids* short-circuits to ``[]`` without a query. Otherwise
    the ids are stringified and bound as a ``varchar[]`` array, and the
    rows are returned as dicts ordered by ``created_at`` ascending.
    """
    if not menu_ids:
        return []

    query = """
        SELECT id, pid, name, path, component, redirect,
               auth_code, type, meta, created_at, updated_at
        FROM sys_menu
        WHERE id = ANY(%s::varchar[]) AND plat_id = %s AND type != 'button'
        ORDER BY created_at ASC;
    """
    # The ids may arrive as UUID objects; the query compares them as strings.
    id_strings = list(map(str, menu_ids))

    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (id_strings, plat_id))
        records = cur.fetchall()
        field_names = [col[0] for col in cur.description]
        return [dict(zip(field_names, record)) for record in records]
|
||
|
||
|
||
def build_menu_tree(items):
    """Arrange flat menu dicts into a tree using their ``id`` / ``pid`` keys.

    Args:
        items: Iterable of dicts, each with an ``id`` and a ``pid`` key.

    Returns:
        The list of root items. An item is a root when its ``pid`` is falsy
        or does not refer to any item in *items*. Children are appended, in
        input order, to the parent's ``children`` list; the dicts are
        mutated in place.

    A parent without a ``children`` key (or with ``children`` set to
    ``None``) now gets the list created on demand. Previously this raised
    ``KeyError`` unless the caller had pre-populated ``children`` on every
    item.
    """
    item_map = {item["id"]: item for item in items}
    tree = []
    for item in items:
        pid = item["pid"]
        if pid and pid in item_map:
            parent = item_map[pid]
            children = parent.get("children")
            if children is None:
                children = parent["children"] = []
            children.append(item)
        else:
            tree.append(item)
    return tree
|
||
|
||
|
||
def get_dict_list(keyword="", page=1, page_size=10):
    """
    Return one page of system dictionaries, optionally filtered by keyword.

    The keyword is matched case-insensitively as a substring of ``key`` or
    ``name``; an empty keyword matches every row. Returns
    ``(total, rows)`` - note that the total comes first here.
    """
    count_sql = """
        SELECT COUNT(*)
        FROM sys_dict
        WHERE (
            %s = ''
            OR key ILIKE '%%' || %s || '%%'
            OR name ILIKE '%%' || %s || '%%'
        )
    """
    page_sql = """
        SELECT id, key, name, remark, created_at
        FROM sys_dict
        WHERE (
            %s = ''
            OR key ILIKE '%%' || %s || '%%'
            OR name ILIKE '%%' || %s || '%%'
        )
        ORDER BY created_at DESC
        LIMIT %s OFFSET %s
    """
    keyword_args = (keyword, keyword, keyword)
    skip = (page - 1) * page_size

    with pg_pool.getConn() as conn, conn.cursor() as cur:
        # 1) Total number of matching rows.
        cur.execute(count_sql, keyword_args)
        total = cur.fetchone()[0]

        # 2) Rows of the requested page.
        cur.execute(page_sql, keyword_args + (page_size, skip))
        records = cur.fetchall()

    entries = [
        {
            "id": record[0],
            "key": record[1],
            "name": record[2],
            "remark": record[3],
            "created_at": MyUtils.format_datetime(record[4]),
        }
        for record in records
    ]
    return total, entries
|
||
|
||
|
||
def db_create_dict(key: str, name: str, remark: str):
    """
    Create a dictionary row in ``sys_dict`` and return its new id.

    The id is generated by ``gen_random_uuid()`` and ``created_at`` by
    ``now()``. The insert is committed explicitly, matching the other
    write helpers in this module; the original never called
    ``conn.commit()``.
    """
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(
            """
            INSERT INTO sys_dict (id, key, name, remark, created_at)
            VALUES (gen_random_uuid(), %s, %s, %s, now())
            RETURNING id
            """,
            (key, name, remark),
        )
        new_id = cursor.fetchone()[0]
        conn.commit()
        return new_id
|
||
|
||
|
||
def db_update_dict(id: str, key: str, name: str, remark: str):
    """
    Overwrite ``key``, ``name`` and ``remark`` of the dictionary with this id.

    Returns the *id* that was passed in, whether or not a row matched.
    The update is committed explicitly, matching the other write helpers
    in this module; the original never called ``conn.commit()``.
    """
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(
            """
            UPDATE sys_dict
            SET key=%s, name=%s, remark=%s
            WHERE id=%s
            """,
            (key, name, remark, id),
        )
        conn.commit()
        return id
|
||
|
||
|
||
def db_delete_dict(id: str):
    """
    Delete the dictionary with this id from ``sys_dict``.

    Returns the *id* that was passed in, whether or not a row matched.
    The delete is committed explicitly, matching the other write helpers
    in this module; the original never called ``conn.commit()``.
    """
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute("DELETE FROM sys_dict WHERE id=%s", (id,))
        conn.commit()
        return id
|
||
|
||
|
||
def db_get_dict_detail(dict_id: str):
    """
    Return the detail entries of one dictionary, ordered by sort then age.

    Timestamps are rendered with ``isoformat()``; a NULL timestamp becomes
    ``None``.
    """
    query = """
        SELECT id, value, sort, pid, dict_id, remark, created_at, updated_at
        FROM sys_dict_detail
        WHERE dict_id=%s
        ORDER BY sort ASC, created_at ASC
    """

    def iso_or_none(moment):
        """Render a timestamp as ISO text, passing falsy values through as None."""
        return moment.isoformat() if moment else None

    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (dict_id,))
        records = cur.fetchall()

    return [
        {
            "id": record[0],
            "value": record[1],
            "sort": record[2],
            "pid": record[3],
            "dict_id": record[4],
            "remark": record[5],
            "created_at": iso_or_none(record[6]),
            "updated_at": iso_or_none(record[7]),
        }
        for record in records
    ]
|
||
|
||
|
||
def db_create_dict_detail(
    value: str, dict_id: str, sort: int = 0, pid: str = None, remark: str = None
):
    """
    Create a detail entry under a dictionary and return its new id.

    Args:
        value: Entry value.
        dict_id: Id of the owning ``sys_dict`` row.
        sort: Ordering weight (default 0).
        pid: Optional parent entry id.
        remark: Optional free-text remark.

    The id is generated by ``gen_random_uuid()`` and ``created_at`` by
    ``now()``. The insert is committed explicitly, matching the other
    write helpers in this module; the original never called
    ``conn.commit()``.
    """
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(
            """
            INSERT INTO sys_dict_detail (id, value, dict_id, sort, pid, remark, created_at)
            VALUES (gen_random_uuid(), %s, %s, %s, %s, %s, now())
            RETURNING id
            """,
            (value, dict_id, sort, pid, remark),
        )
        new_id = cursor.fetchone()[0]
        conn.commit()
        return new_id
|
||
|
||
|
||
def db_update_dict_detail(
    id: str, value: str = None, sort: int = None, pid: str = None, remark: str = None
):
    """
    Update selected columns of a dictionary detail entry.

    Every column is written through ``COALESCE(%s, column)``, so an
    argument left as ``None`` keeps the stored value; consequently this
    function cannot reset a column to NULL. ``updated_at`` is always set
    to ``now()``.

    Returns the *id* that was passed in, whether or not a row matched.
    The update is committed explicitly, matching the other write helpers
    in this module; the original never called ``conn.commit()``.
    """
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(
            """
            UPDATE sys_dict_detail
            SET value=COALESCE(%s, value),
                sort=COALESCE(%s, sort),
                pid=COALESCE(%s, pid),
                remark=COALESCE(%s, remark),
                updated_at=now()
            WHERE id=%s
            """,
            (value, sort, pid, remark, id),
        )
        conn.commit()
        return id
|
||
|
||
|
||
def db_delete_dict_detail(id: str):
    """
    Delete the dictionary detail entry with this id.

    Returns the *id* that was passed in, whether or not a row matched.
    The delete is committed explicitly, matching the other write helpers
    in this module; the original never called ``conn.commit()``.
    """
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute("DELETE FROM sys_dict_detail WHERE id=%s", (id,))
        conn.commit()
        return id
|
||
|
||
|
||
def get_dict_detail_list_by_key(dict_key: str):
    """
    Return the detail entries of the dictionary identified by its ``key``.

    ``sys_dict_detail`` is joined to ``sys_dict`` on the dictionary id and
    the result is ordered by ``sort`` then ``created_at``. ``created_at``
    is rendered with ``MyUtils.format_datetime``.
    """
    query = """
        SELECT
            d.id,
            d.value,
            d.sort,
            d.pid,
            d.remark,
            d.created_at
        FROM sys_dict_detail d
        JOIN sys_dict s ON d.dict_id = s.id
        WHERE s.key = %s
        ORDER BY d.sort ASC, d.created_at ASC
    """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(query, (dict_key,))
        records = cur.fetchall()

    return [
        {
            "id": record[0],
            "value": record[1],
            "sort": record[2],
            "pid": record[3],
            "remark": record[4],
            "created_at": MyUtils.format_datetime(record[5]),
        }
        for record in records
    ]
|
||
|
||
|
||
def get_dept_ids_by_user_id(user_id: UUID) -> list:
    """Return the user's department id plus the ids of all its descendants.

    Raises:
        ValueError: when no ``sys_users`` row exists for *user_id*.

    A user row whose ``dept_id`` is NULL does not raise: the recursive
    query then matches no department and an empty list is returned.

    NOTE: an identical definition of this function appears again later in
    this module; the later one is the binding in effect after import.
    """
    subtree_sql = """
        WITH RECURSIVE dept_hierarchy AS (
            SELECT id FROM sys_dept WHERE id = %s
            UNION
            SELECT d.id FROM sys_dept d
            INNER JOIN dept_hierarchy dh ON d.parent_id = dh.id
        )
        SELECT id FROM dept_hierarchy;
    """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        # Step 1: resolve the department the user belongs to.
        cur.execute("SELECT dept_id FROM sys_users WHERE id = %s", (user_id,))
        user_row = cur.fetchone()
        if not user_row:
            raise ValueError(
                f"User with id {user_id} not found or has no department."
            )
        root_dept = user_row[0]

        # Step 2: walk downwards from that department to collect the subtree.
        cur.execute(subtree_sql, (root_dept,))
        return [dept_row[0] for dept_row in cur.fetchall()]
|
||
|
||
|
||
def get_dept_ids_by_user_id(user_id: UUID) -> list:
    """Return the user's department id plus the ids of all its descendants.

    Raises:
        ValueError: when no ``sys_users`` row exists for *user_id*.

    A user row whose ``dept_id`` is NULL does not raise: the recursive
    query then matches no department and an empty list is returned.

    NOTE: this redefines a function of the same name declared earlier in
    this module; being later, this definition is the one in effect.
    """
    lookup_sql = "SELECT dept_id FROM sys_users WHERE id = %s"
    descendants_sql = """
        WITH RECURSIVE dept_hierarchy AS (
            SELECT id FROM sys_dept WHERE id = %s
            UNION
            SELECT d.id FROM sys_dept d
            INNER JOIN dept_hierarchy dh ON d.parent_id = dh.id
        )
        SELECT id FROM dept_hierarchy;
    """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        # Step 1: find the department the user is assigned to.
        cur.execute(lookup_sql, (user_id,))
        found = cur.fetchone()
        if not found:
            raise ValueError(
                f"User with id {user_id} not found or has no department."
            )

        # Step 2: collect that department and everything below it.
        cur.execute(descendants_sql, (found[0],))
        collected = cur.fetchall()
        return [entry[0] for entry in collected]
|
||
|
||
|
||
def get_dept_id_by_user_id(user_id: str) -> str:
    """Return the ``dept_id`` of a ``sys_users`` row as a string.

    Args:
        user_id: Id of the user to look up.

    Raises:
        ValueError: when no user has this id. Previously this surfaced as
            an opaque ``TypeError`` from subscripting ``None``.

    NOTE(review): a NULL ``dept_id`` is still returned as the string
    ``"None"``, as before - confirm whether callers rely on that.
    """
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute("SELECT dept_id FROM sys_users WHERE id = %s", (user_id,))
        row = cursor.fetchone()

    if row is None:
        raise ValueError(f"User with id {user_id} not found.")
    return str(row[0])
|
||
|
||
|
||
def get_dept_id_by_iot_user_name(user_id: str) -> str:
    """Return the ``dept_id`` of an ``iot_users`` row as a string.

    Args:
        user_id: Despite its name, this value is matched against the
            ``name`` column of ``iot_users``.

    Raises:
        ValueError: when no IoT user has this name. Previously this
            surfaced as an opaque ``TypeError`` from subscripting ``None``.

    NOTE(review): a NULL ``dept_id`` is still returned as the string
    ``"None"``, as before - confirm whether callers rely on that.
    """
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute("SELECT dept_id FROM iot_users WHERE name = %s", (user_id,))
        row = cursor.fetchone()

    if row is None:
        raise ValueError(f"IoT user with name {user_id} not found.")
    return str(row[0])
|
||
|
||
|
||
def get_device_type_by_iot_user_name(user_id: str) -> str:
    """Return the ``type`` column of an ``iot_users`` row as a string.

    Args:
        user_id: Despite its name, this value is matched against the
            ``name`` column of ``iot_users``.

    Raises:
        ValueError: when no IoT user has this name. Previously this
            surfaced as an opaque ``TypeError`` from subscripting ``None``.

    NOTE(review): a NULL ``type`` is still returned as the string
    ``"None"``, as before - confirm whether callers rely on that.
    """
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute("SELECT type FROM iot_users WHERE name = %s", (user_id,))
        row = cursor.fetchone()

    if row is None:
        raise ValueError(f"IoT user with name {user_id} not found.")
    # Local renamed so it no longer shadows the builtin ``type``.
    device_type = str(row[0])
    return device_type
|
||
|
||
|
||
from typing import List
|
||
|
||
|
||
def get_dept_ids_by_dept_id(dept_id: str) -> List[str]:
    """
    Return the id of the given department and of every ancestor above it.

    The recursive query starts at *dept_id* and repeatedly follows
    ``parent_id`` upwards. Ids are returned as strings in the order the
    query yields them (the docstring this replaces described that order as
    current department first, top-level ancestor last).
    """
    ancestors_sql = """
        WITH RECURSIVE dept_tree AS (
            -- 起点:当前部门
            SELECT id, parent_id
            FROM sys_dept
            WHERE id = %s

            UNION ALL

            -- 向上递归找父部门
            SELECT d.id, d.parent_id
            FROM sys_dept d
            INNER JOIN dept_tree dt ON d.id = dt.parent_id
        )
        SELECT id FROM dept_tree;
    """
    with pg_pool.getConn() as conn, conn.cursor() as cur:
        cur.execute(ancestors_sql, (dept_id,))
        chain = cur.fetchall()
        return [str(link[0]) for link in chain]
|