"""Service functions that upload images to object storage, run analysis on
them, persist the results and (for sentinel vehicles) notify WebSocket clients.
"""

import tempfile
import uuid
from io import BytesIO
from pathlib import Path
from typing import Optional
from uuid import UUID

import config.minIO as minIO
import db.postgres as pg
from agent.licenseImageAgent import get_license_response
from agent.vehicleImageAgent import get_vehicle_response
from ai.plate.my_plate import recognizer
from config.minIO import minio_client, get_temp_url
from config.yolo import YOLOSingleton
from db.postgres import (
    get_dept_id_by_iot_user_name,
    get_dept_ids_by_dept_id,
)
from db.postgres.sentinel import (
    get_sentinel_record_by_id,
    saveSentinelRecord,
)

# NOTE(review): this star import is what supplies `get_ticket_response` and,
# presumably, `Image` and `requests` as well -- neither is imported explicitly
# anywhere in this module. Confirm and replace with explicit imports.
from llm.ticketLLM import *
from llm.ticketLLMv2 import get_ticket_response_v2
from models.SentinelRecordRequest import SentinelRecordRequest
from routers.WS import ws_manager
from utils import validate_plate

# Seconds `requests` waits for the connection / for the next chunk of data
# before giving up on an image download.
_DOWNLOAD_TIMEOUT_S = 30

# Plate colour label returned by the recognizer -> integer code that is stored.
# Labels are blue, yellow, green, black, white; unknown labels map to 0.
_PLATE_COLOR_CODES = {
    "蓝色": 0,
    "黄色": 1,
    "绿色": 2,
    "黑色": 3,
    "白色": 4,
}


def _require_image_bytes(img_bytes):
    """Return ``img_bytes`` unchanged, rejecting ``None`` with a clear error.

    Raises:
        TypeError: if ``img_bytes`` is ``None``. (Previously ``None`` was
            replaced by ``[]`` and failed later inside ``BytesIO`` with the
            same exception type but an unhelpful message.)
    """
    if img_bytes is None:
        raise TypeError("img_bytes is required and must be bytes-like, got None")
    return img_bytes


def _describe_image(img_bytes):
    """Return ``(resolution, size_kb)`` for the encoded image ``img_bytes``.

    ``resolution`` is formatted as ``"<width>x<height>"``; ``size_kb`` is the
    encoded size in KiB rounded to two decimals.
    """
    img = Image.open(BytesIO(img_bytes))
    resolution = f"{img.width}x{img.height}"
    size_kb = round(len(img_bytes) / 1024, 2)
    return resolution, size_kb


def _ensure_bucket(bucket_name: str) -> None:
    """Create ``bucket_name`` if it does not exist yet."""
    if not minio_client.bucket_exists(bucket_name):
        minio_client.make_bucket(bucket_name)


def _push_jpeg(bucket_name: str, object_name: str, payload) -> None:
    """Upload ``payload`` to ``bucket_name/object_name`` as ``image/jpeg``."""
    minIO.push_file(bucket_name, object_name, BytesIO(payload), payload, "image/jpeg")


def _download_to(url: str, dest: Path) -> None:
    """Stream ``url`` into the file ``dest``.

    Raises:
        Exception: if the server answers with a status other than 200.
    """
    # The context manager releases the streamed connection on every path.
    with requests.get(url, stream=True, timeout=_DOWNLOAD_TIMEOUT_S) as response:
        if response.status_code != 200:
            raise Exception(f"下载失败: {url}, status_code={response.status_code}")
        with open(dest, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)


def _recognize_front_plate(image_name: str):
    """Download the front image ``image_name`` and read its licence plate.

    Returns:
        ``(license_plate, license_plate_color)`` where the plate number has
        been passed through ``validate_plate`` and the colour is the integer
        code from ``_PLATE_COLOR_CODES`` (0 when unknown or nothing was found).
    """
    oss_url = minIO.get_temp_url("sentinel", "vehicle_image_front/" + image_name)

    # A private temporary directory avoids collisions between concurrent
    # requests and is removed even when download or recognition fails.
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Only the basename is used so the object name cannot escape tmp_dir.
        tmp_path = Path(tmp_dir) / Path(image_name).name
        _download_to(oss_url, tmp_path)
        results = recognizer.analyze_image(str(tmp_path))

    first = results[0] if results else None
    # First-pass recognition, then a second calibration pass.
    license_plate = validate_plate(first.plate_no if first is not None else "")
    color_label = first.plate_color if first is not None else ""
    return license_plate, _PLATE_COLOR_CODES.get(color_label, 0)


