# consumer.py
"""RabbitMQ producers and consumers for analysis requests (aio_pika based)."""
import asyncio
import json
from typing import Any, Awaitable, Callable, Type

import aio_pika

from config.rabbitMQ import *
from models.AnalysisRequest import AnalysisRequest
from models.SentinelRecordRequest import SentinelRecordRequest
from service.vision import process_vehicle_animal_image


def _amqp_url(vhost: str) -> str:
    """Build the AMQP connection URL for *vhost* from the configured credentials."""
    # NOTE(review): credentials and vhost are interpolated verbatim; values
    # containing URL-reserved characters must already be percent-encoded in
    # the configuration.
    return f"amqp://{RABBIT_USER}:{RABBIT_PASSWORD}@{RABBIT_HOST}/{vhost}"


async def _publish_persistent(vhost: str, payload: dict) -> None:
    """Publish *payload* as a persistent JSON message to QUEUE_NAME on *vhost*."""
    # Serialise first so an unserialisable payload fails before a broker
    # connection is opened.
    body = json.dumps(payload).encode()

    connection = await aio_pika.connect_robust(_amqp_url(vhost))
    async with connection:
        channel = await connection.channel()
        # Declare the queue so that it exists before publishing.
        await channel.declare_queue(QUEUE_NAME, durable=True)
        message = aio_pika.Message(
            body=body,
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
        )
        await channel.default_exchange.publish(message, routing_key=QUEUE_NAME)


async def _consume(
    vhost: str,
    model: Type[Any],
    handler: Callable[[Any], Awaitable[Any]],
) -> None:
    """Consume QUEUE_NAME on *vhost*, handling one message at a time.

    Each message body is decoded as JSON, turned into ``model(**data)`` and
    passed to ``handler``. The work runs inside ``message.process()``, which
    (per aio_pika's documentation) acknowledges the message on a clean exit
    and rejects it if an exception escapes; such an exception also propagates
    out of this coroutine.
    """
    connection = await aio_pika.connect_robust(_amqp_url(vhost))
    async with connection:
        channel = await connection.channel()
        # prefetch_count=1: ask the broker for a single unacknowledged
        # message at a time.
        await channel.set_qos(prefetch_count=1)
        queue = await channel.declare_queue(QUEUE_NAME, durable=True)

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    req = model(**json.loads(message.body))
                    print(f"收到任务: {req}")
                    await handler(req)
                    print(f"完成任务: {req}")


async def _simulate_processing(_req: AnalysisRequest) -> None:
    """Stand-in handler for the test consumer: wait five seconds."""
    await asyncio.sleep(5)


async def mq_new_analysis_test(req: dict):
    """Publish a test analysis request to the RabbitMQ queue.

    Args:
        req: JSON-serialisable mapping; sent as a persistent message to
            QUEUE_NAME on the RABBIT_VHOST virtual host.

    Raises:
        TypeError: if ``req`` cannot be serialised by ``json.dumps``.
    """
    await _publish_persistent(RABBIT_VHOST, req)


async def mq_pull_analysis_async_test():
    """Consume test analysis requests from QUEUE_NAME on RABBIT_VHOST.

    Every message is parsed into an ``AnalysisRequest`` and "processed" by
    sleeping for five seconds.
    """
    await _consume(RABBIT_VHOST, AnalysisRequest, _simulate_processing)


async def sentinel_new_analysis(req: SentinelRecordRequest):
    """Publish a sentinel record request to the RabbitMQ queue.

    Args:
        req: request model; its ``model_dump()`` output is sent as a
            persistent JSON message to QUEUE_NAME on the SENTINEL_VHOST
            virtual host.
    """
    await _publish_persistent(SENTINEL_VHOST, req.model_dump())


async def sentinel_pull_analysis_async():
    """Consume sentinel record requests from QUEUE_NAME on SENTINEL_VHOST.

    Every message is parsed into a ``SentinelRecordRequest`` and handed to
    ``process_vehicle_animal_image``.
    """
    await _consume(
        SENTINEL_VHOST, SentinelRecordRequest, process_vehicle_animal_image
    )