牧安云哨-后端

This commit is contained in:
BBIT-Kai
2025-12-23 09:55:06 +08:00
parent f018d96e60
commit b5a9714025
16 changed files with 1219 additions and 75 deletions
+303 -50
View File
@@ -2,6 +2,7 @@ import json
from uuid import UUID
from config.pgDb import pg_pool
from utils import MyUtils
from utils.MyUtils import format_datetime, is_valid_uuid
@@ -387,7 +388,7 @@ def get_role_list_db_page(page: int, page_size: int, rid=None, name=None, remark
"id": role_id,
"name": name,
"remark": remark,
"created_at": created_at,
"created_at": format_datetime(created_at),
"permissions": menu_ids or [],
}
)
@@ -411,9 +412,9 @@ def get_user_list_db_page(
params = []
# ---- 用户 ID,必须是 UUID,否则忽略 ----
if uid and is_valid_uuid(uid):
conditions.append("u.id = %s")
params.append(uid)
if uid:
conditions.append("u.id::text LIKE %s")
params.append(f"%{uid}%")
# ---- 用户名模糊搜索 ----
if username:
@@ -506,19 +507,11 @@ def get_user_list_db_page(
return result, total
# 检查用户名是否存在
def db_user_name_exists(username: str, id: str | None = None) -> bool:
# 检查登录账号是否存在
def db_user_name_exists(username: str) -> bool:
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
if id:
cursor.execute(
"SELECT COUNT(*) FROM users WHERE username = %s AND id <> %s;",
(username, id),
)
else:
cursor.execute(
"SELECT COUNT(*) FROM users WHERE username = %s;", (username,)
)
cursor.execute("SELECT COUNT(*) FROM users WHERE email = %s;", (username,))
return cursor.fetchone()[0] > 0
@@ -554,40 +547,6 @@ def insert_user(data: dict) -> str:
return user_id
# 更新用户
def update_user_db(id: str, data: dict) -> int:
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
UPDATE users
SET username=%s, email=%s, phone=%s, is_active=%s, dept_id=%s
WHERE id=%s;
""",
(
data.get("name"),
data.get("email"),
data.get("phone"),
bool(data.get("status", 1)),
data.get("dept_id"),
id,
),
)
# 更新用户角色关系:先删除旧的,再插入新的
cursor.execute("DELETE FROM sys_user_role WHERE user_id=%s;", (id,))
role_ids = data.get("roles", [])
if role_ids:
for role_id in role_ids:
cursor.execute(
"INSERT INTO sys_user_role (user_id, role_id) VALUES (%s, %s);",
(id, role_id),
)
conn.commit()
return cursor.rowcount
# 局部更新用户(PATCH
def patch_user_db(id: str, data: dict) -> int:
with pg_pool.getConn() as conn:
@@ -603,6 +562,7 @@ def patch_user_db(id: str, data: dict) -> int:
"name": "username",
"email": "email",
"phone": "phone",
"password_hash": "password_hash",
"status": "is_active",
"dept_id": "dept_id",
}
@@ -676,7 +636,7 @@ def get_menus_by_ids(menu_ids: list, plat_id: int):
SELECT id, pid, name, path, component, redirect,
auth_code, type, meta, created_at, updated_at
FROM sys_menu
WHERE id = ANY(%s::varchar[]) AND plat_id = %s
WHERE id = ANY(%s::varchar[]) AND plat_id = %s AND type != 'button'
ORDER BY created_at ASC;
"""
# 转换 uuid 列表为 str 列表
@@ -697,3 +657,296 @@ def build_menu_tree(items):
else:
tree.append(item)
return tree
def get_dict_list(keyword="", page=1, page_size=10):
"""
获取系统字典列表,支持分页和关键字搜索(key / name)
"""
offset = (page - 1) * page_size
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
# 1️⃣ 查询总条数
cursor.execute(
"""
SELECT COUNT(*)
FROM sys_dict
WHERE (
%s = ''
OR key ILIKE '%%' || %s || '%%'
OR name ILIKE '%%' || %s || '%%'
)
""",
(keyword, keyword, keyword),
)
total = cursor.fetchone()[0]
# 2️⃣ 查询当前页数据
cursor.execute(
"""
SELECT id, key, name, remark, created_at
FROM sys_dict
WHERE (
%s = ''
OR key ILIKE '%%' || %s || '%%'
OR name ILIKE '%%' || %s || '%%'
)
ORDER BY created_at DESC
LIMIT %s OFFSET %s
""",
(keyword, keyword, keyword, page_size, offset),
)
rows = cursor.fetchall()
result = []
for row in rows:
result.append(
{
"id": row[0],
"key": row[1],
"name": row[2],
"remark": row[3],
"created_at": MyUtils.format_datetime(row[4]),
}
)
return total, result
def db_create_dict(key: str, name: str, remark: str):
"""
在数据库中创建字典
"""
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO sys_dict (id, key, name, remark, created_at)
VALUES (gen_random_uuid(), %s, %s, %s, now())
RETURNING id
""",
(key, name, remark),
)
new_id = cursor.fetchone()[0]
return new_id
def db_update_dict(id: str, key: str, name: str, remark: str):
"""
在数据库中更新字典
"""
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
UPDATE sys_dict
SET key=%s, name=%s, remark=%s
WHERE id=%s
""",
(key, name, remark, id),
)
return id
def db_delete_dict(id: str):
"""
在数据库中删除字典
"""
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute("DELETE FROM sys_dict WHERE id=%s", (id,))
return id
def db_get_dict_detail(dict_id: str):
"""
获取字典详情列表
"""
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
SELECT id, value, sort, pid, dict_id, remark, created_at, updated_at
FROM sys_dict_detail
WHERE dict_id=%s
ORDER BY sort ASC, created_at ASC
""",
(dict_id,),
)
rows = cursor.fetchall()
result = []
for row in rows:
result.append(
{
"id": row[0],
"value": row[1],
"sort": row[2],
"pid": row[3],
"dict_id": row[4],
"remark": row[5],
"created_at": row[6].isoformat() if row[6] else None,
"updated_at": row[7].isoformat() if row[7] else None,
}
)
return result
def db_create_dict_detail(
value: str, dict_id: str, sort: int = 0, pid: str = None, remark: str = None
):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO sys_dict_detail (id, value, dict_id, sort, pid, remark, created_at)
VALUES (gen_random_uuid(), %s, %s, %s, %s, %s, now())
RETURNING id
""",
(value, dict_id, sort, pid, remark),
)
new_id = cursor.fetchone()[0]
return new_id
def db_update_dict_detail(
id: str, value: str = None, sort: int = None, pid: str = None, remark: str = None
):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
UPDATE sys_dict_detail
SET value=COALESCE(%s, value),
sort=COALESCE(%s, sort),
pid=COALESCE(%s, pid),
remark=COALESCE(%s, remark),
updated_at=now()
WHERE id=%s
""",
(value, sort, pid, remark, id),
)
return id
def db_delete_dict_detail(id: str):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute("DELETE FROM sys_dict_detail WHERE id=%s", (id,))
return id
def get_dict_detail_list_by_key(dict_key: str):
"""
通过字典 key 获取字典明细列表
"""
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
SELECT
d.id,
d.value,
d.sort,
d.pid,
d.remark,
d.created_at
FROM sys_dict_detail d
JOIN sys_dict s ON d.dict_id = s.id
WHERE s.key = %s
ORDER BY d.sort ASC, d.created_at ASC
""",
(dict_key,),
)
rows = cursor.fetchall()
result = []
for row in rows:
result.append(
{
"id": row[0],
"value": row[1],
"sort": row[2],
"pid": row[3],
"remark": row[4],
"created_at": MyUtils.format_datetime(row[5]),
}
)
return result
def get_dept_ids_by_user_id(user_id: UUID) -> list:
# 第一步:通过 user_id 查找其所属的 dept_id
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT dept_id FROM users WHERE id = %s", (user_id,))
dept_id = cursor.fetchone()
if not dept_id:
raise ValueError(
f"User with id {user_id} not found or has no department."
)
dept_id = dept_id[0]
# 第二步:通过 dept_id 获取所有下级部门的 dept_id
cursor.execute(
"""
WITH RECURSIVE dept_hierarchy AS (
SELECT id FROM sys_dept WHERE id = %s
UNION
SELECT d.id FROM sys_dept d
INNER JOIN dept_hierarchy dh ON d.parent_id = dh.id
)
SELECT id FROM dept_hierarchy;
""",
(dept_id,),
)
dept_ids = [row[0] for row in cursor.fetchall()]
return dept_ids
def get_dept_ids_by_user_id(user_id: UUID) -> list:
# 第一步:通过 user_id 查找其所属的 dept_id
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT dept_id FROM users WHERE id = %s", (user_id,))
dept_id = cursor.fetchone()
if not dept_id:
raise ValueError(
f"User with id {user_id} not found or has no department."
)
dept_id = dept_id[0]
# 第二步:通过 dept_id 获取所有下级部门的 dept_id
cursor.execute(
"""
WITH RECURSIVE dept_hierarchy AS (
SELECT id FROM sys_dept WHERE id = %s
UNION
SELECT d.id FROM sys_dept d
INNER JOIN dept_hierarchy dh ON d.parent_id = dh.id
)
SELECT id FROM dept_hierarchy;
""",
(dept_id,),
)
dept_ids = [row[0] for row in cursor.fetchall()]
return dept_ids
def get_dept_id_by_user_id(user_id: UUID) -> list:
# 第一步:通过 user_id 查找其所属的 dept_id
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT dept_id FROM users WHERE id = %s", (user_id,))
dept_id = cursor.fetchone()
dept_id = dept_id[0]
return dept_id