from config.minIO import get_temp_url_dict
from config.pgDb import pg_pool
from models.SentinelRecordRequest import SentinelRecordRequest
from utils.MyUtils import format_datetime

# Keys of the patch payload that may be written to sentinel_records.
# Each key is also the column name it updates.
_PATCHABLE_COLUMNS = (
    "license_plate",
    "vehicle_type",
    "license_plate_image",
    "vehicle_image",
    "livestock_type",
    "livestock_source",
    "is_inspected",
    "remark",
)


def _build_record_filters(
    record_id=None,
    license_plate=None,
    vehicle_type=None,
    is_inspected=None,
    livestock_source=None,
    livestock_type=None,
    dept_ids=None,
    start_time=None,
    end_time=None,
):
    """Turn the optional list filters into a WHERE clause and its parameters.

    Returns:
        A ``(where_clause, params)`` pair. ``where_clause`` is either an empty
        string or starts with ``" WHERE "``; ``params`` holds the values for
        its ``%s`` placeholders, in order.
    """
    conditions = []
    params = []

    def add(condition, value):
        conditions.append(condition)
        params.append(value)

    # Record id: substring match on the textual form of the id.
    if record_id:
        add("r.id::text LIKE %s", f"%{record_id}%")

    # License plate: substring match.
    if license_plate:
        add("r.license_plate LIKE %s", f"%{license_plate}%")

    # Vehicle type: substring match.
    if vehicle_type:
        add("r.vehicle_type LIKE %s", f"%{vehicle_type}%")

    # Inspected flag: any non-None value is coerced with bool().
    if is_inspected is not None:
        add("r.is_inspected = %s", bool(is_inspected))

    # Livestock source: substring match.
    # NOTE(review): checked with "is not None", so an empty string still adds
    # the condition (LIKE '%%'), which excludes NULL sources - confirm intent.
    if livestock_source is not None:
        add("r.livestock_source LIKE %s", f"%{livestock_source}%")

    # Livestock type.
    # NOTE(review): unlike the other text filters no % wildcards are added, so
    # this LIKE behaves as an exact match unless the caller supplies wildcards
    # itself - confirm this is intended.
    if livestock_type is not None:
        add("r.livestock_type LIKE %s", livestock_type)

    # Departments: match any id in the given collection.
    if dept_ids:
        add("r.dept_id = ANY(%s)", dept_ids)

    # Creation-time window (both bounds inclusive).
    if start_time:
        add("r.created_at >= %s", start_time)
    if end_time:
        add("r.created_at <= %s", end_time)

    where_clause = " WHERE " + " AND ".join(conditions) if conditions else ""
    return where_clause, params


def _record_row_to_dict(row):
    """Map one row of the paged list query to the dict returned to callers."""
    (
        record_id,
        license_plate,
        vehicle_type,
        license_plate_image,
        vehicle_image,
        livestock_type,
        livestock_source,
        is_inspected,
        dept_id,
        dept_name,
        created_at,
        updated_at,
        remark,
    ) = row
    return {
        "id": record_id,
        "license_plate": license_plate,
        "vehicle_type": vehicle_type,
        # Stored image references are passed through get_temp_url_dict
        # (presumably yielding temporary MinIO URLs - see config.minIO).
        "license_plate_image": get_temp_url_dict(
            "sentinel", "license_plate", license_plate_image
        ),
        "vehicle_image": get_temp_url_dict(
            "sentinel", "vehicle_image", vehicle_image
        ),
        "livestock_type": livestock_type,
        "livestock_source": livestock_source,
        "is_inspected": 1 if is_inspected else 0,
        "dept_id": dept_id,
        "dept_name": dept_name,
        "created_at": format_datetime(created_at),
        "updated_at": format_datetime(updated_at),
        "remark": remark,
    }