def process_ticket_image(
    version: int,
    needBarcode: bool = False,
    img_bytes=None,
    file_name: Optional[str] = None,
    project_name: Optional[str] = None,
    user_id: Optional[UUID] = None,
):
    """Core ticket-image pipeline shared by the different endpoints.

    Uploads the image to the ``image-ticket`` bucket under a fresh UUID, runs
    the ticket analysis on its temporary URL, stores a row via
    ``pg.insert_ticket_image`` and returns the analysis result.

    Args:
        version: ``2`` selects ``get_ticket_response_v2``; any other value
            uses ``get_ticket_response``.
        needBarcode: forwarded to the v2 analysis only.
        img_bytes: encoded image content (required).
        file_name: original file name, stored as-is.
        project_name: display name; defaults to the first 8 characters of the
            generated object name.
        user_id: stored as ``created_by``.

    Returns:
        The mapping returned by the analysis function.

    Raises:
        TypeError: if ``img_bytes`` is ``None``.
    """
    img_bytes = _require_image_bytes(img_bytes)
    # Decode first so an invalid image fails before anything is uploaded.
    resolution, size_kb = _describe_image(img_bytes)

    # Upload to object storage, using a UUID as the object name.
    object_name = str(uuid.uuid4())
    bucket_name = "image-ticket"
    _ensure_bucket(bucket_name)
    _push_jpeg(bucket_name, object_name, img_bytes)
    oss_url = minIO.get_temp_url(bucket_name, object_name)

    # Run the analysis to obtain the JSON result.
    if version == 2:
        json_data = get_ticket_response_v2(oss_url, needBarcode)
    else:
        json_data = get_ticket_response(oss_url)

    pg.insert_ticket_image(
        created_by=user_id,
        file_name=file_name,
        resolution=resolution,
        size=size_kb,
        name=project_name if project_name else object_name[:8],
        # These two fields are only read from the v2 result.
        dead_pupa_count=json_data.get("dead_pupa_count") if version == 2 else 0,
        moisture_content=json_data.get("moisture_content") if version == 2 else 0,
        cocoon_weight=json_data.get("cocoon_weight"),
        defective_pupa_count=json_data.get("defective_pupa_count"),
        fresh_shell_weight=json_data.get("fresh_shell_weight"),
        sample_count=json_data.get("sample_count"),
        barcode=json_data.get("barcode"),
        oss=object_name,
        net_weight_total=json_data.get("net_weight_total"),
        evaluator=json_data.get("evaluator"),
        reviewer=json_data.get("reviewer"),
    )
    return json_data


def process_license_image(
    img_bytes=None,
    file_name: Optional[str] = None,
    project_name: Optional[str] = None,
    user_id: Optional[UUID] = None,
):
    """Upload a license image, analyse it and persist the result.

    The image goes to the ``image-license`` bucket under a fresh UUID; the
    result of ``get_license_response`` is stored via
    ``pg.insert_license_image`` and returned.

    Args:
        img_bytes: encoded image content (required).
        file_name: original file name, stored as-is.
        project_name: display name; defaults to the first 8 characters of the
            generated object name.
        user_id: stored as ``created_by``.

    Returns:
        The mapping returned by ``get_license_response``.

    Raises:
        TypeError: if ``img_bytes`` is ``None``.
    """
    img_bytes = _require_image_bytes(img_bytes)
    # Decode first so an invalid image fails before anything is uploaded.
    resolution, size_kb = _describe_image(img_bytes)

    # Upload to object storage, using a UUID as the object name.
    object_name = str(uuid.uuid4())
    bucket_name = "image-license"
    _ensure_bucket(bucket_name)
    _push_jpeg(bucket_name, object_name, img_bytes)
    oss_url = minIO.get_temp_url(bucket_name, object_name)

    json_data = get_license_response(oss_url)

    pg.insert_license_image(
        created_by=user_id,
        file_name=file_name,
        resolution=resolution,
        size=size_kb,
        name=project_name if project_name else object_name[:8],
        oss=object_name,
        type=json_data.get("type"),
        content=json_data.get("content"),
    )
    return json_data


