AI后端程序增加对RabbitMQ的使用

This commit is contained in:
BBIT-Kai
2025-10-09 14:11:37 +08:00
parent 3fb43c09f3
commit 4d38f5483e
12 changed files with 175 additions and 134 deletions
+6
View File
@@ -6,9 +6,12 @@ from routers.Chat import chatRouter
from routers.Datasource import reportDataRouter
from routers.F8 import f8Router
from routers.Knowledge import knowledgeRouter
from routers.RabbitMQ import rqRouter
from routers.Report import reportRouter
from routers.Service import serviceRouter
from routers.Vision import visionRouter
from service.Analyze import mq_pull_analysis
from utils import MyUtils
app = FastAPI(title="BBIT_AI")
@@ -34,9 +37,12 @@ routers = [
serviceRouter,
botRouter,
visionRouter,
rqRouter,
]
for r in routers:
app.include_router(r, prefix="/llm", tags=["llm"])
app.include_router(f8Router, prefix="/f8", tags=["f8"])
MyUtils.async_new_task(mq_pull_analysis)
+5
View File
@@ -0,0 +1,5 @@
RABBIT_HOST = "10.10.10.9"
RABBIT_VHOST = "/bbit_ai"
RABBIT_USER = "bbit_ai"
RABBIT_PASSWORD = "123456"
QUEUE_NAME = "analysis_queue"
+6
View File
@@ -0,0 +1,6 @@
from pydantic import BaseModel
# 请求
class AnalysisRequest(BaseModel):
data: str # 你可以根据需要扩展字段
+12
View File
@@ -0,0 +1,12 @@
from fastapi import APIRouter
from models.AnalysisRequest import AnalysisRequest
from service.Analyze import mq_new_analysis
rqRouter = APIRouter()
@rqRouter.post("/analyze")
def send_analysis_request(req: AnalysisRequest):
mq_new_analysis(req)
return {"status": "queued"}
+32 -10
View File
@@ -1,22 +1,27 @@
from models.ChatRequest import ChatRequest
from models.BaseResponse import BaseResponse
import uuid
import db.postgres as pg
import uuid
from fastapi import APIRouter, Depends
from uuid import UUID
from fastapi import APIRouter, Depends
import db.postgres as pg
from config.security import get_user_id_from_token
from models.BaseResponse import BaseResponse
from models.ChatRequest import ChatRequest
serviceRouter = APIRouter()
from llm.titleChain import get_title
from agent.serviceAgent import get_service_agent_reply
from llm.memLLM import take_memory
import utils.MyUtils as utils
# 对话列表
@serviceRouter.get("/sessionsForService")
def getSessions(user_id: UUID = Depends(get_user_id_from_token)):
if not user_id:
return {"error": "userId is required"}
return BaseResponse(data=pg.get_sessions(user_id,'service'))
return BaseResponse(data=pg.get_sessions(user_id, "service"))
# 对话
@serviceRouter.post("/chatForService")
@@ -30,7 +35,7 @@ def chat(req: ChatRequest, user_id: UUID = Depends(get_user_id_from_token)):
if not req.sessionId:
isNewSession = True
req.sessionId = str(uuid.uuid4())
pg.insert_session(user_id,req.aiId, req.sessionId, sessionName, "service")
pg.insert_session(user_id, req.aiId, req.sessionId, sessionName, "service")
else:
isNewSession = False
pg.update_session_updated_at(req.sessionId)
@@ -38,10 +43,27 @@ def chat(req: ChatRequest, user_id: UUID = Depends(get_user_id_from_token)):
# 插入用户消息
pg.insert_message(req.sessionId, False, req.userInput)
answer = get_service_agent_reply(aiId=req.aiId,history=pg.get_history_with_time(req.sessionId,6), userInput= req.userInput,kn_bases=pg.get_ai_available_kn_bases(req.aiId))
answer = get_service_agent_reply(
aiId=req.aiId,
history=pg.get_history_with_time(req.sessionId, 6),
userInput=req.userInput,
kn_bases=pg.get_ai_available_kn_bases(req.aiId),
)
# 插入 AI 回复
pg.insert_message(req.sessionId, True, answer)
# 异步执行:记忆判断
utils.async_db_task(take_memory,req.aiId,req.sessionId,user_id,)
return BaseResponse(data={"sessionName":sessionName,"isNewSession":isNewSession,"content":answer,"sessionId": req.sessionId})
utils.async_new_task(
take_memory,
req.aiId,
req.sessionId,
user_id,
)
return BaseResponse(
data={
"sessionName": sessionName,
"isNewSession": isNewSession,
"content": answer,
"sessionId": req.sessionId,
}
)
+69
View File
@@ -0,0 +1,69 @@
# consumer.py
import json
import time
import pika
from config.rabbitMQ import *
from models.AnalysisRequest import AnalysisRequest
def mq_new_analysis(req: AnalysisRequest):
"""将分析请求发送到 RabbitMQ 队列"""
credentials = pika.PlainCredentials(RABBIT_USER, RABBIT_PASSWORD)
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=RABBIT_HOST, virtual_host=RABBIT_VHOST, credentials=credentials
)
)
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME, durable=True)
message = json.dumps(req.dict())
try:
channel.basic_publish(
exchange="",
routing_key=QUEUE_NAME,
body=message,
properties=pika.BasicProperties(delivery_mode=2), # 持久化消息
)
finally:
channel.close()
connection.close()
def mq_pull_analysis():
"""
从队列拉取分析任务并处理
process_func: 一个函数,接收 AnalysisRequest 对象处理分析逻辑
"""
credentials = pika.PlainCredentials(RABBIT_USER, RABBIT_PASSWORD)
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=RABBIT_HOST, virtual_host=RABBIT_VHOST, credentials=credentials
)
)
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME, durable=True)
channel.basic_qos(prefetch_count=1) # 保证一次只处理一条消息
def callback(ch, method, properties, body):
try:
data = json.loads(body)
req = AnalysisRequest(**data)
print(f"收到任务: {req}")
time.sleep(5)
print(f"完成任务: {req}")
except Exception as e:
print(f"处理任务出错: {e}")
finally:
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue=QUEUE_NAME, on_message_callback=callback)
print("开启队列消费者")
print("等待分析任务...")
try:
channel.start_consuming()
finally:
channel.close()
connection.close()
View File
+5 -10
View File
@@ -1,24 +1,19 @@
import asyncio
import threading
from datetime import datetime
import pytz
# 后台操作
def async_db_task(func, *args, **kwargs):
"""将数据库操作放到后台线程执行"""
def async_new_task(func, *args, **kwargs):
threading.Thread(target=func, args=args, kwargs=kwargs, daemon=True).start()
import asyncio
async def async_task(func, *args, **kwargs):
return await asyncio.to_thread(func, *args, **kwargs)
from datetime import datetime
import pytz
def format_datetime(dt: datetime, tz="Asia/Shanghai"):
if dt.tzinfo is None:
dt = pytz.UTC.localize(dt)
+1
View File
@@ -20,3 +20,4 @@ minio==7.2.16
pyzxing==1.1.1
Pillow==11.3.0
python-multipart==0.0.20
pika==1.3.2
-2
View File
@@ -104,6 +104,4 @@ dependencies {
// implementation("ai.koog:koog-agents:0.3.0")
// MinIO OSS
implementation("io.minio:minio:8.5.17")
// RocketMQ
implementation("org.apache.rocketmq:rocketmq-client-java:5.0.8")
}
+13 -32
View File
@@ -1,8 +1,6 @@
# ICP(Intelligence Control Platform)
# 主干AI实验室
## 一、简介
主干AI实验室使用以下技术栈:
## 一、技术栈
- ### 部署
@@ -21,6 +19,7 @@
- 开发语言:**Java/Kotlin**
- **FastAPI**:实验室AI模块后端程序
- 开发语言:**Python**
- **RabbitMQ**:消息队列
- ### 数据库
@@ -41,8 +40,8 @@
公网域名:s1.ronsunny.cn
| 公网端口 | 内网端口 | docker端口 | docker服务/Host | 框架 | 功能作用 |
| -------- | -------- | ---------- | ---------------- | ---------- | ------------------------------------------------------------ |
| 公网端口 | 内网端口 | docker端口 | docker服务/Host | 技术栈 | 功能作用 |
| -------- | -------- | ---------- | --------------- | ---------- | ----------------------------------------------- |
| | | 8090 | ce_vue | Vue | vue前端服务 |
| | 8089 | | | Ktor | 实验室业务后端 |
| | | 13011 | ce_pybackend | FastAPI | 实验室AI后端 |
@@ -50,38 +49,20 @@
| | 6379 | 6379 | ce_redis | Redis | Radis数据库 |
| | | 2379 | ce_etcd | Etcd | EtcdKey-Value 存储,给ce_milvus使用 |
| **9000** | 9000 | 9000 | ce_minio | MinIO | MinIO数据访问 |
| | 9001 | 9001 | ce_minio | MinIO | MinIOWeb控制端 |
| | 9001 | 9001 | ce_minio | MinIO | MinIO**Web可视化**控制端 |
| | 19530 | 19530 | ce_milvus | Milvus | Minvus数据访问 |
| | 9091 | 9091 | ce_milvus | Milvus | MinvusWeb控制端,需要加/webui |
| | 3000 | 3000 | ce_attu | Attu | AttuMinvus的**可视化**控制 |
| | 9091 | 9091 | ce_milvus | Milvus | Minvus**Web控制端**,需要加/webui |
| | 3000 | 3000 | ce_attu | Attu | Attu**Minvus的可视化**控制 |
| **8090** | 8090 | 8090 | ce_kong | Kong | Kong网关 |
| | 8001 | 8001 | ce_kong | Kong | Kong Admin API |
| | 8002 | 8002 | ce_kong | Kong | Kong **可视化**管理界面 |
| | 8444 | 8444 | ce_kong | Kong | Kong Admin API 的 HTTPS 端口 |
| | 8088 | | | ws-scrcpy | Android远程框架 |
| | 9876 | 9876 | ce_rmq_name_srv | RocketMQ | NameServer |
| | 8080 | 8080 | ce_rmq_proxy | RocketMQ | RocketMQ Proxy 的 **消息发送/消费 REST API 或 TCP 代理端口** |
| | 8081 | 8081 | ce_rmq_proxy | RocketMQ | RocketMQ Proxy 的 **管理/监控接口** |
| | 8082 | 8082 | ce_rmq_dashboard | RocketMQ | RocketMQ Dashboard **Web UI 端口** |
| | 10909 | 10909 | ce_rmq_broker_a | RocketMQ | broker 内部默认端口 |
| | 10911 | 10911 | ce_rmq_broker_a | RocketMQ | Consumer 拉取消息的端口 |
| | 10912 | 10912 | ce_rmq_broker_a | RocketMQ | Broker 之间的集群复制/同步 |
| | 10929 | 10909 | ce_rmq_broker_b | RocketMQ | broker 内部默认端口 |
| | 10931 | 10911 | ce_rmq_broker_b | RocketMQ | Consumer 拉取消息的端口 |
| | 10932 | 10912 | ce_rmq_broker_b | RocketMQ | Broker 之间的集群复制/同步 |
| 8088 | | | | | 建议后续关闭,原Android远程框架,已由网关控制 |
| 8089 | | | | | 建议后续关闭,原Ktor后端服务,已由网关控制 |
| 13011 | | | | | 已空闲:原FastAPI后端服务,已由网关控制 |
| 服务 | 端口 | 用途 |
| ---------- | ----- | ----------------------------------- |
| NameServer | 9876 | 注册/查询 Broker 地址,服务发现 |
| Broker | 10909 | 内部集群通信(心跳、同步) |
| | 10911 | 外部客户端通信(Producer/Consumer |
| | 10912 | HA 主从同步(可选) |
| Proxy | 8080 | HTTP/REST 客户端访问 Broker |
| | 8081 | Proxy 管理/状态监控 |
| Dashboard | 8082 | Web UI 可视化监控 |
| | 5672 | 5672 | ce_rabbitmq | RabbitMQ | RabbitMQ 客户端连接端口 |
| | 15672 | 15672 | ce_rabbitmq | RabbitMQ | RabbitMQ **管理界面**端口 |
| 8088 | | | | | 建议后续关闭,原Android远程框架,现已由网关控制 |
| 8089 | | | | | 建议后续关闭,原Ktor后端服务,现已由网关控制 |
| 13011 | | | | | 已空闲:原FastAPI后端服务,现已由网关控制 |
## 三、部署
+13 -67
View File
@@ -192,77 +192,22 @@ services:
- ce_network
restart: unless-stopped
# ---------- RocketMQ ----------
namesrv:
image: apache/rocketmq:5.3.3
container_name: ce_rmq_name_srv
ports:
- 9876:9876
networks:
- ce_network
command: sh mqnamesrv
broker1:
image: apache/rocketmq:5.3.3
container_name: ce_rmq_broker_a
ports:
- 10909:10909
- 10911:10911
- 10912:10912
# ---------- RabbitMQ ----------
rabbitmq:
image: rabbitmq:4.2-rc-management-alpine
container_name: ce_rabbitmq
restart: always
environment:
- NAMESRV_ADDR=ce_rmq_name_srv:9876
depends_on:
- namesrv
networks:
- ce_network
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: 123456
ports:
- "5672:5672"
- "15672:15672"
volumes:
- ./rocketmq_config/broker1.conf:/opt/broker.conf
command: sh mqbroker -c /opt/broker.conf
broker2:
image: apache/rocketmq:5.3.3
container_name: ce_rmq_broker_b
links:
- namesrv
ports:
- 10929:10909
- 10931:10911
- 10932:10912
environment:
- NAMESRV_ADDR=ce_rmq_name_srv:9876 # NameServer 地址
volumes:
- ./rocketmq_config/broker2.conf:/opt/broker.conf
command: sh mqbroker -c /opt/broker.conf
networks:
- ce_network
proxy:
image: apache/rocketmq:5.3.3
container_name: ce_rmq_proxy
networks:
- ce_network
depends_on:
- broker1
- broker2
- namesrv
ports:
- 8080:8080
- 8081:8081
restart: on-failure
environment:
- NAMESRV_ADDR=ce_rmq_broker:9876
command: sh mqproxy
rocketmq-dashboard:
image: apacherocketmq/rocketmq-dashboard:2.1.0
container_name: ce_rmq_dashboard
environment:
- JAVA_OPTS=-Drocketmq.namesrv.addr=ce_rmq_name_srv:9876
ports:
- "8082:8082"
restart: unless-stopped
depends_on:
- broker1
- broker2
- namesrv
- rabbitmq_data:/var/lib/rabbitmq
networks:
- ce_network
# ---------- 数据卷 ----------
volumes:
postgres_data:
@@ -278,6 +223,7 @@ volumes:
driver_opts:
type: tmpfs
device: tmpfs
rabbitmq_data:
# ---------- 网络 ----------
networks:
ce_network: