完善牧安云哨-后端

This commit is contained in:
BBIT-Kai
2025-12-29 16:30:36 +08:00
parent cd7aa35960
commit b9b8d30ebf
23 changed files with 1074 additions and 41 deletions
+98
View File
@@ -0,0 +1,98 @@
# consumer.py
import asyncio
import json
import aio_pika
from config.rabbitMQ import *
from models.AnalysisRequest import AnalysisRequest
from models.SentinelRecordRequest import SentinelRecordRequest
from service.vision import process_vehicle_animal_image
async def mq_new_analysis_test(req: dict):
"""将分析请求发送到 RabbitMQ 队列(异步版)"""
connection = await aio_pika.connect_robust(
f"amqp://{RABBIT_USER}:{RABBIT_PASSWORD}@{RABBIT_HOST}/{RABBIT_VHOST}"
)
async with connection:
channel = await connection.channel()
# 声明队列,确保队列存在
queue = await channel.declare_queue(QUEUE_NAME, durable=True)
message_body = json.dumps(req)
message = aio_pika.Message(
body=message_body.encode(),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT, # 持久化
)
await channel.default_exchange.publish(message, routing_key=QUEUE_NAME)
async def mq_pull_analysis_async_test():
"""
从队列拉取分析任务并处理
process_func: 一个函数,接收 AnalysisRequest 对象处理分析逻辑
"""
connection = await aio_pika.connect_robust(
f"amqp://{RABBIT_USER}:{RABBIT_PASSWORD}@{RABBIT_HOST}/{RABBIT_VHOST}"
)
async with connection:
queue_name = QUEUE_NAME
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue(queue_name, durable=True)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
data = json.loads(message.body)
req = AnalysisRequest(**data)
print(f"收到任务: {req}")
await asyncio.sleep(5) # 模拟处理
print(f"完成任务: {req}")
async def sentinel_new_analysis(req: SentinelRecordRequest):
"""将分析请求发送到 RabbitMQ 队列(异步版)"""
connection = await aio_pika.connect_robust(
f"amqp://{RABBIT_USER}:{RABBIT_PASSWORD}@{RABBIT_HOST}/{SENTINEL_VHOST}"
)
async with connection:
channel = await connection.channel()
# 声明队列,确保队列存在
queue = await channel.declare_queue(QUEUE_NAME, durable=True)
message_body = json.dumps(req.model_dump())
message = aio_pika.Message(
body=message_body.encode(),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT, # 持久化
)
await channel.default_exchange.publish(message, routing_key=QUEUE_NAME)
async def sentinel_pull_analysis_async():
"""
从队列拉取分析任务并处理
process_func: 一个函数,接收 AnalysisRequest 对象处理分析逻辑
"""
connection = await aio_pika.connect_robust(
f"amqp://{RABBIT_USER}:{RABBIT_PASSWORD}@{RABBIT_HOST}/{SENTINEL_VHOST}"
)
async with connection:
queue_name = QUEUE_NAME
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue(queue_name, durable=True)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
data = json.loads(message.body)
req = SentinelRecordRequest(**data)
print(f"收到任务: {req}")
await process_vehicle_animal_image(req) # 处理
print(f"完成任务: {req}")