Files
AILab/bbit_ai/app/service/vision.py
T
2026-02-04 13:58:18 +08:00

224 lines
7.3 KiB
Python

import uuid
from uuid import UUID
import config.minIO as minIO
import db.postgres as pg
from agent.licenseImageAgent import get_license_response
from agent.vehicleImageAgent import get_vehicle_response
from config.minIO import minio_client, get_temp_url
from config.yolo import YOLOSingleton
from db.postgres import (
get_dept_id_by_iot_user_name,
get_dept_ids_by_dept_id,
)
from db.postgres.sentinel import update_sentinel_record, get_sentinel_record_by_id
from llm.ticketLLM import *
from llm.ticketLLMv2 import get_ticket_response_v2
from models.SentinelRecordRequest import SentinelRecordRequest
from routers.WS import ws_manager
def process_ticket_image(
version: int,
needBarcode: bool = False,
img_bytes=None,
file_name: str = None,
project_name: str = None,
user_id: UUID = None,
):
"""
处理票据图片的核心逻辑,供不同接口调用
"""
# 上传到 OSS,使用 UUID 做对象名
if img_bytes is None:
img_bytes = []
object_name = str(uuid.uuid4())
file_bytes = BytesIO(img_bytes)
bucket_name = "image-ticket"
if not minio_client.bucket_exists(bucket_name):
minio_client.make_bucket(bucket_name)
minIO.push_file(bucket_name, object_name, file_bytes, img_bytes, "image/jpeg")
oss_url = minIO.get_temp_url(bucket_name, object_name)
# 调用分析方法获取 JSON
if version == 1:
json_data = get_ticket_response(oss_url)
elif version == 2:
json_data = get_ticket_response_v2(oss_url, needBarcode)
else:
json_data = get_ticket_response(oss_url)
# 获取图片分辨率和大小
img = Image.open(BytesIO(img_bytes))
resolution = f"{img.width}x{img.height}"
size_kb = round(len(img_bytes) / 1024, 2)
# 插入数据库
pg.insert_ticket_image(
created_by=user_id,
file_name=file_name,
resolution=resolution,
size=size_kb,
name=project_name if project_name else object_name[:8],
dead_pupa_count=json_data.get("dead_pupa_count") if version == 2 else 0,
moisture_content=json_data.get("moisture_content") if version == 2 else 0,
cocoon_weight=json_data.get("cocoon_weight"),
defective_pupa_count=json_data.get("defective_pupa_count"),
fresh_shell_weight=json_data.get("fresh_shell_weight"),
sample_count=json_data.get("sample_count"),
barcode=json_data.get("barcode"),
oss=object_name,
net_weight_total=json_data.get("net_weight_total"),
evaluator=json_data.get("evaluator"),
reviewer=json_data.get("reviewer"),
)
return json_data
def process_license_image(
img_bytes=None,
file_name: str = None,
project_name: str = None,
user_id: UUID = None,
):
# 上传到 OSS,使用 UUID 做对象名
if img_bytes is None:
img_bytes = []
object_name = str(uuid.uuid4())
file_bytes = BytesIO(img_bytes)
bucket_name = "image-license"
if not minio_client.bucket_exists(bucket_name):
minio_client.make_bucket(bucket_name)
minIO.push_file(bucket_name, object_name, file_bytes, img_bytes, "image/jpeg")
oss_url = minIO.get_temp_url(bucket_name, object_name)
# 调用分析方法获取 JSON
json_data = get_license_response(oss_url)
# 获取图片分辨率和大小
img = Image.open(BytesIO(img_bytes))
resolution = f"{img.width}x{img.height}"
size_kb = round(len(img_bytes) / 1024, 2)
# 插入数据库
pg.insert_license_image(
created_by=user_id,
file_name=file_name,
resolution=resolution,
size=size_kb,
name=project_name if project_name else object_name[:8],
oss=object_name,
type=json_data.get("type"),
content=json_data.get("content"),
)
return json_data
def process_silkworm_cocoon_image(
img_bytes=None,
file_name: str = None,
project_name: str = None,
user_id: UUID = None,
):
# 上传到 OSS,使用 UUID 做对象名
if img_bytes is None:
img_bytes = []
pre_object_name = str(uuid.uuid4())
after_object_name = str(uuid.uuid4())
file_bytes = BytesIO(img_bytes)
bucket_name = "image-sca"
if not minio_client.bucket_exists(bucket_name):
minio_client.make_bucket(bucket_name)
minIO.push_file(
bucket_name, "raw/" + pre_object_name, file_bytes, img_bytes, "image/jpeg"
)
# YOLO检测
img_bytes_out, results_json = YOLOSingleton.detect(img_bytes)
speed_json = results_json.get("speed_ms")
file_bytes_out = BytesIO(img_bytes_out)
minIO.push_file(
bucket_name,
"ai/" + after_object_name,
file_bytes_out,
img_bytes_out,
"image/jpeg",
)
# 获取图片分辨率和大小
img = Image.open(BytesIO(img_bytes))
resolution = f"{img.width}x{img.height}"
size_kb = round(len(img_bytes) / 1024, 2)
# 插入数据库
pg.insert_sca_image(
file_name=file_name,
resolution=resolution,
size=size_kb,
cocoon_count=results_json.get("total_objects"),
max_confidence=results_json.get("max_confidence"),
min_confidence=results_json.get("min_confidence"),
average_confidence=results_json.get("avg_confidence"),
other_info=results_json.get("class_counts"),
preprocess_time_ms=speed_json.get("preprocess"),
inference_time_ms=speed_json.get("inference"),
postprocess_time_ms=speed_json.get("postprocess"),
name=project_name if project_name else pre_object_name[:8],
image_pre=pre_object_name,
image_after=after_object_name,
created_by=user_id,
)
return {
"resolution": resolution,
"size": size_kb / 1024,
"cocoon_count": results_json.get("total_objects"),
"max_confidence": results_json.get("max_confidence"),
"min_confidence": results_json.get("min_confidence"),
"average_confidence": results_json.get("avg_confidence"),
"preprocess_time_ms": speed_json.get("preprocess"),
"inference_time_ms": speed_json.get("inference"),
"postprocess_time_ms": speed_json.get("postprocess"),
"details": results_json.get("class_counts"),
"imageUrl": get_temp_url(bucket_name, "ai/" + after_object_name),
}
async def process_vehicle_animal_image(
data: SentinelRecordRequest,
):
# 通过设备id获得组织id
dept_id = get_dept_id_by_iot_user_name(data.DeviceId)
# 得到动物类型
oss_url = minIO.get_temp_url("sentinel", "vehicle_image/" + data.VehicleImage)
analysis_result = await get_vehicle_response(oss_url)
livestock_type = analysis_result.get("livestock_type", "")
remark = analysis_result.get("remark", "")
# 保存到数据库
update_sentinel_record(data.Id, livestock_type, remark, dept_id)
# 可以通知的部门ids
available_departments = get_dept_ids_by_dept_id(dept_id)
# 通知控制界面
await ws_manager.noticeSentinel(
{
"content": f"载有{livestock_type}的车辆即将进入关卡,请准备检查",
"type": "vehicle_alert",
},
available_departments,
)
# 通知大屏界面
await ws_manager.noticeSentinelMonitorStatus(
{
"content": get_sentinel_record_by_id(data.Id),
"type": "vehicle_alert",
},
available_departments,
)