"""Vision-related HTTP endpoints.

Every handler resolves the caller through ``get_user_id_from_token`` and
returns either a ``BaseResponse`` or, when no user id is available, the plain
dict ``{"error": "userId is required"}``.
"""

import uuid
from uuid import UUID

# UploadFile is imported explicitly so the handlers below do not depend on the
# wildcard import further down happening to re-export it.
from fastapi import APIRouter, File, Form, Depends, Query, UploadFile

import db.postgres as pg
from config.minIO import get_upload_token
from config.security import get_user_id_from_token
from llm.ticketLLM import *  # noqa: F401,F403 - presumably supplies get_ticket_response
from models.BaseResponse import BaseResponse
from models.ImageRequest import ImageRequest
from service.vision import (
    process_ticket_image,
    process_license_image,
    process_silkworm_cocoon_image,
)
from utils import MyUtils

visionRouter = APIRouter()


# Test endpoint; already working.
@visionRouter.post("/cocoonTicket")
def cocoonTicket(data: ImageRequest, user_id: UUID = Depends(get_user_id_from_token)):
    """Run ``get_ticket_response`` on ``data.image`` and wrap the result.

    Returns a failed ``BaseResponse`` with message ``"unknown error"`` if the
    call raises.
    """
    if not user_id:
        return {"error": "userId is required"}
    try:
        # Named ``result`` rather than ``json`` to avoid shadowing the stdlib
        # module name.
        result = get_ticket_response(data.image)
        return BaseResponse(data=result)
    except Exception:
        # ``Exception`` instead of a bare ``except`` so KeyboardInterrupt and
        # SystemExit still propagate.
        return BaseResponse(status=False, message="unknown error", data=None)


# ---------------- Instrument-evaluation indicator ticket recognition tasks ----------------
# First generation
@visionRouter.post("/createTicketImageTask")
async def createTicketImageTask(
    file: UploadFile = File(...),
    projectName: str = Form(...),
    user_id: UUID = Depends(get_user_id_from_token),
):
    """Read the uploaded file and hand it to ``process_ticket_image``.

    The value awaited from ``MyUtils.async_task`` is returned as the response
    data. Any exception is printed and reported as a failed ``BaseResponse``.
    """
    if not user_id:
        return {"error": "userId is required"}
    try:
        contents = await file.read()
        # NOTE(review): the leading ``1, True`` arguments look like a
        # generation selector plus a flag - confirm against process_ticket_image.
        json_data = await MyUtils.async_task(
            process_ticket_image, 1, True, contents, file.filename, projectName, user_id
        )
        return BaseResponse(data=json_data)
    except Exception as e:
        print(str(e))
        return BaseResponse(status=False, message=f"解析失败: {str(e)}", data=None)


# Second generation
# Renamed from a second ``createTicketImageTask`` definition, which silently
# shadowed the first-generation handler at module level.
@visionRouter.post("/createTicketImageTaskV2")
async def createTicketImageTaskV2(
    file: UploadFile = File(...),
    projectName: str = Form(...),
    user_id: UUID = Depends(get_user_id_from_token),
):
    """Read the uploaded file and hand it to ``process_ticket_image``.

    Unlike the first-generation endpoint, the task result is discarded and the
    response carries ``data=None``. Any exception is printed and reported as a
    failed ``BaseResponse``.
    """
    if not user_id:
        return {"error": "userId is required"}
    try:
        contents = await file.read()
        # NOTE(review): the leading ``2, True`` arguments look like a
        # generation selector plus a flag - confirm against process_ticket_image.
        await MyUtils.async_task(
            process_ticket_image, 2, True, contents, file.filename, projectName, user_id
        )
        return BaseResponse(data=None)
    except Exception as e:
        print(str(e))
        return BaseResponse(status=False, message=f"解析失败: {str(e)}", data=None)


# Get the list of instrument-evaluation indicator ticket recognition tasks.
# Renamed from a second ``cocoonTicket`` definition, which silently shadowed
# the ``/cocoonTicket`` handler at module level.
@visionRouter.get("/getTicketImageList")
def getTicketImageList(user_id: UUID = Depends(get_user_id_from_token)):
    """Return ``pg.get_ticket_image_list(user_id)`` wrapped in a ``BaseResponse``."""
    if not user_id:
        return {"error": "userId is required"}
    return BaseResponse(data=pg.get_ticket_image_list(user_id))