def get_sentinel_record_list_db_page(
    page: int,
    page_size: int,
    record_id=None,
    license_plate=None,
    vehicle_type=None,
    is_inspected=None,
    livestock_source=None,
    livestock_type=None,
    dept_ids=None,
    start_time=None,
    end_time=None,
):
    """Return one page of sentinel records plus the total matching count.

    Args:
        page: 1-based page number. Values below 1 are treated as 1.
        page_size: Rows per page. Negative values are treated as 0.
        record_id, license_plate, vehicle_type, livestock_source:
            Optional substring filters.
        livestock_type: Optional LIKE pattern, passed through unchanged.
        is_inspected: Optional flag; any non-None value is coerced to bool.
        dept_ids: Optional collection of department ids to match.
        start_time, end_time: Optional inclusive bounds on ``created_at``.

    Returns:
        ``(records, total)``: ``records`` is a list of dicts ordered by
        ``created_at`` descending; ``total`` is the number of rows matching
        the filters regardless of paging.
    """
    # PostgreSQL rejects a negative OFFSET or LIMIT, so clamp out-of-range
    # paging values instead of letting the query fail.
    page = max(page, 1)
    page_size = max(page_size, 0)
    offset = (page - 1) * page_size

    where_clause, params = _build_record_filters(
        record_id=record_id,
        license_plate=license_plate,
        vehicle_type=vehicle_type,
        is_inspected=is_inspected,
        livestock_source=livestock_source,
        livestock_type=livestock_type,
        dept_ids=dept_ids,
        start_time=start_time,
        end_time=end_time,
    )

    # Only where_clause (built from fixed fragments) is interpolated; every
    # caller-supplied value travels as a bound parameter.
    count_sql = f"""
        SELECT COUNT(*)
        FROM sentinel_records r
        {where_clause};
    """
    list_sql = f"""
        SELECT
            r.id,
            r.license_plate,
            r.vehicle_type,
            r.license_plate_image,
            r.vehicle_image,
            r.livestock_type,
            r.livestock_source,
            r.is_inspected,
            r.dept_id,
            sd.name AS dept_name,
            r.created_at,
            r.updated_at,
            r.remark
        FROM sentinel_records r
        LEFT JOIN sys_dept sd ON r.dept_id = sd.id
        {where_clause}
        ORDER BY r.created_at DESC
        LIMIT %s OFFSET %s;
    """

    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        # Total count for the pager.
        cursor.execute(count_sql, params)
        total = cursor.fetchone()[0]

        # Requested page.
        cursor.execute(list_sql, params + [page_size, offset])
        rows = cursor.fetchall()

    # Build the response after the connection is released so that image URL
    # generation does not keep a pooled connection busy.
    result = [_record_row_to_dict(row) for row in rows]
    return result, total


def insert_sentinel_record(data: dict, dept_id) -> str:
    """Insert a sentinel record and return its generated id.

    Args:
        data: Field values; missing keys are stored as NULL, and
            ``is_inspected`` defaults to False and is coerced to bool.
        dept_id: Department the record belongs to.

    Returns:
        The ``id`` produced by ``RETURNING id``.
        NOTE(review): the value is returned as the driver delivers it and is
        not converted to ``str`` despite the annotation - confirm with callers.
    """
    sql = """
        INSERT INTO sentinel_records (
            license_plate,
            vehicle_type,
            license_plate_image,
            vehicle_image,
            livestock_type,
            livestock_source,
            is_inspected,
            dept_id,
            remark,
            created_by
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        RETURNING id;
    """
    values = (
        data.get("license_plate"),
        data.get("vehicle_type"),
        data.get("license_plate_image"),
        data.get("vehicle_image"),
        data.get("livestock_type"),
        data.get("livestock_source"),
        bool(data.get("is_inspected", False)),
        dept_id,
        data.get("remark"),
        data.get("created_by"),
    )
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(sql, values)
        record_id = cursor.fetchone()[0]
        conn.commit()
    return record_id


