Files
2026-03-26 17:48:20 +08:00

292 lines
9.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import uuid
from pathlib import Path
from uuid import UUID
import config.minIO as minIO
import db.postgres as pg
from agent.licenseImageAgent import get_license_response
from agent.vehicleImageAgent import get_vehicle_response
from ai.plate.my_plate import recognizer
from config.minIO import minio_client, get_temp_url
from config.yolo import YOLOSingleton
from db.postgres import (
get_dept_id_by_iot_user_name,
get_dept_ids_by_dept_id,
)
from db.postgres.sentinel import (
get_sentinel_record_by_id,
saveSentinelRecord,
)
from llm.ticketLLM import *
from llm.ticketLLMv2 import get_ticket_response_v2
from models.SentinelRecordRequest import SentinelRecordRequest
from routers.WS import ws_manager
from utils import validate_plate
def process_ticket_image(
version: int,
needBarcode: bool = False,
img_bytes=None,
file_name: str = None,
project_name: str = None,
user_id: UUID = None,
):
"""
处理票据图片的核心逻辑,供不同接口调用
"""
# 上传到 OSS,使用 UUID 做对象名
if img_bytes is None:
img_bytes = []
object_name = str(uuid.uuid4())
file_bytes = BytesIO(img_bytes)
bucket_name = "image-ticket"
if not minio_client.bucket_exists(bucket_name):
minio_client.make_bucket(bucket_name)
minIO.push_file(bucket_name, object_name, file_bytes, img_bytes, "image/jpeg")
oss_url = minIO.get_temp_url(bucket_name, object_name)
# 调用分析方法获取 JSON
if version == 1:
json_data = get_ticket_response(oss_url)
elif version == 2:
json_data = get_ticket_response_v2(oss_url, needBarcode)
else:
json_data = get_ticket_response(oss_url)
# 获取图片分辨率和大小
img = Image.open(BytesIO(img_bytes))
resolution = f"{img.width}x{img.height}"
size_kb = round(len(img_bytes) / 1024, 2)
# 插入数据库
pg.insert_ticket_image(
created_by=user_id,
file_name=file_name,
resolution=resolution,
size=size_kb,
name=project_name if project_name else object_name[:8],
dead_pupa_count=json_data.get("dead_pupa_count") if version == 2 else 0,
moisture_content=json_data.get("moisture_content") if version == 2 else 0,
cocoon_weight=json_data.get("cocoon_weight"),
defective_pupa_count=json_data.get("defective_pupa_count"),
fresh_shell_weight=json_data.get("fresh_shell_weight"),
sample_count=json_data.get("sample_count"),
barcode=json_data.get("barcode"),
oss=object_name,
net_weight_total=json_data.get("net_weight_total"),
evaluator=json_data.get("evaluator"),
reviewer=json_data.get("reviewer"),
)
return json_data
def process_license_image(
img_bytes=None,
file_name: str = None,
project_name: str = None,
user_id: UUID = None,
):
# 上传到 OSS,使用 UUID 做对象名
if img_bytes is None:
img_bytes = []
object_name = str(uuid.uuid4())
file_bytes = BytesIO(img_bytes)
bucket_name = "image-license"
if not minio_client.bucket_exists(bucket_name):
minio_client.make_bucket(bucket_name)
minIO.push_file(bucket_name, object_name, file_bytes, img_bytes, "image/jpeg")
oss_url = minIO.get_temp_url(bucket_name, object_name)
# 调用分析方法获取 JSON
json_data = get_license_response(oss_url)
# 获取图片分辨率和大小
img = Image.open(BytesIO(img_bytes))
resolution = f"{img.width}x{img.height}"
size_kb = round(len(img_bytes) / 1024, 2)
# 插入数据库
pg.insert_license_image(
created_by=user_id,
file_name=file_name,
resolution=resolution,
size=size_kb,
name=project_name if project_name else object_name[:8],
oss=object_name,
type=json_data.get("type"),
content=json_data.get("content"),
)
return json_data
def process_silkworm_cocoon_image(
img_bytes=None,
file_name: str = None,
project_name: str = None,
user_id: UUID = None,
):
# 上传到 OSS,使用 UUID 做对象名
if img_bytes is None:
img_bytes = []
pre_object_name = str(uuid.uuid4())
after_object_name = str(uuid.uuid4())
file_bytes = BytesIO(img_bytes)
bucket_name = "image-sca"
if not minio_client.bucket_exists(bucket_name):
minio_client.make_bucket(bucket_name)
minIO.push_file(
bucket_name, "raw/" + pre_object_name, file_bytes, img_bytes, "image/jpeg"
)
# YOLO检测
img_bytes_out, results_json = YOLOSingleton.detect(img_bytes)
speed_json = results_json.get("speed_ms")
file_bytes_out = BytesIO(img_bytes_out)
minIO.push_file(
bucket_name,
"ai/" + after_object_name,
file_bytes_out,
img_bytes_out,
"image/jpeg",
)
# 获取图片分辨率和大小
img = Image.open(BytesIO(img_bytes))
resolution = f"{img.width}x{img.height}"
size_kb = round(len(img_bytes) / 1024, 2)
# 插入数据库
pg.insert_sca_image(
file_name=file_name,
resolution=resolution,
size=size_kb,
cocoon_count=results_json.get("total_objects"),
max_confidence=results_json.get("max_confidence"),
min_confidence=results_json.get("min_confidence"),
average_confidence=results_json.get("avg_confidence"),
other_info=results_json.get("class_counts"),
preprocess_time_ms=speed_json.get("preprocess"),
inference_time_ms=speed_json.get("inference"),
postprocess_time_ms=speed_json.get("postprocess"),
name=project_name if project_name else pre_object_name[:8],
image_pre=pre_object_name,
image_after=after_object_name,
created_by=user_id,
)
return {
"resolution": resolution,
"size": size_kb / 1024,
"cocoon_count": results_json.get("total_objects"),
"max_confidence": results_json.get("max_confidence"),
"min_confidence": results_json.get("min_confidence"),
"average_confidence": results_json.get("avg_confidence"),
"preprocess_time_ms": speed_json.get("preprocess"),
"inference_time_ms": speed_json.get("inference"),
"postprocess_time_ms": speed_json.get("postprocess"),
"details": results_json.get("class_counts"),
"imageUrl": get_temp_url(bucket_name, "ai/" + after_object_name),
}
# 处理车牌照片
async def process_all_vehicle_animal_image(
data: SentinelRecordRequest,
):
oss_url = minIO.get_temp_url(
"sentinel", "vehicle_image_side/" + data.vehicleImageSide
)
# LLM得到车身信息
analysis_result = await get_vehicle_response(oss_url)
livestock_type = analysis_result.get("livestock_type", "")
remark = analysis_result.get("remark", "")
have_animal = analysis_result.get("have_animal", False)
if not have_animal:
minIO.delete_file("sentinel", "vehicle_image_side/" + data.vehicleImageSide)
minIO.delete_file("sentinel", "vehicle_image_front/" + data.vehicleImageFront)
else:
# 通过设备id获得组织id
dept_id = get_dept_id_by_iot_user_name(data.DeviceId)
# 处理汽车正面照--------------------------
# 从OSS下载
oss_url = minIO.get_temp_url(
"sentinel", "vehicle_image_front/" + data.vehicleImageFront
)
# 获取系统临时目录(自动兼容 Windows / Linux
tmp_dir = Path(tempfile.gettempdir())
tmp_dir.mkdir(parents=True, exist_ok=True)
tmp_path = tmp_dir / data.vehicleImageFront
# 下载图片到 tmp_path
response = requests.get(oss_url, stream=True)
if response.status_code == 200:
with open(tmp_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
else:
raise Exception(f"下载失败: {oss_url}, status_code={response.status_code}")
# 调用识别
results = recognizer.analyze_image(str(tmp_path))
# 车牌号一次识别
license_plate = results[0].plate_no if results else ""
# 车牌号二次校准
license_plate = validate_plate(license_plate)
license_plate_color_str = results[0].plate_color if results else ""
color_map = {
"蓝色": 0,
"黄色": 1,
"绿色": 2,
"黑色": 3,
"白色": 4,
}
license_plate_color = color_map.get(license_plate_color_str, 0)
license_plate_image = data.vehicleImageFront
# 保存到数据库
saveSentinelRecord(
data.Id,
data.VehicleType,
data.vehicleImageSide,
livestock_type,
remark,
dept_id,
license_plate,
license_plate_image,
license_plate_color,
)
# 识别完成后删除临时文件
os.remove(tmp_path)
# 可以通知的部门ids
available_departments = get_dept_ids_by_dept_id(dept_id)
# 通知控制界面
await ws_manager.noticeSentinel(
{
"content": f"车辆即将进入关卡,请准备检查",
"license_plate": license_plate,
"type": "vehicle_alert",
},
available_departments,
)
# 通知大屏界面
await ws_manager.noticeSentinelMonitorStatus(
{
"content": get_sentinel_record_by_id(data.Id),
"type": "vehicle_alert",
},
available_departments,
)