# File: AILab/bbit_ai/app/db/postgres/sentinel.py
# Timestamp: 2026-02-04 13:58:18 +08:00
#
# Size: 386 lines, 12 KiB
# Language: Python

from config.minIO import get_temp_url_dict
from config.pgDb import pg_pool
from models.SentinelRecordRequest import SentinelRecordRequest
from utils.MyUtils import format_datetime
def get_sentinel_record_list_db_page(
    page: int,
    page_size: int,
    record_id=None,
    license_plate=None,
    vehicle_type=None,
    is_inspected=None,
    livestock_source=None,
    livestock_type=None,
    dept_ids=None,
    start_time=None,
    end_time=None,
):
    """Return one page of sentinel records plus the total match count.

    Every filter is optional; the supplied ones are AND-ed together.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; negative values are treated as 0.
        record_id: Substring matched against the record id cast to text.
        license_plate: Substring matched against ``license_plate``.
        vehicle_type: Substring matched against ``vehicle_type``.
        is_inspected: Coerced with ``bool()`` and compared for equality;
            ``None`` disables the filter.
        livestock_source: Substring matched against ``livestock_source``;
            ``None`` disables the filter.
        livestock_type: Used as the LIKE pattern as-is (no wildcards are
            added); ``None`` disables the filter.
        dept_ids: Department ids matched with ``r.dept_id = ANY(...)``.
        start_time: Inclusive lower bound on ``created_at``.
        end_time: Inclusive upper bound on ``created_at``.

    Returns:
        ``(records, total)`` where ``records`` is a list of dicts for the
        requested page (newest first) and ``total`` is the number of rows
        matching the filters across all pages.
    """
    # PostgreSQL rejects negative LIMIT/OFFSET values, so clamp the paging
    # inputs instead of letting a bad page number surface as a DB error.
    page = max(page, 1)
    page_size = max(page_size, 0)
    offset = (page - 1) * page_size

    conditions = []
    params = []

    def add_filter(clause, value):
        # Keep each SQL fragment and its bound value in lock-step.
        conditions.append(clause)
        params.append(value)

    # ---- record id ----
    if record_id:
        add_filter("r.id::text LIKE %s", f"%{record_id}%")
    # ---- license plate ----
    if license_plate:
        add_filter("r.license_plate LIKE %s", f"%{license_plate}%")
    # ---- vehicle type ----
    if vehicle_type:
        add_filter("r.vehicle_type LIKE %s", f"%{vehicle_type}%")
    # ---- inspected flag ----
    if is_inspected is not None:
        add_filter("r.is_inspected = %s", bool(is_inspected))
    # ---- livestock source ----
    if livestock_source is not None:
        add_filter("r.livestock_source LIKE %s", f"%{livestock_source}%")
    # ---- livestock type ----
    # NOTE(review): unlike the other text filters no %...% wildcards are
    # added here, so this behaves as an exact match unless the caller
    # supplies its own wildcards. Confirm this is intended.
    if livestock_type is not None:
        add_filter("r.livestock_type LIKE %s", livestock_type)
    # ---- departments ----
    if dept_ids:
        add_filter("r.dept_id = ANY(%s)", dept_ids)
    # ---- creation time range ----
    if start_time:
        add_filter("r.created_at >= %s", start_time)
    if end_time:
        add_filter("r.created_at <= %s", end_time)

    where_clause = " WHERE " + " AND ".join(conditions) if conditions else ""

    # The WHERE clause only ever contains the fixed fragments above; all
    # user-provided values travel as bound parameters.
    count_sql = f"""
        SELECT COUNT(*)
        FROM sentinel_records r
        {where_clause};
    """
    # r.id is a tiebreaker so rows sharing a created_at value keep a stable
    # position and cannot repeat or disappear between pages.
    list_sql = f"""
        SELECT
            r.id,
            r.license_plate,
            r.vehicle_type,
            r.license_plate_image,
            r.vehicle_image,
            r.livestock_type,
            r.livestock_source,
            r.is_inspected,
            r.dept_id,
            sd.name AS dept_name,
            r.created_at,
            r.updated_at,
            r.remark
        FROM sentinel_records r
        LEFT JOIN sys_dept sd ON r.dept_id = sd.id
        {where_clause}
        ORDER BY r.created_at DESC, r.id DESC
        LIMIT %s OFFSET %s;
    """

    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            # ---- total count ----
            cursor.execute(count_sql, params)
            total = cursor.fetchone()[0]
            # ---- page query ----
            cursor.execute(list_sql, params + [page_size, offset])
            rows = cursor.fetchall()

    # Rows are converted after the connection is released so the per-row
    # image-URL lookups do not hold a pooled connection.
    result = []
    for row in rows:
        # Distinct local names: do not rebind the filter parameters.
        (
            row_id,
            plate,
            row_vehicle_type,
            plate_image,
            row_vehicle_image,
            row_livestock_type,
            row_livestock_source,
            inspected,
            row_dept_id,
            dept_name,
            created_at,
            updated_at,
            remark,
        ) = row
        # NOTE(review): id and dept_id are returned as delivered by the
        # driver here, while get_sentinel_record_by_id stringifies them.
        result.append(
            {
                "id": row_id,
                "license_plate": plate,
                "vehicle_type": row_vehicle_type,
                "license_plate_image": get_temp_url_dict(
                    "sentinel", "license_plate", plate_image
                ),
                "vehicle_image": get_temp_url_dict(
                    "sentinel", "vehicle_image", row_vehicle_image
                ),
                "livestock_type": row_livestock_type,
                "livestock_source": row_livestock_source,
                "is_inspected": 1 if inspected else 0,
                "dept_id": row_dept_id,
                "dept_name": dept_name,
                "created_at": format_datetime(created_at),
                "updated_at": format_datetime(updated_at),
                "remark": remark,
            }
        )
    return result, total
