from hashlib import sha256 from config.minIO import get_temp_url from config.pgDb import pg_pool from utils.MyUtils import format_datetime, is_valid_uuid def get_device_list_db_page( page: int, page_size: int, device_id=None, name=None, status=None, is_superuser=None, dept_id=None, startTime=None, endTime=None, ): offset = (page - 1) * page_size conditions = [] params = [] # ---- 设备 ID(uuid)---- if device_id: conditions.append("d.id::text LIKE %s") params.append(f"%{device_id}%") # ---- 设备名模糊搜索 ---- if name: conditions.append("d.name LIKE %s") params.append(f"%{name}%") # ---- 状态 ---- if status is not None: conditions.append("d.is_active = %s") params.append(status == 1) # ---- 状态 ---- if is_superuser is not None: conditions.append("d.is_superuser = %s") params.append(is_superuser == 1) # ---- 部门 ---- if dept_id and is_valid_uuid(dept_id): conditions.append("d.dept_id = %s") params.append(dept_id) # ---- 时间过滤 ---- if startTime: conditions.append("d.created_at >= %s") params.append(startTime) if endTime: conditions.append("d.created_at <= %s") params.append(endTime) where_clause = " WHERE " + " AND ".join(conditions) if conditions else "" with pg_pool.getConn() as conn: with conn.cursor() as cursor: # ---- 统计总数 ---- count_sql = f""" SELECT COUNT(*) FROM iot_users d {where_clause}; """ cursor.execute(count_sql, params) total = cursor.fetchone()[0] # ---- 分页查询 ---- list_sql = f""" SELECT d.id, d.name, d.remark, d.is_active, d.is_superuser, d.dept_id, d.type, sd.name AS dept_name, d.created_at FROM iot_users d LEFT JOIN sys_dept sd ON d.dept_id = sd.id {where_clause} ORDER BY d.created_at DESC LIMIT %s OFFSET %s; """ cursor.execute(list_sql, params + [page_size, offset]) rows = cursor.fetchall() result = [] for r in rows: ( device_id, name, remark, is_active, is_superuser, dept_id, type, dept_name, created_at, ) = r result.append( { "id": device_id, "name": name, "remark": remark, "status": 1 if is_active else 0, "is_superuser": 1 if is_superuser else 0, "dept_id": dept_id, "device_type": type, "dept_name": dept_name, "created_at": format_datetime(created_at), } ) return result, total def insert_device(data: dict) -> str: with pg_pool.getConn() as conn: with conn.cursor() as cursor: name = data.get("name") salt = name[-4:] if name else "" password = data.get("password", "123456") password_hash = sha256((salt + password).encode("utf-8")).hexdigest() cursor.execute( """ INSERT INTO iot_users (name, password_hash, salt, remark, dept_id, is_active, is_superuser) VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id; """, ( name, password_hash, salt, data.get("remark"), data.get("dept_id"), bool(data.get("status", 1)), bool(data.get("is_superuser", 0)), ), ) device_id = cursor.fetchone()[0] conn.commit() return device_id def update_device_db(id: str, data: dict) -> int: with pg_pool.getConn() as conn: with conn.cursor() as cursor: cursor.execute( """ UPDATE iot_users SET remark=%s, is_active=%s, dept_id=%s, is_superuser=%s WHERE id=%s; """, ( data.get("remark"), bool(data.get("status", 1)), data.get("dept_id"), bool(data.get("is_superuser", 0)), id, ), ) conn.commit() return cursor.rowcount def patch_device_db(id: str, data: dict) -> int: with pg_pool.getConn() as conn: with conn.cursor() as cursor: fields = [] params = [] mapping = { "remark": "remark", "status": "is_active", "is_superuser": "is_superuser", "dept_id": "dept_id", } for k, column in mapping.items(): if k in data: value = ( bool(data[k]) if k in ["status", "is_superuser"] else data[k] ) fields.append(f"{column} = %s") params.append(value) if fields: sql = f"UPDATE iot_users SET {', '.join(fields)} WHERE id = %s" params.append(id) cursor.execute(sql, tuple(params)) conn.commit() return cursor.rowcount def delete_device_db(id: str) -> int: with pg_pool.getConn() as conn: with conn.cursor() as cursor: cursor.execute("DELETE FROM iot_users WHERE id=%s;", (id,)) conn.commit() return cursor.rowcount def delete_update_db(id: str) -> int: with pg_pool.getConn() as conn: with conn.cursor() as cursor: cursor.execute( "DELETE FROM iot_update WHERE id = %s;", (id,), ) conn.commit() return cursor.rowcount def get_update_list_db_page( page: int, page_size: int, id=None, code=None, dept_id=None, startTime=None, endTime=None, ): offset = (page - 1) * page_size conditions = [] params = [] if id is not None: conditions.append("u.id::text LIKE %s") params.append(f"%{id}%") # ---- 版本 / 升级代码 ---- if code is not None: conditions.append("u.code = %s") params.append(code) # ---- 部门 ---- if dept_id and is_valid_uuid(dept_id): conditions.append("u.dept_id = %s") params.append(dept_id) # ---- 时间过滤 ---- if startTime: conditions.append("u.created_at >= %s") params.append(startTime) if endTime: conditions.append("u.created_at <= %s") params.append(endTime) where_clause = " WHERE " + " AND ".join(conditions) if conditions else "" with pg_pool.getConn() as conn: with conn.cursor() as cursor: # ---- 总数 ---- count_sql = f""" SELECT COUNT(*) FROM iot_update u {where_clause}; """ cursor.execute(count_sql, params) total = cursor.fetchone()[0] # ---- 列表 ---- list_sql = f""" SELECT u.id, u.code, u.dept_id, sd.name AS dept_name, u.remark, u.oss, u.size, u.created_at FROM iot_update u LEFT JOIN sys_dept sd ON u.dept_id = sd.id {where_clause} ORDER BY u.created_at DESC LIMIT %s OFFSET %s; """ cursor.execute(list_sql, params + [page_size, offset]) rows = cursor.fetchall() result = [] for r in rows: ( update_id, code, dept_id, dept_name, remark, oss, size, created_at, ) = r result.append( { "id": update_id, "code": code, "dept_id": dept_id, "dept_name": dept_name, "remark": remark, "oss_url": oss, "size": size, "created_at": format_datetime(created_at), } ) return result, total def insert_update(data: dict) -> str: with pg_pool.getConn() as conn: with conn.cursor() as cursor: cursor.execute( """ INSERT INTO iot_update (code, dept_id, remark, oss, size) VALUES (%s, %s, %s, %s, %s) RETURNING id; """, ( data.get("code"), data.get("dept_id"), data.get("remark"), data.get("uploadId"), data.get("size"), ), ) update_id = cursor.fetchone()[0] conn.commit() return update_id def get_update_package(device_id: str | None = None): """ 根据设备 ID 获取所属组织最新版本的更新包信息 返回示例: { "version": 1001, "url": "https://xxx", "notes": "更新内容描述" } """ if not device_id: return None sql_get_dept = """ SELECT dept_id FROM iot_users WHERE name = %s LIMIT 1 """ sql_get_package = """ SELECT code, oss, remark FROM iot_update WHERE dept_id = %s ORDER BY code DESC LIMIT 1 """ with pg_pool.getConn() as conn: with conn.cursor() as cursor: # 1. 查询设备所属组织 cursor.execute(sql_get_dept, (device_id,)) row = cursor.fetchone() if not row: return None dept_id = row[0] # 2. 查询该组织最新更新包 cursor.execute(sql_get_package, (dept_id,)) row = cursor.fetchone() if not row: return None version, oss_path, content = row return { "version": version, "url": get_temp_url("iot-update", oss_path), "notes": content, } def getMaxCodeByDeptId(dept_id: str | None = None) -> int: """ 根据组织ID获取 iot_update_package 最大 code,并在结果上加 1 返回整数,如果没有记录则返回 1 """ if not dept_id: return 0 # dept_id 为空直接返回初始版本号 1 sql = """ SELECT MAX(code) FROM iot_update WHERE dept_id = %s """ with pg_pool.getConn() as conn: with conn.cursor() as cursor: cursor.execute(sql, (dept_id,)) row = cursor.fetchone() max_code = row[0] if row and row[0] is not None else 0 return max_code