diff --git a/bbit_ai/app/app.py b/bbit_ai/app/app.py index e0bf388..6170594 100644 --- a/bbit_ai/app/app.py +++ b/bbit_ai/app/app.py @@ -6,9 +6,12 @@ from routers.Chat import chatRouter from routers.Datasource import reportDataRouter from routers.F8 import f8Router from routers.Knowledge import knowledgeRouter +from routers.RabbitMQ import rqRouter from routers.Report import reportRouter from routers.Service import serviceRouter from routers.Vision import visionRouter +from service.Analyze import mq_pull_analysis +from utils import MyUtils app = FastAPI(title="BBIT_AI") @@ -34,9 +37,12 @@ routers = [ serviceRouter, botRouter, visionRouter, + rqRouter, ] for r in routers: app.include_router(r, prefix="/llm", tags=["llm"]) app.include_router(f8Router, prefix="/f8", tags=["f8"]) + +MyUtils.async_new_task(mq_pull_analysis) diff --git a/bbit_ai/app/config/rabbitMQ.py b/bbit_ai/app/config/rabbitMQ.py new file mode 100644 index 0000000..303cd8e --- /dev/null +++ b/bbit_ai/app/config/rabbitMQ.py @@ -0,0 +1,5 @@ +RABBIT_HOST = "10.10.10.9" +RABBIT_VHOST = "/bbit_ai" +RABBIT_USER = "bbit_ai" +RABBIT_PASSWORD = "123456" +QUEUE_NAME = "analysis_queue" diff --git a/bbit_ai/app/models/AnalysisRequest.py b/bbit_ai/app/models/AnalysisRequest.py new file mode 100644 index 0000000..998b90c --- /dev/null +++ b/bbit_ai/app/models/AnalysisRequest.py @@ -0,0 +1,6 @@ +from pydantic import BaseModel + + +# 请求 +class AnalysisRequest(BaseModel): + data: str # 你可以根据需要扩展字段 diff --git a/bbit_ai/app/routers/RabbitMQ.py b/bbit_ai/app/routers/RabbitMQ.py new file mode 100644 index 0000000..4e2fc02 --- /dev/null +++ b/bbit_ai/app/routers/RabbitMQ.py @@ -0,0 +1,12 @@ +from fastapi import APIRouter + +from models.AnalysisRequest import AnalysisRequest +from service.Analyze import mq_new_analysis + +rqRouter = APIRouter() + + +@rqRouter.post("/analyze") +def send_analysis_request(req: AnalysisRequest): + mq_new_analysis(req) + return {"status": "queued"} diff --git a/bbit_ai/app/routers/Service.py b/bbit_ai/app/routers/Service.py index 39ac9a6..cbf5ee9 100644 --- a/bbit_ai/app/routers/Service.py +++ b/bbit_ai/app/routers/Service.py @@ -1,22 +1,27 @@ -from models.ChatRequest import ChatRequest -from models.BaseResponse import BaseResponse import uuid -import db.postgres as pg -import uuid -from fastapi import APIRouter, Depends from uuid import UUID + +from fastapi import APIRouter, Depends + +import db.postgres as pg from config.security import get_user_id_from_token +from models.BaseResponse import BaseResponse +from models.ChatRequest import ChatRequest + serviceRouter = APIRouter() from llm.titleChain import get_title from agent.serviceAgent import get_service_agent_reply from llm.memLLM import take_memory import utils.MyUtils as utils + + # 对话列表 @serviceRouter.get("/sessionsForService") def getSessions(user_id: UUID = Depends(get_user_id_from_token)): if not user_id: return {"error": "userId is required"} - return BaseResponse(data=pg.get_sessions(user_id,'service')) + return BaseResponse(data=pg.get_sessions(user_id, "service")) + # 对话 @serviceRouter.post("/chatForService") @@ -30,7 +35,7 @@ def chat(req: ChatRequest, user_id: UUID = Depends(get_user_id_from_token)): if not req.sessionId: isNewSession = True req.sessionId = str(uuid.uuid4()) - pg.insert_session(user_id,req.aiId, req.sessionId, sessionName, "service") + pg.insert_session(user_id, req.aiId, req.sessionId, sessionName, "service") else: isNewSession = False pg.update_session_updated_at(req.sessionId) @@ -38,10 +43,27 @@ def chat(req: ChatRequest, user_id: UUID = Depends(get_user_id_from_token)): # 插入用户消息 pg.insert_message(req.sessionId, False, req.userInput) - answer = get_service_agent_reply(aiId=req.aiId,history=pg.get_history_with_time(req.sessionId,6), userInput= req.userInput,kn_bases=pg.get_ai_available_kn_bases(req.aiId)) + answer = get_service_agent_reply( + aiId=req.aiId, + history=pg.get_history_with_time(req.sessionId, 6), + userInput=req.userInput, + kn_bases=pg.get_ai_available_kn_bases(req.aiId), + ) # 插入 AI 回复 pg.insert_message(req.sessionId, True, answer) # 异步执行:记忆判断 - utils.async_db_task(take_memory,req.aiId,req.sessionId,user_id,) - return BaseResponse(data={"sessionName":sessionName,"isNewSession":isNewSession,"content":answer,"sessionId": req.sessionId}) + utils.async_new_task( + take_memory, + req.aiId, + req.sessionId, + user_id, + ) + return BaseResponse( + data={ + "sessionName": sessionName, + "isNewSession": isNewSession, + "content": answer, + "sessionId": req.sessionId, + } + ) diff --git a/bbit_ai/app/service/Analyze.py b/bbit_ai/app/service/Analyze.py new file mode 100644 index 0000000..e4dfd17 --- /dev/null +++ b/bbit_ai/app/service/Analyze.py @@ -0,0 +1,69 @@ +# consumer.py +import json +import time + +import pika + +from config.rabbitMQ import * +from models.AnalysisRequest import AnalysisRequest + + +def mq_new_analysis(req: AnalysisRequest): + """将分析请求发送到 RabbitMQ 队列""" + credentials = pika.PlainCredentials(RABBIT_USER, RABBIT_PASSWORD) + connection = pika.BlockingConnection( + pika.ConnectionParameters( + host=RABBIT_HOST, virtual_host=RABBIT_VHOST, credentials=credentials + ) + ) + channel = connection.channel() + channel.queue_declare(queue=QUEUE_NAME, durable=True) + + message = json.dumps(req.dict()) + try: + channel.basic_publish( + exchange="", + routing_key=QUEUE_NAME, + body=message, + properties=pika.BasicProperties(delivery_mode=2), # 持久化消息 + ) + finally: + channel.close() + connection.close() + + +def mq_pull_analysis(): + """ + 从队列拉取分析任务并处理 + process_func: 一个函数,接收 AnalysisRequest 对象处理分析逻辑 + """ + credentials = pika.PlainCredentials(RABBIT_USER, RABBIT_PASSWORD) + connection = pika.BlockingConnection( + pika.ConnectionParameters( + host=RABBIT_HOST, virtual_host=RABBIT_VHOST, credentials=credentials + ) + ) + channel = connection.channel() + channel.queue_declare(queue=QUEUE_NAME, durable=True) + channel.basic_qos(prefetch_count=1) # 保证一次只处理一条消息 + + def callback(ch, method, properties, body): + try: + data = json.loads(body) + req = AnalysisRequest(**data) + print(f"收到任务: {req}") + time.sleep(5) + print(f"完成任务: {req}") + except Exception as e: + print(f"处理任务出错: {e}") + finally: + ch.basic_ack(delivery_tag=method.delivery_tag) + + channel.basic_consume(queue=QUEUE_NAME, on_message_callback=callback) + print("开启队列消费者") + print("等待分析任务...") + try: + channel.start_consuming() + finally: + channel.close() + connection.close() diff --git a/bbit_ai/app/service/__init__.py b/bbit_ai/app/service/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bbit_ai/app/utils/MyUtils.py b/bbit_ai/app/utils/MyUtils.py index 27ac1f7..b9b9248 100644 --- a/bbit_ai/app/utils/MyUtils.py +++ b/bbit_ai/app/utils/MyUtils.py @@ -1,24 +1,19 @@ +import asyncio import threading +from datetime import datetime + +import pytz # 后台操作 -def async_db_task(func, *args, **kwargs): - """将数据库操作放到后台线程执行""" +def async_new_task(func, *args, **kwargs): threading.Thread(target=func, args=args, kwargs=kwargs, daemon=True).start() -import asyncio - - async def async_task(func, *args, **kwargs): return await asyncio.to_thread(func, *args, **kwargs) -from datetime import datetime - -import pytz - - def format_datetime(dt: datetime, tz="Asia/Shanghai"): if dt.tzinfo is None: dt = pytz.UTC.localize(dt) diff --git a/bbit_ai/docker/requirements.txt b/bbit_ai/docker/requirements.txt index beca369..11f8f38 100644 --- a/bbit_ai/docker/requirements.txt +++ b/bbit_ai/docker/requirements.txt @@ -20,3 +20,4 @@ minio==7.2.16 pyzxing==1.1.1 Pillow==11.3.0 python-multipart==0.0.20 +pika==1.3.2 \ No newline at end of file diff --git a/ktor/build.gradle.kts b/ktor/build.gradle.kts index 9994bb0..be590dc 100644 --- a/ktor/build.gradle.kts +++ b/ktor/build.gradle.kts @@ -104,6 +104,4 @@ dependencies { // implementation("ai.koog:koog-agents:0.3.0") // MinIO OSS implementation("io.minio:minio:8.5.17") - // RocketMQ - implementation("org.apache.rocketmq:rocketmq-client-java:5.0.8") } diff --git a/readme.md b/readme.md index 33fc90f..0268e87 100644 --- a/readme.md +++ b/readme.md @@ -1,8 +1,6 @@ -# ICP(Intelligence Control Platform) +# 主干AI实验室 -## 一、简介 - -主干AI实验室使用以下技术栈: +## 一、技术栈 - ### 部署 @@ -21,6 +19,7 @@ - 开发语言:**Java/Kotlin** - **FastAPI**:实验室AI模块后端程序 - 开发语言:**Python** + - **RabbitMQ**:消息队列 - ### 数据库 @@ -41,47 +40,29 @@ 公网域名:s1.ronsunny.cn -| 公网端口 | 内网端口 | docker端口 | docker服务/Host | 框架 | 功能作用 | -| -------- | -------- | ---------- | ---------------- | ---------- | ------------------------------------------------------------ | -| | | 8090 | ce_vue | Vue | vue前端服务 | -| | 8089 | | | Ktor | 实验室业务后端 | -| | | 13011 | ce_pybackend | FastAPI | 实验室AI后端 | -| | 5432 | 5432 | ce_postgres | PostgreSQL | PostgreSQL数据库 | -| | 6379 | 6379 | ce_redis | Redis | Radis数据库 | -| | | 2379 | ce_etcd | Etcd | Etcd,Key-Value 存储,给ce_milvus使用 | -| **9000** | 9000 | 9000 | ce_minio | MinIO | MinIO数据访问 | -| | 9001 | 9001 | ce_minio | MinIO | MinIOWeb控制端 | -| | 19530 | 19530 | ce_milvus | Milvus | Minvus数据访问 | -| | 9091 | 9091 | ce_milvus | Milvus | MinvusWeb控制端,需要加/webui | -| | 3000 | 3000 | ce_attu | Attu | Attu,Minvus的**可视化**控制 | -| **8090** | 8090 | 8090 | ce_kong | Kong | Kong网关 | -| | 8001 | 8001 | ce_kong | Kong | Kong Admin API | -| | 8002 | 8002 | ce_kong | Kong | Kong **可视化**管理界面 | -| | 8444 | 8444 | ce_kong | Kong | Kong Admin API 的 HTTPS 端口 | -| | 8088 | | | ws-scrcpy | Android远程框架 | -| | 9876 | 9876 | ce_rmq_name_srv | RocketMQ | NameServer | -| | 8080 | 8080 | ce_rmq_proxy | RocketMQ | RocketMQ Proxy 的 **消息发送/消费 REST API 或 TCP 代理端口** | -| | 8081 | 8081 | ce_rmq_proxy | RocketMQ | RocketMQ Proxy 的 **管理/监控接口** | -| | 8082 | 8082 | ce_rmq_dashboard | RocketMQ | RocketMQ Dashboard **Web UI 端口** | -| | 10909 | 10909 | ce_rmq_broker_a | RocketMQ | broker 内部默认端口 | -| | 10911 | 10911 | ce_rmq_broker_a | RocketMQ | Consumer 拉取消息的端口 | -| | 10912 | 10912 | ce_rmq_broker_a | RocketMQ | Broker 之间的集群复制/同步 | -| | 10929 | 10909 | ce_rmq_broker_b | RocketMQ | broker 内部默认端口 | -| | 10931 | 10911 | ce_rmq_broker_b | RocketMQ | Consumer 拉取消息的端口 | -| | 10932 | 10912 | ce_rmq_broker_b | RocketMQ | Broker 之间的集群复制/同步 | -| 8088 | | | | | 建议后续关闭,原Android远程框架,已由网关控制 | -| 8089 | | | | | 建议后续关闭,原Ktor后端服务,已由网关控制 | -| 13011 | | | | | 已空闲:原FastAPI后端服务,已由网关控制 | - -| 服务 | 端口 | 用途 | -| ---------- | ----- | ----------------------------------- | -| NameServer | 9876 | 注册/查询 Broker 地址,服务发现 | -| Broker | 10909 | 内部集群通信(心跳、同步) | -| | 10911 | 外部客户端通信(Producer/Consumer) | -| | 10912 | HA 主从同步(可选) | -| Proxy | 8080 | HTTP/REST 客户端访问 Broker | -| | 8081 | Proxy 管理/状态监控 | -| Dashboard | 8082 | Web UI 可视化监控 | +| 公网端口 | 内网端口 | docker端口 | docker服务/Host | 技术栈 | 功能作用 | +| -------- | -------- | ---------- | --------------- | ---------- | ----------------------------------------------- | +| | | 8090 | ce_vue | Vue | vue前端服务 | +| | 8089 | | | Ktor | 实验室业务后端 | +| | | 13011 | ce_pybackend | FastAPI | 实验室AI后端 | +| | 5432 | 5432 | ce_postgres | PostgreSQL | PostgreSQL数据库 | +| | 6379 | 6379 | ce_redis | Redis | Radis数据库 | +| | | 2379 | ce_etcd | Etcd | Etcd,Key-Value 存储,给ce_milvus使用 | +| **9000** | 9000 | 9000 | ce_minio | MinIO | MinIO数据访问 | +| | 9001 | 9001 | ce_minio | MinIO | MinIO**Web可视化**控制端 | +| | 19530 | 19530 | ce_milvus | Milvus | Minvus数据访问 | +| | 9091 | 9091 | ce_milvus | Milvus | Minvus**Web控制端**,需要加/webui | +| | 3000 | 3000 | ce_attu | Attu | Attu,**Minvus的可视化**控制 | +| **8090** | 8090 | 8090 | ce_kong | Kong | Kong网关 | +| | 8001 | 8001 | ce_kong | Kong | Kong Admin API | +| | 8002 | 8002 | ce_kong | Kong | Kong **可视化**管理界面 | +| | 8444 | 8444 | ce_kong | Kong | Kong Admin API 的 HTTPS 端口 | +| | 8088 | | | ws-scrcpy | Android远程框架 | +| | 5672 | 5672 | ce_rabbitmq | RabbitMQ | RabbitMQ 客户端连接端口 | +| | 15672 | 15672 | ce_rabbitmq | RabbitMQ | RabbitMQ **管理界面**端口 | +| 8088 | | | | | 建议后续关闭,原Android远程框架,现已由网关控制 | +| 8089 | | | | | 建议后续关闭,原Ktor后端服务,现已由网关控制 | +| 13011 | | | | | 已空闲:原FastAPI后端服务,现已由网关控制 | ## 三、部署 diff --git a/server/docker/docker-compose.yaml b/server/docker/docker-compose.yaml index b4a1457..eb12ab0 100644 --- a/server/docker/docker-compose.yaml +++ b/server/docker/docker-compose.yaml @@ -192,77 +192,22 @@ services: - ce_network restart: unless-stopped -# ---------- RocketMQ ---------- - namesrv: - image: apache/rocketmq:5.3.3 - container_name: ce_rmq_name_srv - ports: - - 9876:9876 - networks: - - ce_network - command: sh mqnamesrv - broker1: - image: apache/rocketmq:5.3.3 - container_name: ce_rmq_broker_a - ports: - - 10909:10909 - - 10911:10911 - - 10912:10912 + # ---------- RabbitMQ ---------- + rabbitmq: + image: rabbitmq:4.2-rc-management-alpine + container_name: ce_rabbitmq + restart: always environment: - - NAMESRV_ADDR=ce_rmq_name_srv:9876 - depends_on: - - namesrv - networks: - - ce_network + RABBITMQ_DEFAULT_USER: admin + RABBITMQ_DEFAULT_PASS: 123456 + ports: + - "5672:5672" + - "15672:15672" volumes: - - ./rocketmq_config/broker1.conf:/opt/broker.conf - command: sh mqbroker -c /opt/broker.conf - broker2: - image: apache/rocketmq:5.3.3 - container_name: ce_rmq_broker_b - links: - - namesrv - ports: - - 10929:10909 - - 10931:10911 - - 10932:10912 - environment: - - NAMESRV_ADDR=ce_rmq_name_srv:9876 # NameServer 地址 - volumes: - - ./rocketmq_config/broker2.conf:/opt/broker.conf - command: sh mqbroker -c /opt/broker.conf - networks: - - ce_network - proxy: - image: apache/rocketmq:5.3.3 - container_name: ce_rmq_proxy - networks: - - ce_network - depends_on: - - broker1 - - broker2 - - namesrv - ports: - - 8080:8080 - - 8081:8081 - restart: on-failure - environment: - - NAMESRV_ADDR=ce_rmq_broker:9876 - command: sh mqproxy - rocketmq-dashboard: - image: apacherocketmq/rocketmq-dashboard:2.1.0 - container_name: ce_rmq_dashboard - environment: - - JAVA_OPTS=-Drocketmq.namesrv.addr=ce_rmq_name_srv:9876 - ports: - - "8082:8082" - restart: unless-stopped - depends_on: - - broker1 - - broker2 - - namesrv + - rabbitmq_data:/var/lib/rabbitmq networks: - ce_network + # ---------- 数据卷 ---------- volumes: postgres_data: @@ -278,6 +223,7 @@ volumes: driver_opts: type: tmpfs device: tmpfs + rabbitmq_data: # ---------- 网络 ---------- networks: ce_network: