from hashlib import sha256

from config.pgDb import pg_pool
from utils.MyUtils import format_datetime, is_valid_uuid

# Password hashed for a new row when the caller supplies none.
_DEFAULT_PASSWORD = "123456"

# Request keys accepted by patch_device_db -> iot_users column names.
_PATCHABLE_COLUMNS = {
    "remark": "remark",
    "status": "is_active",
    "is_superuser": "is_superuser",
    "dept_id": "dept_id",
}

# Request keys whose values are coerced to bool before being stored.
_BOOLEAN_KEYS = ("status", "is_superuser")


def _build_device_filters(
    device_id, name, status, is_superuser, dept_id, startTime, endTime
):
    """Build the WHERE clause and its bound parameters for the list query.

    Only fixed SQL fragments are concatenated; every caller-supplied value is
    passed as a bound parameter.

    Returns:
        A ``(where_clause, params)`` tuple. ``where_clause`` is an empty
        string when no filter is active, otherwise it starts with
        ``" WHERE "``.
    """
    conditions = []
    params = []

    # Device id (uuid): substring match against its text form.
    if device_id:
        conditions.append("d.id::text LIKE %s")
        params.append(f"%{device_id}%")

    # Device name: substring match.
    if name:
        conditions.append("d.name LIKE %s")
        params.append(f"%{name}%")

    # Active flag: the value 1 means active, anything else means inactive.
    if status is not None:
        conditions.append("d.is_active = %s")
        params.append(status == 1)

    # Superuser flag: the value 1 means superuser.
    if is_superuser is not None:
        conditions.append("d.is_superuser = %s")
        params.append(is_superuser == 1)

    # Department: applied only when the id passes is_valid_uuid; any other
    # value is ignored rather than rejected.
    if dept_id and is_valid_uuid(dept_id):
        conditions.append("d.dept_id = %s")
        params.append(dept_id)

    # Creation-time range, inclusive at both ends.
    if startTime:
        conditions.append("d.created_at >= %s")
        params.append(startTime)
    if endTime:
        conditions.append("d.created_at <= %s")
        params.append(endTime)

    where_clause = " WHERE " + " AND ".join(conditions) if conditions else ""
    return where_clause, params


def get_device_list_db_page(
    page: int,
    page_size: int,
    device_id=None,
    name=None,
    status=None,
    is_superuser=None,
    dept_id=None,
    startTime=None,
    endTime=None,
):
    """Return one page of rows from ``iot_users`` plus the total match count.

    Args:
        page: 1-based page number. Values below 1 are treated as page 1 so
            the query never receives a negative OFFSET.
        page_size: Maximum number of rows to return.
        device_id: Optional substring to match against the row id as text.
        name: Optional substring to match against the name.
        status: Optional active filter; 1 selects active rows.
        is_superuser: Optional superuser filter; 1 selects superusers.
        dept_id: Optional department id; ignored unless ``is_valid_uuid``
            accepts it.
        startTime: Optional lower bound (inclusive) on ``created_at``.
        endTime: Optional upper bound (inclusive) on ``created_at``.

    Returns:
        A ``(result, total)`` tuple: ``result`` is a list of dicts for the
        requested page, newest first; ``total`` is the number of rows that
        match the filters across all pages.
    """
    offset = max(page - 1, 0) * page_size
    where_clause, params = _build_device_filters(
        device_id, name, status, is_superuser, dept_id, startTime, endTime
    )

    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            # Total number of matching rows (same filters, no paging).
            count_sql = f"""
                SELECT COUNT(*)
                FROM iot_users d
                {where_clause};
            """
            cursor.execute(count_sql, params)
            total = cursor.fetchone()[0]

            # Requested page, joined with the department name.
            list_sql = f"""
                SELECT
                    d.id, d.name, d.remark, d.is_active, d.is_superuser,
                    d.dept_id, sd.name AS dept_name, d.created_at
                FROM iot_users d
                LEFT JOIN sys_dept sd ON d.dept_id = sd.id
                {where_clause}
                ORDER BY d.created_at DESC
                LIMIT %s OFFSET %s;
            """
            cursor.execute(list_sql, params + [page_size, offset])
            rows = cursor.fetchall()

    # Row fields get their own names so the filter arguments are not rebound.
    result = [
        {
            "id": row_id,
            "name": row_name,
            "remark": row_remark,
            "status": 1 if row_active else 0,
            "is_superuser": 1 if row_superuser else 0,
            "dept_id": row_dept_id,
            "dept_name": row_dept_name,
            "created_at": format_datetime(row_created_at),
        }
        for (
            row_id,
            row_name,
            row_remark,
            row_active,
            row_superuser,
            row_dept_id,
            row_dept_name,
            row_created_at,
        ) in rows
    ]
    return result, total


