Files
AILab/bbit_ai/app/db/postgres/iot.py
T
2026-02-04 13:58:18 +08:00

419 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from hashlib import sha256
from config.minIO import get_temp_url
from config.pgDb import pg_pool
from utils.MyUtils import format_datetime, is_valid_uuid
def get_device_list_db_page(
page: int,
page_size: int,
device_id=None,
name=None,
status=None,
is_superuser=None,
dept_id=None,
startTime=None,
endTime=None,
):
offset = (page - 1) * page_size
conditions = []
params = []
# ---- 设备 IDuuid----
if device_id:
conditions.append("d.id::text LIKE %s")
params.append(f"%{device_id}%")
# ---- 设备名模糊搜索 ----
if name:
conditions.append("d.name LIKE %s")
params.append(f"%{name}%")
# ---- 状态 ----
if status is not None:
conditions.append("d.is_active = %s")
params.append(status == 1)
# ---- 状态 ----
if is_superuser is not None:
conditions.append("d.is_superuser = %s")
params.append(is_superuser == 1)
# ---- 部门 ----
if dept_id and is_valid_uuid(dept_id):
conditions.append("d.dept_id = %s")
params.append(dept_id)
# ---- 时间过滤 ----
if startTime:
conditions.append("d.created_at >= %s")
params.append(startTime)
if endTime:
conditions.append("d.created_at <= %s")
params.append(endTime)
where_clause = " WHERE " + " AND ".join(conditions) if conditions else ""
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
# ---- 统计总数 ----
count_sql = f"""
SELECT COUNT(*)
FROM iot_users d
{where_clause};
"""
cursor.execute(count_sql, params)
total = cursor.fetchone()[0]
# ---- 分页查询 ----
list_sql = f"""
SELECT
d.id,
d.name,
d.remark,
d.is_active,
d.is_superuser,
d.dept_id,
d.type,
sd.name AS dept_name,
d.created_at
FROM iot_users d
LEFT JOIN sys_dept sd ON d.dept_id = sd.id
{where_clause}
ORDER BY d.created_at DESC
LIMIT %s OFFSET %s;
"""
cursor.execute(list_sql, params + [page_size, offset])
rows = cursor.fetchall()
result = []
for r in rows:
(
device_id,
name,
remark,
is_active,
is_superuser,
dept_id,
type,
dept_name,
created_at,
) = r
result.append(
{
"id": device_id,
"name": name,
"remark": remark,
"status": 1 if is_active else 0,
"is_superuser": 1 if is_superuser else 0,
"dept_id": dept_id,
"device_type": type,
"dept_name": dept_name,
"created_at": format_datetime(created_at),
}
)
return result, total
def insert_device(data: dict) -> str:
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
name = data.get("name")
salt = name[-4:] if name else ""
password = data.get("password", "123456")
password_hash = sha256((salt + password).encode("utf-8")).hexdigest()
cursor.execute(
"""
INSERT INTO iot_users (name, password_hash, salt, remark, dept_id, is_active, is_superuser)
VALUES (%s, %s, %s, %s, %s, %s, %s)
RETURNING id;
""",
(
name,
password_hash,
salt,
data.get("remark"),
data.get("dept_id"),
bool(data.get("status", 1)),
bool(data.get("is_superuser", 0)),
),
)
device_id = cursor.fetchone()[0]
conn.commit()
return device_id
def update_device_db(id: str, data: dict) -> int:
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
UPDATE iot_users
SET name=%s, remark=%s, is_active=%s, dept_id=%s, is_superuser=%s
WHERE id=%s;
""",
(
data.get("name"),
data.get("remark"),
bool(data.get("status", 1)),
data.get("dept_id"),
bool(data.get("is_superuser", 0)),
id,
),
)
conn.commit()
return cursor.rowcount
def patch_device_db(id: str, data: dict) -> int:
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
fields = []
params = []
mapping = {
"remark": "remark",
"status": "is_active",
"is_superuser": "is_superuser",
"dept_id": "dept_id",
}
for k, column in mapping.items():
if k in data:
value = (
bool(data[k]) if k in ["status", "is_superuser"] else data[k]
)
fields.append(f"{column} = %s")
params.append(value)
if fields:
sql = f"UPDATE iot_users SET {', '.join(fields)} WHERE id = %s"
params.append(id)
cursor.execute(sql, tuple(params))
conn.commit()
return cursor.rowcount
def delete_device_db(id: str) -> int:
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute("DELETE FROM iot_users WHERE id=%s;", (id,))
conn.commit()
return cursor.rowcount
def delete_update_db(id: str) -> int:
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"DELETE FROM iot_update WHERE id = %s;",
(id,),
)
conn.commit()
return cursor.rowcount
def get_update_list_db_page(
page: int,
page_size: int,
id=None,
code=None,
dept_id=None,
startTime=None,
endTime=None,
):
offset = (page - 1) * page_size
conditions = []
params = []
if id is not None:
conditions.append("u.id::text LIKE %s")
params.append(f"%{id}%")
# ---- 版本 / 升级代码 ----
if code is not None:
conditions.append("u.code = %s")
params.append(code)
# ---- 部门 ----
if dept_id and is_valid_uuid(dept_id):
conditions.append("u.dept_id = %s")
params.append(dept_id)
# ---- 时间过滤 ----
if startTime:
conditions.append("u.created_at >= %s")
params.append(startTime)
if endTime:
conditions.append("u.created_at <= %s")
params.append(endTime)
where_clause = " WHERE " + " AND ".join(conditions) if conditions else ""
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
# ---- 总数 ----
count_sql = f"""
SELECT COUNT(*)
FROM iot_update u
{where_clause};
"""
cursor.execute(count_sql, params)
total = cursor.fetchone()[0]
# ---- 列表 ----
list_sql = f"""
SELECT
u.id,
u.code,
u.dept_id,
sd.name AS dept_name,
u.remark,
u.oss,
u.size,
u.created_at
FROM iot_update u
LEFT JOIN sys_dept sd ON u.dept_id = sd.id
{where_clause}
ORDER BY u.created_at DESC
LIMIT %s OFFSET %s;
"""
cursor.execute(list_sql, params + [page_size, offset])
rows = cursor.fetchall()
result = []
for r in rows:
(
update_id,
code,
dept_id,
dept_name,
remark,
oss,
size,
created_at,
) = r
result.append(
{
"id": update_id,
"code": code,
"dept_id": dept_id,
"dept_name": dept_name,
"remark": remark,
"oss_url": oss,
"size": size,
"created_at": format_datetime(created_at),
}
)
return result, total
def insert_update(data: dict) -> str:
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO iot_update
(code, dept_id, remark, oss, size)
VALUES
(%s, %s, %s, %s, %s)
RETURNING id;
""",
(
data.get("code"),
data.get("dept_id"),
data.get("remark"),
data.get("uploadId"),
data.get("size"),
),
)
update_id = cursor.fetchone()[0]
conn.commit()
return update_id
def get_update_package(device_id: str | None = None):
"""
根据设备 ID 获取所属组织最新版本的更新包信息
返回示例:
{
"version": 1001,
"url": "https://xxx",
"notes": "更新内容描述"
}
"""
if not device_id:
return None
sql_get_dept = """
SELECT dept_id
FROM iot_users
WHERE name = %s
LIMIT 1
"""
sql_get_package = """
SELECT code, oss, remark
FROM iot_update
WHERE dept_id = %s
ORDER BY code DESC
LIMIT 1
"""
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
# 1. 查询设备所属组织
cursor.execute(sql_get_dept, (device_id,))
row = cursor.fetchone()
if not row:
return None
dept_id = row[0]
# 2. 查询该组织最新更新包
cursor.execute(sql_get_package, (dept_id,))
row = cursor.fetchone()
if not row:
return None
version, oss_path, content = row
return {
"version": version,
"url": get_temp_url("iot-update", oss_path),
"notes": content,
}
def getMaxCodeByDeptId(dept_id: str | None = None) -> int:
"""
根据组织ID获取 iot_update_package 最大 code,并在结果上加 1
返回整数,如果没有记录则返回 1
"""
if not dept_id:
return 0 # dept_id 为空直接返回初始版本号 1
sql = """
SELECT MAX(code)
FROM iot_update
WHERE dept_id = %s
"""
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(sql, (dept_id,))
row = cursor.fetchone()
max_code = row[0] if row and row[0] is not None else 0
return max_code