def update_sentinel_record_db(id: str, data: dict) -> int:
    """Overwrite all editable fields of a record and bump ``updated_at``.

    Keys missing from ``data`` are written as NULL (``is_inspected`` as
    False), so this is a full replacement rather than a partial update.

    Returns:
        The number of rows affected as reported by the cursor.
    """
    sql = """
        UPDATE sentinel_records
        SET
            license_plate = %s,
            vehicle_type = %s,
            license_plate_image = %s,
            vehicle_image = %s,
            livestock_type = %s,
            livestock_source = %s,
            is_inspected = %s,
            remark = %s,
            updated_at = now()
        WHERE id = %s;
    """
    values = (
        data.get("license_plate"),
        data.get("vehicle_type"),
        data.get("license_plate_image"),
        data.get("vehicle_image"),
        data.get("livestock_type"),
        data.get("livestock_source"),
        bool(data.get("is_inspected", False)),
        data.get("remark"),
        id,
    )
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(sql, values)
        affected = cursor.rowcount
        conn.commit()
    return affected


def patch_sentinel_record_db(id: str, data: dict) -> int:
    """Update only the fields present in ``data`` and bump ``updated_at``.

    A key that is present with value ``None`` sets the column to NULL; keys
    outside the patchable set are ignored.

    Returns:
        The number of rows affected. When ``data`` contains no patchable key
        no statement is run and 0 is returned (previously this path returned
        the rowcount of a cursor that had never executed anything).
    """
    assignments = []
    params = []
    for column in _PATCHABLE_COLUMNS:
        if column not in data:
            continue
        value = data[column]
        # is_inspected is stored as a boolean, so coerce whatever was sent.
        if column == "is_inspected":
            value = bool(value)
        assignments.append(f"{column} = %s")
        params.append(value)

    if not assignments:
        return 0

    # Keep the modification timestamp in step with the other update paths.
    assignments.append("updated_at = now()")

    # Column names come from the fixed whitelist above, never from the caller.
    sql = f"UPDATE sentinel_records SET {', '.join(assignments)} WHERE id = %s"
    params.append(id)

    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(sql, tuple(params))
        affected = cursor.rowcount
        conn.commit()
    return affected


def delete_sentinel_record_db(id: str) -> int:
    """Delete the record with the given id and return the affected row count."""
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute("DELETE FROM sentinel_records WHERE id=%s;", (id,))
        affected = cursor.rowcount
        conn.commit()
    return affected


def saveSentinelRecord(data: SentinelRecordRequest) -> str:
    """Insert a record holding only plate and vehicle data from a request.

    Reads ``LicensePlate``, ``LicensePlateImage``, ``VehicleType`` and
    ``VehicleImage`` from ``data``.

    Returns:
        The new record's id converted to ``str``.
    """
    sql = """
        INSERT INTO sentinel_records (
            license_plate,
            license_plate_image,
            vehicle_type,
            vehicle_image
        )
        VALUES (%s, %s, %s, %s)
        RETURNING id;
    """
    values = (
        data.LicensePlate,
        data.LicensePlateImage,
        data.VehicleType,
        data.VehicleImage,
    )
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(sql, values)
        new_id = cursor.fetchone()[0]
        conn.commit()
    return str(new_id)


def update_sentinel_record(
    id: str, livestock_type: str, remark: str, dept_id: str
) -> bool:
    """Set ``livestock_type``, ``remark`` and ``dept_id`` on one record.

    ``updated_at`` is set to the current database time as part of the same
    statement.

    Returns:
        True if a row with the given id was updated, False if none matched.
    """
    sql = """
        UPDATE sentinel_records
        SET
            livestock_type = %s,
            remark = %s,
            dept_id = %s,
            updated_at = now()
        WHERE id = %s
        RETURNING id;
    """
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(sql, (livestock_type, remark, dept_id, id))
        record = cursor.fetchone()
        conn.commit()
    return record is not None