后端更新

This commit is contained in:
BBIT-Kai
2026-03-26 17:48:20 +08:00
parent 4c2bcd7dce
commit 0c2859b0db
22 changed files with 1336 additions and 213 deletions
+77 -71
View File
@@ -1,97 +1,103 @@
# consumer.py
import asyncio
import json
import traceback
import aio_pika
from config.rabbitMQ import *
from models.AnalysisRequest import AnalysisRequest
from models.SentinelRecordRequest import SentinelRecordRequest
from service.vision import process_vehicle_animal_image
from service.vision import (
process_all_vehicle_animal_image,
)
async def mq_new_analysis_test(req: dict):
"""将分析请求发送到 RabbitMQ 队列(异步版)"""
connection = await aio_pika.connect_robust(
f"amqp://{RABBIT_USER}:{RABBIT_PASSWORD}@{RABBIT_HOST}/{RABBIT_VHOST}"
)
class MQClient:
"""RabbitMQ 单例客户端,支持生产和消费"""
async with connection:
channel = await connection.channel()
# 声明队列,确保队列存在
queue = await channel.declare_queue(QUEUE_NAME, durable=True)
_instance = None
message_body = json.dumps(req)
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
self._connection = None
self._channel = None
self._consumer_tasks = []
# ---------------- 连接初始化 ----------------
async def init(self, prefetch_count: int = 10):
"""启动时初始化连接和通道"""
if self._connection is None:
self._connection = await aio_pika.connect_robust(
f"amqp://{RABBIT_USER}:{RABBIT_PASSWORD}@{RABBIT_HOST}/{SENTINEL_VHOST}"
)
self._channel = await self._connection.channel()
await self._channel.set_qos(prefetch_count=prefetch_count)
# ---------------- 发布消息 ----------------
async def publish(self, queue_name: str, message_body: str):
"""向指定队列发送消息"""
if self._channel is None:
raise RuntimeError("MQClient 未初始化")
# 队列幂等声明
queue = await self._channel.declare_queue(queue_name, durable=True)
message = aio_pika.Message(
body=message_body.encode(),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT, # 持久化
body=message_body.encode(), delivery_mode=aio_pika.DeliveryMode.PERSISTENT
)
await self._channel.default_exchange.publish(message, routing_key=queue_name)
async def send_all_analysis(self, req: SentinelRecordRequest):
await self.publish(
SENTINEL_ANALYSIS_ALL_QUEUE_NAME, json.dumps(req.model_dump())
)
await channel.default_exchange.publish(message, routing_key=QUEUE_NAME)
# ---------------- 消费消息 ----------------
async def consume_queue(self, queue_name: str, process_func):
"""
持续消费队列
process_func: async function 接收 dict 或 Request 对象
"""
if self._channel is None:
raise RuntimeError("MQClient 未初始化")
async def mq_pull_analysis_async_test():
"""
从队列拉取分析任务并处理
process_func: 一个函数,接收 AnalysisRequest 对象处理分析逻辑
"""
connection = await aio_pika.connect_robust(
f"amqp://{RABBIT_USER}:{RABBIT_PASSWORD}@{RABBIT_HOST}/{RABBIT_VHOST}"
)
async with connection:
queue_name = QUEUE_NAME
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue(queue_name, durable=True)
queue = await self._channel.declare_queue(queue_name, durable=True)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
data = json.loads(message.body)
req = AnalysisRequest(**data)
print(f"收到任务: {req}")
await asyncio.sleep(5) # 模拟处理
print(f"完成任务: {req}")
try:
body = message.body.decode()
data = json.loads(body)
await process_func(data)
except Exception as e:
print(f"[MQ Consume Error] {e}")
traceback.print_exc()
# ---------------- 启动全局分析消费者 ----------------
async def start_all_consumer(self):
async def _process(data: dict):
req = SentinelRecordRequest(**data)
await process_all_vehicle_animal_image(req)
print(f"完成全局分析任务: {req}")
async def sentinel_new_analysis(req: SentinelRecordRequest):
"""将分析请求发送到 RabbitMQ 队列(异步版)"""
connection = await aio_pika.connect_robust(
f"amqp://{RABBIT_USER}:{RABBIT_PASSWORD}@{RABBIT_HOST}/{SENTINEL_VHOST}"
)
async with connection:
channel = await connection.channel()
# 声明队列,确保队列存在
queue = await channel.declare_queue(QUEUE_NAME, durable=True)
message_body = json.dumps(req.model_dump())
message = aio_pika.Message(
body=message_body.encode(),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT, # 持久化
task = asyncio.create_task(
self.consume_queue(SENTINEL_ANALYSIS_ALL_QUEUE_NAME, _process)
)
self._consumer_tasks.append(task)
await channel.default_exchange.publish(message, routing_key=QUEUE_NAME)
# ---------------- 关闭连接 ----------------
async def close(self):
for task in self._consumer_tasks:
task.cancel()
if self._channel:
await self._channel.close()
if self._connection:
await self._connection.close()
async def sentinel_pull_analysis_async():
"""
从队列拉取分析任务并处理
process_func: 一个函数,接收 AnalysisRequest 对象处理分析逻辑
"""
connection = await aio_pika.connect_robust(
f"amqp://{RABBIT_USER}:{RABBIT_PASSWORD}@{RABBIT_HOST}/{SENTINEL_VHOST}"
)
async with connection:
queue_name = QUEUE_NAME
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue(queue_name, durable=True)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
data = json.loads(message.body)
req = SentinelRecordRequest(**data)
await process_vehicle_animal_image(req) # 处理
print(f"完成任务: {req}")
# ---------------- 全局单例 ----------------
mq_client = MQClient()