import base64
import json
import logging
from typing import Any, Callable

from fastapi import APIRouter

from config.app import F8_SERVER_USER_ID
from models.BaseResponse import BaseResponse
from models.F8ImageRequest import F8ImageRequest
from models.F8ImageRequestV2 import F8ImageRequestV2
from service.vision import (
    process_ticket_image,
    process_license_image,
    process_silkworm_cocoon_image,
)
from utils import MyUtils

_logger = logging.getLogger(__name__)

publicRouter = APIRouter()


def _strip_data_url_prefix(image: str) -> str:
    """Return the base64 portion of *image*, dropping any leading header.

    When *image* contains a comma, the text before it is treated as a
    header (presumably a data-URL prefix such as
    ``data:image/jpeg;base64,`` -- confirm against the clients) and
    discarded. A string without a comma is returned unchanged.
    """
    if "," in image:
        # Index 1 (the segment right after the first comma) is kept on
        # purpose so behaviour matches the previous inline implementation.
        return image.split(",")[1]
    return image


async def _run_vision_task(
    processor: Callable[..., Any],
    encoded_image: str,
    title: str,
    *leading_args: Any,
) -> Any:
    """Decode *encoded_image* and run *processor* via ``MyUtils.async_task``.

    The processor is invoked with ``*leading_args`` followed by the decoded
    image bytes, the file name ``"<title>.jpg"``, the title itself and
    ``F8_SERVER_USER_ID``.

    Any exception raised while decoding or processing propagates to the
    caller, which is expected to translate it into a failure response.
    """
    img_bytes = base64.b64decode(encoded_image)
    return await MyUtils.async_task(
        processor,
        *leading_args,
        img_bytes,
        f"{title}.jpg",
        title,
        F8_SERVER_USER_ID,
    )


def _failure_response(exc: Exception) -> BaseResponse:
    """Log *exc* with its traceback and build the failure envelope."""
    # The client only receives the exception text; keep the full traceback
    # in the server log so failures remain diagnosable.
    _logger.error("Image recognition request failed", exc_info=exc)
    return BaseResponse(status=False, message=f"解析失败: {exc}", data=None)


@publicRouter.post("/recognize-cocoon-metrics")
async def cocoonTicket(data: F8ImageRequestV2):
    """Run ``process_ticket_image`` on the base64 image in the request.

    Returns a ``BaseResponse`` wrapping the processor's result, or a
    failure ``BaseResponse`` carrying the error text if decoding or
    processing raises.
    """
    encoded_image = _strip_data_url_prefix(data.image)
    try:
        result = await _run_vision_task(
            process_ticket_image,
            encoded_image,
            data.title,
            # NOTE(review): the meaning of the literal 2 is defined by
            # process_ticket_image and is not visible from this module.
            2,
            data.needBarcode,
        )
        return BaseResponse(data=result)
    except Exception as exc:
        return _failure_response(exc)


@publicRouter.post("/recognize-license")
async def cocoon_license(data: F8ImageRequest):
    """Run ``process_license_image`` on the base64 image in the request.

    The processor's ``"content"`` entry is parsed as JSON and its ``"type"``
    entry is added to the parsed object under the ``"type"`` key. Returns a
    ``BaseResponse`` wrapping that object, or a failure ``BaseResponse`` if
    decoding, processing or JSON parsing raises.
    """
    encoded_image = _strip_data_url_prefix(data.image)
    try:
        result = await _run_vision_task(
            process_license_image,
            encoded_image,
            data.title,
        )
        # Use a separate local instead of rebinding the request parameter.
        parsed = json.loads(result.get("content"))
        parsed["type"] = result.get("type")
        return BaseResponse(data=parsed)
    except Exception as exc:
        return _failure_response(exc)


@publicRouter.post("/recognize-silkworm-cocoon")
async def recognize_silkworm_cocoon(data: F8ImageRequest):
    """Run ``process_silkworm_cocoon_image`` on the request's base64 image.

    Returns a ``BaseResponse`` wrapping the processor's result, or a
    failure ``BaseResponse`` carrying the error text if decoding or
    processing raises.
    """
    encoded_image = _strip_data_url_prefix(data.image)
    try:
        result = await _run_vision_task(
            process_silkworm_cocoon_image,
            encoded_image,
            data.title,
        )
        return BaseResponse(data=result)
    except Exception as exc:
        return _failure_response(exc)