import uuid
from io import BytesIO
from uuid import UUID

import config.minIO as minIO
import db.postgres as pg
from agent.licenseImageAgent import get_license_response
from agent.vehicleImageAgent import get_vehicle_response
from config.minIO import minio_client
from config.yolo import YOLOSingleton
from db.postgres import get_dept_id_by_iot_user_name, get_dept_ids_by_dept_id
from db.postgres.sentinel import update_sentinel_record

# NOTE(review): `Image` and `get_ticket_response` are not imported explicitly
# in this file; presumably they arrive through this wildcard import — confirm
# and replace it with explicit imports.
from llm.ticketLLM import *
from llm.ticketLLMv2 import get_ticket_response_v2
from models.SentinelRecordRequest import SentinelRecordRequest
from routers.WS import ws_manager


def _require_image_bytes(img_bytes):
    """Return *img_bytes* unchanged, raising ``TypeError`` when it is None."""
    if img_bytes is None:
        # Fail before any bucket or upload side effect happens.
        raise TypeError("img_bytes must be a bytes-like object, not None")
    return img_bytes


def _ensure_bucket(bucket_name):
    """Create the MinIO bucket *bucket_name* if it does not exist yet."""
    if not minio_client.bucket_exists(bucket_name):
        minio_client.make_bucket(bucket_name)


def _push_jpeg(bucket_name, object_name, payload):
    """Upload *payload* to *bucket_name*/*object_name* as ``image/jpeg``."""
    minIO.push_file(
        bucket_name, object_name, BytesIO(payload), payload, "image/jpeg"
    )


def _image_stats(img_bytes):
    """Return ``(resolution, size_kb)`` for the encoded image *img_bytes*.

    ``resolution`` is the string ``"<width>x<height>"`` and ``size_kb`` is
    the payload length in KiB rounded to two decimals.
    """
    img = Image.open(BytesIO(img_bytes))
    resolution = f"{img.width}x{img.height}"
    size_kb = round(len(img_bytes) / 1024, 2)
    return resolution, size_kb


def process_ticket_image(
    version: int,
    needBarcode: bool = False,
    img_bytes=None,
    file_name: str = None,
    project_name: str = None,
    user_id: UUID = None,
):
    """Core logic for processing a ticket image, shared by several endpoints.

    Uploads the image to the ``image-ticket`` bucket under a fresh UUID,
    runs the ticket analysis on a temporary URL of the uploaded object,
    stores the result via ``pg.insert_ticket_image`` and returns it.

    Args:
        version: Analysis version. ``2`` selects ``get_ticket_response_v2``;
            every other value uses ``get_ticket_response``.
        needBarcode: Forwarded to the v2 analysis only.
        img_bytes: Encoded image payload (bytes-like). Required.
        file_name: Original file name, stored with the record.
        project_name: Record name; falls back to the first 8 characters of
            the generated object name when empty.
        user_id: Stored as the record's ``created_by``.

    Returns:
        The mapping returned by the analysis call.

    Raises:
        TypeError: If ``img_bytes`` is None.
    """
    img_bytes = _require_image_bytes(img_bytes)

    # Upload to OSS, using a UUID as the object name.
    object_name = str(uuid.uuid4())
    bucket_name = "image-ticket"
    _ensure_bucket(bucket_name)
    _push_jpeg(bucket_name, object_name, img_bytes)
    oss_url = minIO.get_temp_url(bucket_name, object_name)

    # Run the analysis to obtain the JSON result.
    is_v2 = version == 2
    if is_v2:
        json_data = get_ticket_response_v2(oss_url, needBarcode)
    else:
        json_data = get_ticket_response(oss_url)

    # Image resolution and size.
    resolution, size_kb = _image_stats(img_bytes)

    # Insert into the database. The two v2-only fields are stored as 0 for
    # every other version.
    pg.insert_ticket_image(
        created_by=user_id,
        file_name=file_name,
        resolution=resolution,
        size=size_kb,
        name=project_name if project_name else object_name[:8],
        dead_pupa_count=json_data.get("dead_pupa_count") if is_v2 else 0,
        moisture_content=json_data.get("moisture_content") if is_v2 else 0,
        cocoon_weight=json_data.get("cocoon_weight"),
        defective_pupa_count=json_data.get("defective_pupa_count"),
        fresh_shell_weight=json_data.get("fresh_shell_weight"),
        sample_count=json_data.get("sample_count"),
        barcode=json_data.get("barcode"),
        oss=object_name,
        net_weight_total=json_data.get("net_weight_total"),
        evaluator=json_data.get("evaluator"),
        reviewer=json_data.get("reviewer"),
    )
    return json_data


def process_license_image(
    img_bytes=None,
    file_name: str = None,
    project_name: str = None,
    user_id: UUID = None,
):
    """Upload a license image, analyse it and persist the result.

    The image is stored in the ``image-license`` bucket under a fresh UUID,
    analysed through ``get_license_response`` on a temporary URL, and the
    ``type`` / ``content`` fields of the result are written via
    ``pg.insert_license_image``.

    Args:
        img_bytes: Encoded image payload (bytes-like). Required.
        file_name: Original file name, stored with the record.
        project_name: Record name; falls back to the first 8 characters of
            the generated object name when empty.
        user_id: Stored as the record's ``created_by``.

    Returns:
        The mapping returned by ``get_license_response``.

    Raises:
        TypeError: If ``img_bytes`` is None.
    """
    img_bytes = _require_image_bytes(img_bytes)

    # Upload to OSS, using a UUID as the object name.
    object_name = str(uuid.uuid4())
    bucket_name = "image-license"
    _ensure_bucket(bucket_name)
    _push_jpeg(bucket_name, object_name, img_bytes)
    oss_url = minIO.get_temp_url(bucket_name, object_name)

    # Run the analysis to obtain the JSON result.
    json_data = get_license_response(oss_url)

    # Image resolution and size.
    resolution, size_kb = _image_stats(img_bytes)

    # Insert into the database.
    pg.insert_license_image(
        created_by=user_id,
        file_name=file_name,
        resolution=resolution,
        size=size_kb,
        name=project_name if project_name else object_name[:8],
        oss=object_name,
        type=json_data.get("type"),
        content=json_data.get("content"),
    )
    return json_data