# ---------------- License photo recognition tasks ----------------
@visionRouter.post("/createLicenseImageTask")
async def createLicenseImageTask(
    file: UploadFile = File(...),
    projectName: str = Form(...),
    user_id: UUID = Depends(get_user_id_from_token),
):
    """Read the uploaded file and hand it to ``process_license_image``.

    Responds with ``data=None`` on success, or a failed ``BaseResponse``
    carrying the exception text if anything raises.
    """
    if not user_id:
        return {"error": "userId is required"}
    try:
        contents = await file.read()
        await MyUtils.async_task(
            process_license_image, contents, file.filename, projectName, user_id
        )
        return BaseResponse(data=None)
    except Exception as e:
        return BaseResponse(status=False, message=f"解析失败: {str(e)}", data=None)


@visionRouter.get("/getLicenseImageList")
def getLicenseImageList(
    user_id: UUID = Depends(get_user_id_from_token),
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100),
):
    """Return one page of ``pg.get_license_image_list`` results for the user.

    The response data is ``{"total": ..., "items": ...}``.
    """
    if not user_id:
        return {"error": "userId is required"}
    total, items = pg.get_license_image_list(user_id, page=page, page_size=page_size)
    return BaseResponse(
        data={
            "total": total,
            "items": items,
        }
    )


# ---------------- Silkworm cocoon recognition tasks ----------------
@visionRouter.post("/createSilkwormCocoonAnalysisTask")
async def createSilkwormCocoonAnalysisTask(
    file: UploadFile = File(...),
    projectName: str = Form(...),
    user_id: UUID = Depends(get_user_id_from_token),
):
    """Read the uploaded file and hand it to ``process_silkworm_cocoon_image``.

    Responds with ``data=None`` on success, or a failed ``BaseResponse``
    carrying the exception text if anything raises.
    """
    if not user_id:
        return {"error": "userId is required"}
    try:
        contents = await file.read()
        await MyUtils.async_task(
            process_silkworm_cocoon_image, contents, file.filename, projectName, user_id
        )
        return BaseResponse(data=None)
    except Exception as e:
        return BaseResponse(status=False, message=f"解析失败: {str(e)}", data=None)


@visionRouter.get("/getSilkwormCocoonAnalysisTasks")
def getSilkwormCocoonAnalysisTasks(
    user_id: UUID = Depends(get_user_id_from_token),
    name: str = "",
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100),
):
    """Return one page of ``pg.get_sca_image_list`` results for the user.

    ``name`` is forwarded to the query as-is. The response data is
    ``{"total": ..., "items": ...}``.
    """
    if not user_id:
        return {"error": "userId is required"}
    total, items = pg.get_sca_image_list(user_id, name, page=page, page_size=page_size)
    return BaseResponse(
        data={
            "total": total,
            "items": items,
        }
    )


# ---------------- Silkworm cocoon video recognition tasks ----------------
@visionRouter.get("/getIVASCUploadToken")
def getIVASCUploadToken(
    user_id: UUID = Depends(get_user_id_from_token),
):
    """Return ``get_upload_token`` output for a fresh object in ``"video-sca"``."""
    # Same guard as every other handler in this router.
    if not user_id:
        return {"error": "userId is required"}
    # Generate a unique object name so uploads never overwrite each other.
    object_name = f"raw/{uuid.uuid4()}"
    return BaseResponse(data=get_upload_token(user_id, "video-sca", object_name))


@visionRouter.get("/getScVideoList")
def getScVideoList(
    user_id: UUID = Depends(get_user_id_from_token),
    name: str = "",
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100),
):
    """Return one page of ``pg.get_sca_video_list`` results.

    The response data is ``{"total": ..., "items": ...}``.
    """
    if not user_id:
        return {"error": "userId is required"}
    # NOTE(review): unlike get_sca_image_list, user_id is not passed here, so
    # the listing may not be scoped to the caller - confirm this is intended.
    total, items = pg.get_sca_video_list(name, page=page, page_size=page_size)
    return BaseResponse(
        data={
            "total": total,
            "items": items,
        }
    )


@visionRouter.get("/getAnalyticsDetailBySCVideoId")
def getAnalyticsDetailBySCVideoId(
    user_id: UUID = Depends(get_user_id_from_token),
    vId: str = "",
):
    """Return ``pg.get_sca_video_details(vId)`` wrapped in a ``BaseResponse``."""
    if not user_id:
        return {"error": "userId is required"}
    # NOTE(review): the lookup is keyed on vId alone; ownership by user_id is
    # not checked here - confirm whether the database layer enforces it.
    return BaseResponse(data=pg.get_sca_video_details(vId))