后端代码微调:SQLserver地址账号变更

This commit is contained in:
BBIT-Kai
2025-11-05 18:10:00 +08:00
parent 5a73018582
commit 9a79bde95f
4 changed files with 115 additions and 97 deletions
+33 -49
View File
@@ -1,68 +1,52 @@
# consumer.py
import asyncio
import json
import time
import pika
import aio_pika
from config.rabbitMQ import *
from models.AnalysisRequest import AnalysisRequest
def mq_new_analysis(req: AnalysisRequest):
"""将分析请求发送到 RabbitMQ 队列"""
credentials = pika.PlainCredentials(RABBIT_USER, RABBIT_PASSWORD)
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=RABBIT_HOST, virtual_host=RABBIT_VHOST, credentials=credentials
)
async def mq_new_analysis(req: AnalysisRequest):
"""将分析请求发送到 RabbitMQ 队列(异步版)"""
connection = await aio_pika.connect_robust(
f"amqp://{RABBIT_USER}:{RABBIT_PASSWORD}@{RABBIT_HOST}/{RABBIT_VHOST}"
)
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME, durable=True)
message = json.dumps(req.dict())
try:
channel.basic_publish(
exchange="",
routing_key=QUEUE_NAME,
body=message,
properties=pika.BasicProperties(delivery_mode=2), # 持久化消息
async with connection:
channel = await connection.channel()
# 声明队列,确保队列存在
queue = await channel.declare_queue(QUEUE_NAME, durable=True)
message_body = json.dumps(req.dict())
message = aio_pika.Message(
body=message_body.encode(),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT, # 持久化
)
finally:
channel.close()
connection.close()
await channel.default_exchange.publish(message, routing_key=QUEUE_NAME)
def mq_pull_analysis():
async def mq_pull_analysis_async():
"""
从队列拉取分析任务并处理
process_func: 一个函数,接收 AnalysisRequest 对象处理分析逻辑
"""
credentials = pika.PlainCredentials(RABBIT_USER, RABBIT_PASSWORD)
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=RABBIT_HOST, virtual_host=RABBIT_VHOST, credentials=credentials
)
connection = await aio_pika.connect_robust(
f"amqp://{RABBIT_USER}:{RABBIT_PASSWORD}@{RABBIT_HOST}/{RABBIT_VHOST}"
)
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME, durable=True)
channel.basic_qos(prefetch_count=1) # 保证一次只处理一条消息
async with connection:
queue_name = QUEUE_NAME
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue(queue_name, durable=True)
def callback(ch, method, properties, body):
try:
data = json.loads(body)
req = AnalysisRequest(**data)
print(f"收到任务: {req}")
time.sleep(5)
print(f"完成任务: {req}")
except Exception as e:
print(f"处理任务出错: {e}")
finally:
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue=QUEUE_NAME, on_message_callback=callback)
print("开启队列消费者,等待分析任务...")
try:
channel.start_consuming()
finally:
channel.close()
connection.close()
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
data = json.loads(message.body)
req = AnalysisRequest(**data)
print(f"收到任务: {req}")
await asyncio.sleep(5) # 模拟处理
print(f"完成任务: {req}")