53 lines
1.8 KiB
Python
53 lines
1.8 KiB
Python
# consumer.py
|
|
import asyncio
|
|
import json
|
|
|
|
import aio_pika
|
|
|
|
from config.rabbitMQ import *
|
|
from models.AnalysisRequest import AnalysisRequest
|
|
|
|
|
|
async def mq_new_analysis(req: AnalysisRequest):
|
|
"""将分析请求发送到 RabbitMQ 队列(异步版)"""
|
|
connection = await aio_pika.connect_robust(
|
|
f"amqp://{RABBIT_USER}:{RABBIT_PASSWORD}@{RABBIT_HOST}/{RABBIT_VHOST}"
|
|
)
|
|
|
|
async with connection:
|
|
channel = await connection.channel()
|
|
# 声明队列,确保队列存在
|
|
queue = await channel.declare_queue(QUEUE_NAME, durable=True)
|
|
|
|
message_body = json.dumps(req.dict())
|
|
message = aio_pika.Message(
|
|
body=message_body.encode(),
|
|
delivery_mode=aio_pika.DeliveryMode.PERSISTENT, # 持久化
|
|
)
|
|
|
|
await channel.default_exchange.publish(message, routing_key=QUEUE_NAME)
|
|
|
|
|
|
async def mq_pull_analysis_async():
|
|
"""
|
|
从队列拉取分析任务并处理
|
|
process_func: 一个函数,接收 AnalysisRequest 对象处理分析逻辑
|
|
"""
|
|
connection = await aio_pika.connect_robust(
|
|
f"amqp://{RABBIT_USER}:{RABBIT_PASSWORD}@{RABBIT_HOST}/{RABBIT_VHOST}"
|
|
)
|
|
async with connection:
|
|
queue_name = QUEUE_NAME
|
|
channel = await connection.channel()
|
|
await channel.set_qos(prefetch_count=1)
|
|
queue = await channel.declare_queue(queue_name, durable=True)
|
|
|
|
async with queue.iterator() as queue_iter:
|
|
async for message in queue_iter:
|
|
async with message.process():
|
|
data = json.loads(message.body)
|
|
req = AnalysisRequest(**data)
|
|
print(f"收到任务: {req}")
|
|
await asyncio.sleep(5) # 模拟处理
|
|
print(f"完成任务: {req}")
|