262 lines
8.4 KiB
Python
262 lines
8.4 KiB
Python
from config.minIO import get_temp_url_dict
|
|
from config.pgDb import pg_pool
|
|
from utils.MyUtils import format_datetime
|
|
|
|
|
|
def get_sentinel_record_list_db_page(
|
|
page: int,
|
|
page_size: int,
|
|
record_id=None,
|
|
license_plate=None,
|
|
vehicle_type=None,
|
|
is_inspected=None,
|
|
livestock_source=None,
|
|
livestock_type=None,
|
|
dept_ids=None,
|
|
start_time=None,
|
|
end_time=None,
|
|
):
|
|
offset = (page - 1) * page_size
|
|
|
|
conditions = []
|
|
params = []
|
|
|
|
# ---- 记录 ID ----
|
|
if record_id:
|
|
conditions.append("r.id::text LIKE %s")
|
|
params.append(f"%{record_id}%")
|
|
|
|
# ---- 车牌号 ----
|
|
if license_plate:
|
|
conditions.append("r.license_plate LIKE %s")
|
|
params.append(f"%{license_plate}%")
|
|
|
|
# ---- 车型 ----
|
|
if vehicle_type:
|
|
conditions.append("r.vehicle_type LIKE %s")
|
|
params.append(f"%{vehicle_type}%")
|
|
|
|
# ---- 是否检查 ----
|
|
if is_inspected is not None:
|
|
conditions.append("r.is_inspected = %s")
|
|
params.append(bool(is_inspected))
|
|
|
|
# ---- 来源 ----
|
|
if livestock_source is not None:
|
|
conditions.append("r.livestock_source LIKE %s")
|
|
params.append(f"%{livestock_source}%")
|
|
|
|
# ---- 种类 ----
|
|
if livestock_type is not None:
|
|
conditions.append("r.livestock_type LIKE %s")
|
|
params.append(livestock_type)
|
|
|
|
# ---- 部门 ----
|
|
if dept_ids:
|
|
conditions.append("r.dept_id = ANY(%s)")
|
|
params.append(dept_ids)
|
|
|
|
# ---- 时间过滤 ----
|
|
if start_time:
|
|
conditions.append("r.created_at >= %s")
|
|
params.append(start_time)
|
|
|
|
if end_time:
|
|
conditions.append("r.created_at <= %s")
|
|
params.append(end_time)
|
|
|
|
where_clause = " WHERE " + " AND ".join(conditions) if conditions else ""
|
|
|
|
with pg_pool.getConn() as conn:
|
|
with conn.cursor() as cursor:
|
|
|
|
# ---- 统计总数 ----
|
|
count_sql = f"""
|
|
SELECT COUNT(*)
|
|
FROM sentinel_records r
|
|
{where_clause};
|
|
"""
|
|
cursor.execute(count_sql, params)
|
|
total = cursor.fetchone()[0]
|
|
|
|
# ---- 分页查询 ----
|
|
list_sql = f"""
|
|
SELECT
|
|
r.id,
|
|
r.license_plate,
|
|
r.vehicle_type,
|
|
r.license_plate_image,
|
|
r.vehicle_image,
|
|
r.livestock_type,
|
|
r.livestock_source,
|
|
r.is_inspected,
|
|
r.dept_id,
|
|
sd.name AS dept_name,
|
|
r.created_at,
|
|
r.updated_at,
|
|
r.remark
|
|
FROM sentinel_records r
|
|
LEFT JOIN sys_dept sd ON r.dept_id = sd.id
|
|
{where_clause}
|
|
ORDER BY r.created_at DESC
|
|
LIMIT %s OFFSET %s;
|
|
"""
|
|
|
|
cursor.execute(list_sql, params + [page_size, offset])
|
|
rows = cursor.fetchall()
|
|
|
|
result = []
|
|
for r in rows:
|
|
(
|
|
record_id,
|
|
license_plate,
|
|
vehicle_type,
|
|
license_plate_image,
|
|
vehicle_image,
|
|
livestock_type,
|
|
livestock_source,
|
|
is_inspected,
|
|
dept_id,
|
|
dept_name,
|
|
created_at,
|
|
updated_at,
|
|
remark,
|
|
) = r
|
|
|
|
result.append(
|
|
{
|
|
"id": record_id,
|
|
"license_plate": license_plate,
|
|
"vehicle_type": vehicle_type,
|
|
"license_plate_image": get_temp_url_dict(
|
|
"sentinel", "license_plate", license_plate_image
|
|
),
|
|
"vehicle_image": get_temp_url_dict(
|
|
"sentinel", "vehicle_image", vehicle_image
|
|
),
|
|
"livestock_type": livestock_type,
|
|
"livestock_source": livestock_source,
|
|
"is_inspected": 1 if is_inspected else 0,
|
|
"dept_id": dept_id,
|
|
"dept_name": dept_name,
|
|
"created_at": format_datetime(created_at),
|
|
"updated_at": format_datetime(updated_at),
|
|
"remark": remark,
|
|
}
|
|
)
|
|
|
|
return result, total
|
|
|
|
|
|
def insert_sentinel_record(data: dict, dept_id) -> str:
|
|
with pg_pool.getConn() as conn:
|
|
with conn.cursor() as cursor:
|
|
cursor.execute(
|
|
"""
|
|
INSERT INTO sentinel_records (
|
|
license_plate,
|
|
vehicle_type,
|
|
license_plate_image,
|
|
vehicle_image,
|
|
livestock_type,
|
|
livestock_source,
|
|
is_inspected,
|
|
dept_id,
|
|
remark,
|
|
created_by
|
|
)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
RETURNING id;
|
|
""",
|
|
(
|
|
data.get("license_plate"),
|
|
data.get("vehicle_type"),
|
|
data.get("license_plate_image"),
|
|
data.get("vehicle_image"),
|
|
data.get("livestock_type"),
|
|
data.get("livestock_source"),
|
|
bool(data.get("is_inspected", False)),
|
|
dept_id,
|
|
data.get("remark"),
|
|
data.get("created_by"),
|
|
),
|
|
)
|
|
record_id = cursor.fetchone()[0]
|
|
conn.commit()
|
|
return record_id
|
|
|
|
|
|
def update_sentinel_record_db(id: str, data: dict) -> int:
|
|
with pg_pool.getConn() as conn:
|
|
with conn.cursor() as cursor:
|
|
cursor.execute(
|
|
"""
|
|
UPDATE sentinel_records
|
|
SET
|
|
license_plate = %s,
|
|
vehicle_type = %s,
|
|
license_plate_image = %s,
|
|
vehicle_image = %s,
|
|
livestock_type = %s,
|
|
livestock_source = %s,
|
|
is_inspected = %s,
|
|
remark = %s,
|
|
updated_at = now()
|
|
WHERE id = %s;
|
|
""",
|
|
(
|
|
data.get("license_plate"),
|
|
data.get("vehicle_type"),
|
|
data.get("license_plate_image"),
|
|
data.get("vehicle_image"),
|
|
data.get("livestock_type"),
|
|
data.get("livestock_source"),
|
|
bool(data.get("is_inspected", False)),
|
|
data.get("remark"),
|
|
id,
|
|
),
|
|
)
|
|
conn.commit()
|
|
return cursor.rowcount
|
|
|
|
|
|
def patch_sentinel_record_db(id: str, data: dict) -> int:
|
|
with pg_pool.getConn() as conn:
|
|
with conn.cursor() as cursor:
|
|
fields = []
|
|
params = []
|
|
mapping = {
|
|
"license_plate": "license_plate",
|
|
"vehicle_type": "vehicle_type",
|
|
"license_plate_image": "license_plate_image",
|
|
"vehicle_image": "vehicle_image",
|
|
"livestock_type": "livestock_type",
|
|
"livestock_source": "livestock_source",
|
|
"is_inspected": "is_inspected",
|
|
"remark": "remark",
|
|
}
|
|
|
|
for k, column in mapping.items():
|
|
if k in data:
|
|
value = data[k]
|
|
# 如果字段是 "is_inspected",将其转换为布尔类型
|
|
if k == "is_inspected":
|
|
value = bool(value) # 转换为布尔值
|
|
fields.append(f"{column} = %s")
|
|
params.append(value)
|
|
|
|
if fields:
|
|
sql = f"UPDATE sentinel_records SET {', '.join(fields)} WHERE id = %s"
|
|
params.append(id)
|
|
cursor.execute(sql, tuple(params))
|
|
conn.commit()
|
|
return cursor.rowcount
|
|
|
|
|
|
def delete_sentinel_record_db(id: str) -> int:
|
|
with pg_pool.getConn() as conn:
|
|
with conn.cursor() as cursor:
|
|
cursor.execute("DELETE FROM sentinel_records WHERE id=%s;", (id,))
|
|
conn.commit()
|
|
return cursor.rowcount
|