def process_silkworm_cocoon_image(
    img_bytes=None,
    file_name: Optional[str] = None,
    project_name: Optional[str] = None,
    user_id: Optional[UUID] = None,
):
    """Run YOLO detection on a cocoon image and persist both images and stats.

    The raw image is stored under ``raw/<uuid>`` and the annotated detector
    output under ``ai/<uuid>`` in the ``image-sca`` bucket; detection
    statistics are stored via ``pg.insert_sca_image``.

    Args:
        img_bytes: encoded image content (required).
        file_name: original file name, stored as-is.
        project_name: display name; defaults to the first 8 characters of the
            raw object name.
        user_id: stored as ``created_by``.

    Returns:
        A dict with the resolution, size, detection counts/confidences,
        per-stage timings, per-class details and a temporary URL of the
        annotated image.

    Raises:
        TypeError: if ``img_bytes`` is ``None``.
    """
    img_bytes = _require_image_bytes(img_bytes)
    # Decode first so an invalid image fails before anything is uploaded.
    resolution, size_kb = _describe_image(img_bytes)

    pre_object_name = str(uuid.uuid4())
    after_object_name = str(uuid.uuid4())
    bucket_name = "image-sca"
    _ensure_bucket(bucket_name)
    _push_jpeg(bucket_name, "raw/" + pre_object_name, img_bytes)

    # YOLO detection: returns the annotated image and a statistics mapping.
    img_bytes_out, results_json = YOLOSingleton.detect(img_bytes)
    speed_json = results_json.get("speed_ms")
    _push_jpeg(bucket_name, "ai/" + after_object_name, img_bytes_out)

    total_objects = results_json.get("total_objects")
    max_confidence = results_json.get("max_confidence")
    min_confidence = results_json.get("min_confidence")
    avg_confidence = results_json.get("avg_confidence")
    class_counts = results_json.get("class_counts")
    preprocess_ms = speed_json.get("preprocess")
    inference_ms = speed_json.get("inference")
    postprocess_ms = speed_json.get("postprocess")

    pg.insert_sca_image(
        file_name=file_name,
        resolution=resolution,
        size=size_kb,
        cocoon_count=total_objects,
        max_confidence=max_confidence,
        min_confidence=min_confidence,
        average_confidence=avg_confidence,
        other_info=class_counts,
        preprocess_time_ms=preprocess_ms,
        inference_time_ms=inference_ms,
        postprocess_time_ms=postprocess_ms,
        name=project_name if project_name else pre_object_name[:8],
        image_pre=pre_object_name,
        image_after=after_object_name,
        created_by=user_id,
    )

    return {
        "resolution": resolution,
        # NOTE(review): the database row stores KiB but the response divides
        # by 1024 again (MiB) -- confirm this is what the client expects.
        "size": size_kb / 1024,
        "cocoon_count": total_objects,
        "max_confidence": max_confidence,
        "min_confidence": min_confidence,
        "average_confidence": avg_confidence,
        "preprocess_time_ms": preprocess_ms,
        "inference_time_ms": inference_ms,
        "postprocess_time_ms": postprocess_ms,
        "details": class_counts,
        "imageUrl": get_temp_url(bucket_name, "ai/" + after_object_name),
    }


async def process_all_vehicle_animal_image(
    data: SentinelRecordRequest,
):
    """Analyse a sentinel vehicle capture and alert operators if it carries animals.

    The side image is analysed by ``get_vehicle_response``. When no animal is
    reported, both stored images are deleted and nothing else happens.
    Otherwise the licence plate is read from the front image, a sentinel
    record is saved and two WebSocket notifications are sent to the
    departments returned by ``get_dept_ids_by_dept_id``.

    Args:
        data: the capture request; ``Id``, ``DeviceId``, ``VehicleType``,
            ``vehicleImageSide`` and ``vehicleImageFront`` are read.

    Raises:
        Exception: if the front image cannot be downloaded (non-200 status).
    """
    side_object = "vehicle_image_side/" + data.vehicleImageSide
    front_object = "vehicle_image_front/" + data.vehicleImageFront

    # Ask the LLM for information about the vehicle body.
    oss_url = minIO.get_temp_url("sentinel", side_object)
    analysis_result = await get_vehicle_response(oss_url)
    livestock_type = analysis_result.get("livestock_type", "")
    remark = analysis_result.get("remark", "")
    have_animal = analysis_result.get("have_animal", False)

    if not have_animal:
        # Nothing to record: drop both captured images.
        minIO.delete_file("sentinel", side_object)
        minIO.delete_file("sentinel", front_object)
        return

    # Resolve the organisation from the device id.
    dept_id = get_dept_id_by_iot_user_name(data.DeviceId)

    license_plate, license_plate_color = _recognize_front_plate(data.vehicleImageFront)
    license_plate_image = data.vehicleImageFront

    saveSentinelRecord(
        data.Id,
        data.VehicleType,
        data.vehicleImageSide,
        livestock_type,
        remark,
        dept_id,
        license_plate,
        license_plate_image,
        license_plate_color,
    )

    # Departments that are allowed to receive the notification.
    available_departments = get_dept_ids_by_dept_id(dept_id)

    # Notify the control console.
    await ws_manager.noticeSentinel(
        {
            "content": "车辆即将进入关卡,请准备检查",
            "license_plate": license_plate,
            "type": "vehicle_alert",
        },
        available_departments,
    )
    # Notify the large-screen monitor.
    await ws_manager.noticeSentinelMonitorStatus(
        {
            "content": get_sentinel_record_by_id(data.Id),
            "type": "vehicle_alert",
        },
        available_departments,
    )