def get_sentinel_record_by_id(record_id):
    """Fetch a single sentinel record, joined with its department name.

    Args:
        record_id: Primary key of the record to load.

    Returns:
        A dict describing the record, or ``None`` when ``record_id`` is
        falsy or no row has that id. ``id`` is returned as a string;
        ``dept_id`` is returned as a string, or ``None`` when the record
        has no department.
    """
    if not record_id:
        return None

    sql = """
        SELECT
            r.id,
            r.license_plate,
            r.vehicle_type,
            r.license_plate_image,
            r.vehicle_image,
            r.livestock_type,
            r.livestock_source,
            r.is_inspected,
            r.dept_id,
            sd.name AS dept_name,
            r.created_at,
            r.updated_at,
            r.remark
        FROM sentinel_records r
        LEFT JOIN sys_dept sd ON r.dept_id = sd.id
        WHERE r.id = %s;
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, [record_id])
            row = cursor.fetchone()

    if not row:
        return None

    (
        row_id,
        license_plate,
        vehicle_type,
        license_plate_image,
        vehicle_image,
        livestock_type,
        livestock_source,
        is_inspected,
        dept_id,
        dept_name,
        created_at,
        updated_at,
        remark,
    ) = row

    return {
        "id": str(row_id),
        "license_plate": license_plate,
        "vehicle_type": vehicle_type,
        "license_plate_image": get_temp_url_dict(
            "sentinel", "license_plate", license_plate_image
        ),
        "vehicle_image": get_temp_url_dict(
            "sentinel", "vehicle_image", vehicle_image
        ),
        "livestock_type": livestock_type,
        "livestock_source": livestock_source,
        "is_inspected": 1 if is_inspected else 0,
        # dept_id can be NULL (rows may be inserted without a department);
        # str(None) would leak the literal text "None" to callers.
        "dept_id": str(dept_id) if dept_id is not None else None,
        "dept_name": dept_name,
        "created_at": format_datetime(created_at),
        "updated_at": format_datetime(updated_at),
        "remark": remark,
    }
def insert_sentinel_record(data: dict, dept_id) -> str:
    """Insert a new sentinel record and return its generated id.

    Args:
        data: Field values read with ``data.get``; the keys used are
            ``license_plate``, ``vehicle_type``, ``license_plate_image``,
            ``vehicle_image``, ``livestock_type``, ``livestock_source``,
            ``is_inspected`` (coerced with ``bool()``, default ``False``),
            ``remark`` and ``created_by``. Missing keys are stored as NULL.
        dept_id: Department the record is attached to.

    Returns:
        The id produced by ``RETURNING id``, converted to ``str`` so the
        value matches the declared return type and saveSentinelRecord.
    """
    sql = """
        INSERT INTO sentinel_records (
            license_plate,
            vehicle_type,
            license_plate_image,
            vehicle_image,
            livestock_type,
            livestock_source,
            is_inspected,
            dept_id,
            remark,
            created_by
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        RETURNING id;
    """
    values = (
        data.get("license_plate"),
        data.get("vehicle_type"),
        data.get("license_plate_image"),
        data.get("vehicle_image"),
        data.get("livestock_type"),
        data.get("livestock_source"),
        bool(data.get("is_inspected", False)),
        dept_id,
        data.get("remark"),
        data.get("created_by"),
    )
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, values)
            record_id = cursor.fetchone()[0]
        conn.commit()
    return str(record_id)
def update_sentinel_record_db(id: str, data: dict) -> int:
    """Overwrite all editable columns of one sentinel record.

    Every column in the SET list is always written: a key missing from
    ``data`` is stored as NULL (``is_inspected`` as ``False``), and
    ``updated_at`` is set to ``now()``.

    Args:
        id: Primary key of the record to update.
        data: New field values, read with ``data.get``.

    Returns:
        ``cursor.rowcount`` after the UPDATE.
    """
    statement = """
        UPDATE sentinel_records
        SET
            license_plate = %s,
            vehicle_type = %s,
            license_plate_image = %s,
            vehicle_image = %s,
            livestock_type = %s,
            livestock_source = %s,
            is_inspected = %s,
            remark = %s,
            updated_at = now()
        WHERE id = %s;
    """
    # Plain pass-through columns, listed in placeholder order.
    passthrough_keys = (
        "license_plate",
        "vehicle_type",
        "license_plate_image",
        "vehicle_image",
        "livestock_type",
        "livestock_source",
    )
    values = (
        *(data.get(key) for key in passthrough_keys),
        bool(data.get("is_inspected", False)),
        data.get("remark"),
        id,
    )
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(statement, values)
        conn.commit()
        return cursor.rowcount
def patch_sentinel_record_db(id: str, data: dict) -> int:
    """Update only the columns of a sentinel record present in ``data``.

    Args:
        id: Primary key of the record to patch.
        data: Partial field values. Only the keys ``license_plate``,
            ``vehicle_type``, ``license_plate_image``, ``vehicle_image``,
            ``livestock_type``, ``livestock_source``, ``is_inspected`` and
            ``remark`` are honoured; any other key is ignored.

    Returns:
        ``cursor.rowcount`` after the UPDATE, or ``-1`` when ``data``
        contains none of the patchable keys and no statement is run.
    """
    # Whitelist of columns a caller may patch. Column names are
    # interpolated into the SQL text, so they must never come from ``data``.
    patchable_columns = (
        "license_plate",
        "vehicle_type",
        "license_plate_image",
        "vehicle_image",
        "livestock_type",
        "livestock_source",
        "is_inspected",
        "remark",
    )

    assignments = []
    params = []
    for column in patchable_columns:
        if column not in data:
            continue
        value = data[column]
        if column == "is_inspected":
            # Stored as a boolean column; normalise 0/1-style inputs.
            value = bool(value)
        assignments.append(f"{column} = %s")
        params.append(value)

    if not assignments:
        # Nothing to change. -1 is the DB-API rowcount of a cursor that
        # never executed a statement, which is what was returned before.
        return -1

    # Keep the modification timestamp in step with the other update paths.
    assignments.append("updated_at = now()")
    sql = f"UPDATE sentinel_records SET {', '.join(assignments)} WHERE id = %s"
    params.append(id)

    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, tuple(params))
            affected = cursor.rowcount
        conn.commit()
    return affected
def delete_sentinel_record_db(id: str) -> int:
    """Delete the sentinel record with the given id.

    Args:
        id: Primary key of the record to remove.

    Returns:
        ``cursor.rowcount`` after the DELETE.
    """
    delete_sql = "DELETE FROM sentinel_records WHERE id=%s;"
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(delete_sql, (id,))
        deleted = cursor.rowcount
        conn.commit()
    return deleted
def saveSentinelRecord(data: SentinelRecordRequest) -> str:
    """Insert a sentinel record from a request object.

    Only the plate, plate image, vehicle type and vehicle image columns
    are written; every other column is left to its database default.

    Args:
        data: Request whose ``LicensePlate``, ``LicensePlateImage``,
            ``VehicleType`` and ``VehicleImage`` attributes are stored.

    Returns:
        The id produced by ``RETURNING id``, as a string.
    """
    values = (
        data.LicensePlate,
        data.LicensePlateImage,
        data.VehicleType,
        data.VehicleImage,
    )
    insert_sql = """
        INSERT INTO sentinel_records (
            license_plate,
            license_plate_image,
            vehicle_type,
            vehicle_image
        )
        VALUES (%s, %s, %s, %s)
        RETURNING id;
    """
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(insert_sql, values)
        returned_row = cursor.fetchone()
        generated_id = returned_row[0]
        conn.commit()
    return str(generated_id)
def update_sentinel_record(
    id: str, livestock_type: str, remark: str, dept_id: str
) -> bool:
    """Set livestock_type, remark and dept_id on one sentinel record.

    ``updated_at`` is set to ``now()`` by the same statement.

    Args:
        id: Primary key of the record to update.
        livestock_type: New value for ``livestock_type``.
        remark: New value for ``remark``.
        dept_id: New value for ``dept_id``.

    Returns:
        ``True`` when the UPDATE returned a row (a record with that id
        exists), ``False`` otherwise.
    """
    statement = """
        UPDATE sentinel_records
        SET livestock_type = %s,
            remark = %s,
            dept_id = %s,
            updated_at = now()
        WHERE id = %s
        RETURNING id;
    """
    arguments = (livestock_type, remark, dept_id, id)
    with pg_pool.getConn() as conn, conn.cursor() as cursor:
        cursor.execute(statement, arguments)
        # RETURNING yields a row only if the WHERE clause matched one.
        matched = cursor.fetchone() is not None
        conn.commit()
    return matched