# consumer.py import json import time import pika from config.rabbitMQ import * from models.AnalysisRequest import AnalysisRequest def mq_new_analysis(req: AnalysisRequest): """将分析请求发送到 RabbitMQ 队列""" credentials = pika.PlainCredentials(RABBIT_USER, RABBIT_PASSWORD) connection = pika.BlockingConnection( pika.ConnectionParameters( host=RABBIT_HOST, virtual_host=RABBIT_VHOST, credentials=credentials ) ) channel = connection.channel() channel.queue_declare(queue=QUEUE_NAME, durable=True) message = json.dumps(req.dict()) try: channel.basic_publish( exchange="", routing_key=QUEUE_NAME, body=message, properties=pika.BasicProperties(delivery_mode=2), # 持久化消息 ) finally: channel.close() connection.close() def mq_pull_analysis(): """ 从队列拉取分析任务并处理 process_func: 一个函数,接收 AnalysisRequest 对象处理分析逻辑 """ credentials = pika.PlainCredentials(RABBIT_USER, RABBIT_PASSWORD) connection = pika.BlockingConnection( pika.ConnectionParameters( host=RABBIT_HOST, virtual_host=RABBIT_VHOST, credentials=credentials ) ) channel = connection.channel() channel.queue_declare(queue=QUEUE_NAME, durable=True) channel.basic_qos(prefetch_count=1) # 保证一次只处理一条消息 def callback(ch, method, properties, body): try: data = json.loads(body) req = AnalysisRequest(**data) print(f"收到任务: {req}") time.sleep(5) print(f"完成任务: {req}") except Exception as e: print(f"处理任务出错: {e}") finally: ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(queue=QUEUE_NAME, on_message_callback=callback) print("开启队列消费者,等待分析任务...") try: channel.start_consuming() finally: channel.close() connection.close()