新增证件识别接口

This commit is contained in:
BBIT-Kai
2025-10-29 13:53:55 +08:00
parent a1f0d0ad55
commit aff1b85ab0
14 changed files with 720 additions and 74 deletions
+152
View File
@@ -0,0 +1,152 @@
from typing import TypedDict
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, END
from config.llm import llmVision
# -------- 定义状态 --------
class State(TypedDict):
image_url: str # 证件照
type: int # 证件类别
content: str # 最终内容
def analyze_card(state: State, prompt_text: str):
messages = [
HumanMessage(
content=[
{"type": "text", "text": prompt_text},
{"type": "image_url", "image_url": {"url": state["image_url"]}},
]
)
]
return llmVision.invoke(messages).content
# -------- 定义节点 --------
def decide_source(state: State, max_retry=3):
choice = analyze_card(
state,
"""
请根据图片内容判断其中的证件类型,并输出对应代号。
代号说明:
-1: 并非证件
0:身份证
1:银行卡
只输出代号数字,不要输出任何文字、标点或解释。
你的输出是:
""",
)
print("图片类型是", choice)
try:
choice = int(choice)
state["type"] = choice
if choice == -1:
state["content"] = '{"result": "暂不支持此证件"}'
except ValueError:
state["type"] = -1
state["content"] = '{"result": "暂不支持此证件"}'
return state
def idcard(state: State):
state["content"] = analyze_card(
state,
"""
你是一个 OCR 信息提取专家,请从图片中识别出身份证的全部可见信息,并输出严格的 JSON 格式。
要求:
1. 无论是正面还是反面,都要识别所有字段。
2. 如果某个字段无法辨认,请将值设为 null。
3. 只输出 JSON,不要输出任何解释或多余内容。
4. 严格保持 JSON 格式,键名统一为英文。
5. "side" 字段为身份证方向,填写:人像面 或 国徽面。
JSON 示例格式:
{
"side": "国徽面",
"name": "持证人姓名",
"gender": "性别",
"ethnicity": "民族",
"id_number": "身份证号",
"birth_date": "出生日期",
"address": "住址",
"issuing_authority": "签发机关",
"valid_period_start": "有效期开始日期",
"valid_period_end": "有效期结束日期",
"notes": "其他可见文字或备注"
}
请确保输出的 JSON 可以被严格解析。
""",
)
return state
def bankcard(state: State):
state["content"] = analyze_card(
state,
"""
你是一个 OCR 信息提取专家,请从图片中识别出银行卡上的全部可见信息,并输出严格的 JSON 格式。
要求:
1. 识别卡号、发卡行、持卡人、有效期等关键信息。
2. 如果某个字段无法辨认,请将值设为 null。
3. 只输出 JSON,不要输出任何解释或附加内容。
4. 严格保持 JSON 格式,键名统一为英文。
JSON 示例格式:
{
"bank_name": "发卡行名称",
"card_number": "卡号",
"card_holder": "持卡人姓名",
"expiry_date": "有效期",
"card_type": "卡种类型(如借记卡/信用卡/Visa/MasterCard",
"issuer_country": "发卡国家",
"notes": "其他可见文字或符号"
}
请确保输出的 JSON 可以被严格解析。
""",
)
return state
# ------------------------------------------------------------------------ 构建有向图 --------
workflow = StateGraph(State)
workflow.add_node("decide", decide_source)
workflow.add_node("idcard", idcard)
workflow.add_node("bankcard", bankcard)
workflow.set_entry_point("decide")
# 条件边:根据 path 决定走向
workflow.add_conditional_edges(
"decide",
lambda state: state["type"],
{
-1: END,
0: "idcard",
1: "bankcard",
},
)
workflow.add_edge("idcard", END)
workflow.add_edge("bankcard", END)
graph = workflow.compile()
import re
# 执行函数
def get_license_response(image_url: str):
final_state = graph.invoke(
{
"image_url": image_url,
}
)
# 去掉 ```json 和 ``` 包裹
final_state["content"] = final_state["content"]
final_state["content"] = re.sub(
r"^```json\s*|\s*```$", "", final_state["content"].strip()
)
print("最终识别内容:", final_state["content"])
return final_state
-60
View File
@@ -1,60 +0,0 @@
from typing import Optional
from pydantic import BaseModel, Field
from langchain_core.messages import HumanMessage
from config.llm import *
from langchain.prompts import PromptTemplate
from langchain.prompts import PromptTemplate
from langchain.schema import HumanMessage
import os
import base64
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import JsonOutputParser
from langchain.schema import HumanMessage
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field
import json
import re
import json
import requests
import cv2
import numpy as np
import requests
class CocoonSample(BaseModel):
moisture_content: float = Field(
...,
description="茧的含水量,单位为百分比(%),浮点数"
)
cocoon_weight: float = Field(
...,
description="下足茧的重量,单位为克,可带小数"
)
defective_pupa_count: int = Field(
...,
description="非好蛹粒数,即不合格蛹的数量,整数"
)
fresh_shell_weight: float = Field(
...,
description="鲜壳重量,单位为克,可带小数"
)
sample_count: int = Field(
...,
description="小样粒数,用于检测的茧粒数,整数"
)
net_weight_total: float = Field(
...,
description="所有样品的净重合计,单位为克,浮点数"
)
evaluator: Optional[str] = Field(
None,
description="仪评人姓名,可能为空"
)
reviewer: Optional[str] = Field(
None,
description="复核人员姓名,可能为空"
)
+77 -1
View File
@@ -1,4 +1,3 @@
import json
from typing import List, Dict
from langchain_postgres import PostgresChatMessageHistory
@@ -559,3 +558,80 @@ def insert_ticket_image(
new_id = cursor.fetchone()[0]
conn.commit()
return new_id
import json
# ————————————————————————————————————————————————————证件照片识别———————————————————————————————
def insert_license_image(
created_by, file_name, resolution, size, name, oss, type, content
):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO license_images (
created_by, created_at, file_name, resolution, size, name, oss,
type, content
)
VALUES (%s, NOW(), %s, %s, %s, %s, %s, %s, %s )
RETURNING id
""",
(created_by, file_name, resolution, size, name, oss, type, content),
)
new_id = cursor.fetchone()[0]
conn.commit()
return new_id
def get_license_image_list(user_id, page=1, page_size=10):
"""
获取用户已分析图片列表,带分页
"""
offset = (page - 1) * page_size
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
# 1️⃣ 查询总条数
cursor.execute(
"""
SELECT COUNT(*)
FROM license_images
WHERE created_by = %s
""",
(user_id,),
)
total = cursor.fetchone()[0]
# 2️⃣ 查询当前页数据
cursor.execute(
"""
SELECT created_at, file_name, resolution, size, name, oss, id, type, content
FROM license_images
WHERE created_by = %s
ORDER BY created_at DESC
LIMIT %s OFFSET %s
""",
(user_id, page_size, offset),
)
rows = cursor.fetchall()
result = []
for row in rows:
result.append(
{
"created_at": MyUtils.format_datetime(row[0]),
"file_name": row[1],
"resolution": row[2],
"size": round(row[3] / 1024, 2),
"name": row[4],
"oss_url": get_temp_url("image-license", row[5]),
"id": row[6],
"type": row[7],
"content": row[8],
}
)
return total, result
+26 -1
View File
@@ -4,8 +4,9 @@ from fastapi import APIRouter
from config.app import F8_SERVER_USER_ID
from models.BaseResponse import BaseResponse
from models.F8ImageRequest import F8ImageRequest
from models.F8ImageRequestV2 import F8ImageRequestV2
from service.vision import process_ticket_image
from service.vision import process_ticket_image, process_license_image
from utils import MyUtils
publicRouter = APIRouter()
@@ -30,3 +31,27 @@ async def cocoonTicket(data: F8ImageRequestV2):
return BaseResponse(data=json_data)
except Exception as e:
return BaseResponse(status=False, message=f"解析失败: {str(e)}", data=None)
import json
@publicRouter.post("/recognize-license")
async def cocoon_license(data: F8ImageRequest):
input_data = data.image
if "," in input_data:
input_data = input_data.split(",")[1]
try:
img_bytes = base64.b64decode(input_data)
json_data = await MyUtils.async_task(
process_license_image,
img_bytes,
f"{data.title}.jpg",
data.title,
F8_SERVER_USER_ID,
)
data = json.loads(json_data.get("content"))
data["type"] = json_data.get("type")
return BaseResponse(data=data)
except Exception as e:
return BaseResponse(status=False, message=f"解析失败: {str(e)}", data=None)
+56 -3
View File
@@ -1,13 +1,13 @@
from uuid import UUID
from fastapi import APIRouter, File, Form, Depends
from fastapi import APIRouter, File, Form, Depends, Query
import db.postgres as pg
from config.security import get_user_id_from_token
from llm.ticketLLM import *
from models.BaseResponse import BaseResponse
from models.ImageRequest import ImageRequest
from service.vision import process_ticket_image
from service.vision import process_ticket_image, process_license_image
from utils import MyUtils
visionRouter = APIRouter()
@@ -51,6 +51,25 @@ def cocoonTicket(user_id: UUID = Depends(get_user_id_from_token)):
return BaseResponse(data=pg.get_ticket_image_list(user_id))
@visionRouter.post("/createTicketImageTaskV2")
async def createTicketImageTask(
file: UploadFile = File(...),
projectName: str = Form(...),
user_id: UUID = Depends(get_user_id_from_token),
):
if not user_id:
return {"error": "userId is required"}
try:
contents = await file.read()
await MyUtils.async_task(
process_ticket_image, 2, True, contents, file.filename, projectName, user_id
)
return BaseResponse(data=None)
except Exception as e:
print(str(e))
return BaseResponse(status=False, message=f"解析失败: {str(e)}", data=None)
@visionRouter.post("/createTicketImageTaskV2")
async def createTicketImageTask(
file: UploadFile = File(...),
@@ -66,5 +85,39 @@ async def createTicketImageTask(
)
return BaseResponse(data=json_data)
except Exception as e:
print(str(e))
return BaseResponse(status=False, message=f"解析失败: {str(e)}", data=None)
@visionRouter.post("/createLicenseImageTask")
async def createLicenseImageTask(
file: UploadFile = File(...),
projectName: str = Form(...),
user_id: UUID = Depends(get_user_id_from_token),
):
if not user_id:
return {"error": "userId is required"}
try:
contents = await file.read()
await MyUtils.async_task(
process_license_image, contents, file.filename, projectName, user_id
)
return BaseResponse(data=None)
except Exception as e:
return BaseResponse(status=False, message=f"解析失败: {str(e)}", data=None)
@visionRouter.get("/getLicenseImageList")
def cocoonLicense(
user_id: UUID = Depends(get_user_id_from_token),
page: int = Query(1, ge=1),
page_size: int = Query(10, ge=1, le=100),
):
if not user_id:
return {"error": "userId is required"}
total, items = pg.get_license_image_list(user_id, page=page, page_size=page_size)
return BaseResponse(
data={
"total": total,
"items": items,
}
)
+41
View File
@@ -3,6 +3,7 @@ from uuid import UUID
import config.minIO as minIO
import db.postgres as pg
from agent.licenseImageAgent import get_license_response
from config.minIO import minio_client
from llm.ticketLLM import *
from llm.ticketLLMv2 import get_ticket_response_v2
@@ -65,3 +66,43 @@ def process_ticket_image(
)
return json_data
def process_license_image(
img_bytes=None,
file_name: str = None,
project_name: str = None,
user_id: UUID = None,
):
# 上传到 OSS,使用 UUID 做对象名
if img_bytes is None:
img_bytes = []
object_name = str(uuid.uuid4())
file_bytes = BytesIO(img_bytes)
bucket_name = "image-license"
if not minio_client.bucket_exists(bucket_name):
minio_client.make_bucket(bucket_name)
minIO.push_file(bucket_name, object_name, file_bytes, img_bytes, "image/jpeg")
oss_url = minIO.get_temp_url(bucket_name, object_name)
# 调用分析方法获取 JSON
json_data = get_license_response(oss_url)
# 获取图片分辨率和大小
img = Image.open(BytesIO(img_bytes))
resolution = f"{img.width}x{img.height}"
size_kb = round(len(img_bytes) / 1024, 2)
# 插入数据库
pg.insert_license_image(
created_by=user_id,
file_name=file_name,
resolution=resolution,
size=size_kb,
name=project_name if project_name else object_name[:8],
oss=object_name,
type=json_data.get("type"),
content=json_data.get("content"),
)
return json_data