def insert_device(data: dict) -> str:
    """Insert a row into ``iot_users`` and return the generated id.

    The stored hash is the hex SHA-256 of ``salt + password``, where the salt
    is the last four characters of the name (empty when no name is given).

    Args:
        data: Reads the keys ``name``, ``password``, ``remark``, ``dept_id``,
            ``status`` (default 1) and ``is_superuser`` (default 0). A missing
            or ``None`` password falls back to the default password.

    Returns:
        The ``id`` value returned by the INSERT.
    """
    name = data.get("name")
    salt = name[-4:] if name else ""

    # An explicit None must not reach the string concatenation below.
    password = data.get("password")
    if password is None:
        password = _DEFAULT_PASSWORD

    # NOTE(review): a name-derived salt with a single SHA-256 round is weak
    # for password storage; it is kept because existing hashes depend on it.
    password_hash = sha256((salt + password).encode("utf-8")).hexdigest()

    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO iot_users
                    (name, password_hash, salt, remark, dept_id,
                     is_active, is_superuser)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                RETURNING id;
                """,
                (
                    name,
                    password_hash,
                    salt,
                    data.get("remark"),
                    data.get("dept_id"),
                    bool(data.get("status", 1)),
                    bool(data.get("is_superuser", 0)),
                ),
            )
            device_id = cursor.fetchone()[0]
        conn.commit()
    return device_id


def update_device_db(id: str, data: dict) -> int:
    """Overwrite remark, active flag, department and superuser flag of a row.

    Every column is written on each call: keys missing from ``data`` become
    NULL for ``remark``/``dept_id``, True for ``status`` and False for
    ``is_superuser``.

    Args:
        id: Id of the row to update.
        data: Reads the keys ``remark``, ``status``, ``dept_id`` and
            ``is_superuser``.

    Returns:
        The number of rows affected as reported by the cursor.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                UPDATE iot_users
                SET remark=%s, is_active=%s, dept_id=%s, is_superuser=%s
                WHERE id=%s;
                """,
                (
                    data.get("remark"),
                    bool(data.get("status", 1)),
                    data.get("dept_id"),
                    bool(data.get("is_superuser", 0)),
                    id,
                ),
            )
            # Read while the cursor is still open.
            affected = cursor.rowcount
        conn.commit()
    return affected


def patch_device_db(id: str, data: dict) -> int:
    """Update only the columns whose keys are present in ``data``.

    Args:
        id: Id of the row to update.
        data: Any subset of the keys ``remark``, ``status``, ``is_superuser``
            and ``dept_id``; other keys are ignored. ``status`` and
            ``is_superuser`` are coerced to bool.

    Returns:
        The number of rows affected, or 0 when ``data`` contains none of the
        patchable keys (no statement is executed in that case).
    """
    fields = []
    params = []
    for key, column in _PATCHABLE_COLUMNS.items():
        if key in data:
            value = bool(data[key]) if key in _BOOLEAN_KEYS else data[key]
            # Column names come from the fixed mapping, never from the caller.
            fields.append(f"{column} = %s")
            params.append(value)

    # Nothing to change: avoid reporting the rowcount of a cursor that never
    # executed (-1 under DB-API) and skip the needless connection and commit.
    if not fields:
        return 0

    sql = f"UPDATE iot_users SET {', '.join(fields)} WHERE id = %s"
    params.append(id)

    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, tuple(params))
            affected = cursor.rowcount
        conn.commit()
    return affected


def delete_device_db(id: str) -> int:
    """Delete the ``iot_users`` row with the given id.

    Args:
        id: Id of the row to delete.

    Returns:
        The number of rows deleted as reported by the cursor.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute("DELETE FROM iot_users WHERE id=%s;", (id,))
            affected = cursor.rowcount
        conn.commit()
    return affected