def process_silkworm_cocoon_image(
    img_bytes=None,
    file_name: str = None,
    project_name: str = None,
    user_id: UUID = None,
):
    """Run YOLO detection on a cocoon image and persist both images and stats.

    The raw image is uploaded to ``image-sca`` under ``raw/<uuid>`` and the
    image returned by ``YOLOSingleton.detect`` under ``ai/<uuid>`` (two
    independent UUIDs). Detection statistics are written via
    ``pg.insert_sca_image`` and returned.

    Args:
        img_bytes: Encoded image payload (bytes-like). Required.
        file_name: Original file name, stored with the record.
        project_name: Record name; falls back to the first 8 characters of
            the raw object's UUID when empty.
        user_id: Stored as the record's ``created_by``.

    Returns:
        A dict with keys ``resolution``, ``size``, ``cocoon_count``,
        ``max_confidence``, ``min_confidence``, ``average_confidence``,
        ``preprocess_time_ms``, ``inference_time_ms``,
        ``postprocess_time_ms`` and ``details``.

    Raises:
        TypeError: If ``img_bytes`` is None.
    """
    img_bytes = _require_image_bytes(img_bytes)

    # Upload the raw image to OSS, using a UUID as the object name.
    pre_object_name = str(uuid.uuid4())
    after_object_name = str(uuid.uuid4())
    bucket_name = "image-sca"
    _ensure_bucket(bucket_name)
    _push_jpeg(bucket_name, "raw/" + pre_object_name, img_bytes)

    # YOLO detection.
    img_bytes_out, results_json = YOLOSingleton.detect(img_bytes)
    # A missing "speed_ms" entry must not crash after both uploads; the
    # timing fields simply become None in that case.
    speed_json = results_json.get("speed_ms") or {}
    _push_jpeg(bucket_name, "ai/" + after_object_name, img_bytes_out)

    # Image resolution and size (of the original upload).
    resolution, size_kb = _image_stats(img_bytes)

    cocoon_count = results_json.get("total_objects")
    max_confidence = results_json.get("max_confidence")
    min_confidence = results_json.get("min_confidence")
    average_confidence = results_json.get("avg_confidence")
    class_counts = results_json.get("class_counts")
    preprocess_ms = speed_json.get("preprocess")
    inference_ms = speed_json.get("inference")
    postprocess_ms = speed_json.get("postprocess")

    # Insert into the database.
    pg.insert_sca_image(
        file_name=file_name,
        resolution=resolution,
        size=size_kb,
        cocoon_count=cocoon_count,
        max_confidence=max_confidence,
        min_confidence=min_confidence,
        average_confidence=average_confidence,
        other_info=class_counts,
        preprocess_time_ms=preprocess_ms,
        inference_time_ms=inference_ms,
        postprocess_time_ms=postprocess_ms,
        name=project_name if project_name else pre_object_name[:8],
        image_pre=pre_object_name,
        image_after=after_object_name,
        created_by=user_id,
    )
    return {
        "resolution": resolution,
        "size": size_kb,
        "cocoon_count": cocoon_count,
        "max_confidence": max_confidence,
        "min_confidence": min_confidence,
        "average_confidence": average_confidence,
        "preprocess_time_ms": preprocess_ms,
        "inference_time_ms": inference_ms,
        "postprocess_time_ms": postprocess_ms,
        "details": class_counts,
    }


async def process_vehicle_animal_image(
    data: SentinelRecordRequest,
):
    """Analyse a sentinel vehicle image, notify departments and save the result.

    Reads ``DeviceId``, ``VehicleImage`` and ``Id`` from *data*. The image is
    expected under ``vehicle_image/<VehicleImage>`` in the ``sentinel``
    bucket; a temporary URL for it is passed to ``get_vehicle_response``.

    Returns:
        Whatever ``update_sentinel_record`` returns.
    """
    # Resolve the department id from the device id.
    dept_id = get_dept_id_by_iot_user_name(data.DeviceId)

    # Determine the livestock type from the vehicle image.
    oss_url = minIO.get_temp_url("sentinel", "vehicle_image/" + data.VehicleImage)
    analysis_result = await get_vehicle_response(oss_url)
    livestock_type = analysis_result.get("livestock_type", "")
    remark = analysis_result.get("remark", "")

    # Push a websocket alert to the departments resolved from dept_id.
    available_departments = get_dept_ids_by_dept_id(dept_id)
    await ws_manager.noticeSentinel(
        {
            "content": f"载有{livestock_type}的车辆即将进入关卡,请准备检查",
            "type": "vehicle_alert",
        },
        available_departments,
    )

    # Save to the database.
    return update_sentinel_record(data.Id, livestock_type, remark, dept_id)