diff --git a/bbit_ai/.idea/app.iml b/bbit_ai/.idea/app.iml index 6d713b3..0d050b9 100644 --- a/bbit_ai/.idea/app.iml +++ b/bbit_ai/.idea/app.iml @@ -2,6 +2,9 @@ + + + diff --git a/bbit_ai/app/app.py b/bbit_ai/app/app.py index 7c6a107..8b6d9a0 100644 --- a/bbit_ai/app/app.py +++ b/bbit_ai/app/app.py @@ -1,6 +1,10 @@ +import asyncio + from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware +from uvicorn import Config, Server +from mcp_local.mcp_pipe import init_mcp_server from routers.Bot import botRouter from routers.Chat import chatRouter from routers.Datasource import reportDataRouter @@ -10,38 +14,61 @@ from routers.RabbitMQ import rqRouter from routers.Report import reportRouter from routers.Service import serviceRouter from routers.Vision import visionRouter -from service.Analyze import mq_pull_analysis -from utils import MyUtils +from service.Analyze import mq_pull_analysis_async -app = FastAPI(title="BBIT_AI") -origins = [ - "http://localhost:8090", # Vite dev 默认端口 - "https://ai.ronsunny.cn:8090", - "*", # ⚠️ 生产环境不要用 -] +async def ai_lab(): + app = FastAPI(title="BBIT_AI") -app.add_middleware( - CORSMiddleware, - allow_origins=origins, - allow_credentials=True, - allow_methods=["*"], # 必须包含 OPTIONS、GET 等 - allow_headers=["*"], -) -routers = [ - chatRouter, - reportRouter, - knowledgeRouter, - reportDataRouter, - serviceRouter, - botRouter, - visionRouter, - rqRouter, -] + origins = [ + "http://localhost:8090", # Vite dev 默认端口 + "https://ai.ronsunny.cn:8090", + "*", # ⚠️ 生产环境不要用 + ] -for r in routers: - app.include_router(r, prefix="/llm", tags=["llm"]) + app.add_middleware( + CORSMiddleware, + allow_origins=origins, + allow_credentials=True, + allow_methods=["*"], # 必须包含 OPTIONS、GET 等 + allow_headers=["*"], + ) + routers = [ + chatRouter, + reportRouter, + knowledgeRouter, + reportDataRouter, + serviceRouter, + botRouter, + visionRouter, + rqRouter, + ] + for r in routers: + app.include_router(r, prefix="/llm", tags=["llm"]) + app.include_router(publicRouter, prefix="/api/public", tags=["api"]) + config = Config(app=app, host="0.0.0.0", port=13011, log_level="info") + server = Server(config) + await server.serve() -app.include_router(publicRouter, prefix="/api/public", tags=["api"]) -MyUtils.async_new_task(mq_pull_analysis) +async def main(): + # 主干AI实验室FastAPI服务 + task_api = asyncio.create_task(ai_lab()) + + # MCP服务-ailab + # endpoint_url = "wss://ai.ronsunny.cn:8090/aimcp/mcp_endpoint/mcp/?token=TsSP9lBq6Oa1WMkachHoS2TtNt4GKV/Gli24pk5Rjpk%3D" + endpoint_url_ai_lab = "ws://ce_bot_mcp:8004/mcp_endpoint/mcp/?token=TsSP9lBq6Oa1WMkachHoS2TtNt4GKV/Gli24pk5Rjpk%3D" + task_mcp1 = asyncio.create_task(init_mcp_server(endpoint_url_ai_lab)) + + # MCP服务-ql + endpoint_url_ql = "wss://ai.ronsunny.cn:8090/aimcp/mcp_endpoint/mcp/?token=8ZmCzp7FzsbxwHOg2%2FvBQkxrC3QWJiI%2B4iTfouExinjcT8ZgLwQfFUtgcMInI7St" + task_mcp2 = asyncio.create_task(init_mcp_server(endpoint_url_ql)) + + # RabbitMQ服务 + task_mq = asyncio.create_task(mq_pull_analysis_async()) + + await asyncio.gather(task_api, task_mcp1, task_mcp2, task_mq) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/bbit_ai/app/config/ssDb.py b/bbit_ai/app/config/ssDb.py index 4ab36d2..5c4ae0e 100644 --- a/bbit_ai/app/config/ssDb.py +++ b/bbit_ai/app/config/ssDb.py @@ -1,25 +1,16 @@ -from langchain_community.utilities import SQLDatabase -from langchain_community.agent_toolkits import create_sql_agent -from langchain_community.chat_models.tongyi import ChatTongyi -# SQLAlchemy URI -uri = "mssql+pyodbc://f8_db_test:APN^QPr!K9@122.114.58.23/f8_db_test?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes&Encrypt=no" - -# 建立数据库对象 -ssDBLC = SQLDatabase.from_uri( - uri, - include_tables=["NONGHU_INFO","NONGHU_BLACKLIST",], - schema="dbo" # 显式指定 schema -) import logging import time from contextlib import contextmanager -from sqlalchemy import create_engine, text -from sqlalchemy.exc import OperationalError as SQLOperationalError from urllib.parse import quote_plus +from langchain_community.utilities import SQLDatabase +from sqlalchemy import create_engine +from sqlalchemy.exc import OperationalError as SQLOperationalError + logger = logging.getLogger("MSSQLPool") logger.setLevel(logging.INFO) + class MSSQLPool: """ SQL Server 连接池封装 @@ -73,10 +64,23 @@ class MSSQLPool: logger.error(f"SQL执行异常: {e}") raise raise SQLOperationalError("无法获取数据库连接,多次重试失败") - + + mssql_pool = MSSQLPool( - user="f8_db_test", - password="APN^QPr!K9", - host="122.114.58.23", - database="f8_db_test", + user="f8_read", + password="www.bbitcn.com", + host="f8.api.dev.bbitcn.cn", + database="f8_db_dev", +) + +# SQLAlchemy URI +uri = "mssql+pyodbc://f8_read:www.bbitcn.com@f8.api.dev.bbitcn.cn/f8_db_dev?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes&Encrypt=no" + +# 建立数据库对象 +ssDBLC = SQLDatabase.from_uri( + uri, + include_tables=[ + "NONGHU_INFO", + ], + schema="dbo", # 显式指定 schema ) diff --git a/bbit_ai/app/service/Analyze.py b/bbit_ai/app/service/Analyze.py index e3e9636..420b4c1 100644 --- a/bbit_ai/app/service/Analyze.py +++ b/bbit_ai/app/service/Analyze.py @@ -1,68 +1,52 @@ # consumer.py +import asyncio import json -import time -import pika +import aio_pika from config.rabbitMQ import * from models.AnalysisRequest import AnalysisRequest -def mq_new_analysis(req: AnalysisRequest): - """将分析请求发送到 RabbitMQ 队列""" - credentials = pika.PlainCredentials(RABBIT_USER, RABBIT_PASSWORD) - connection = pika.BlockingConnection( - pika.ConnectionParameters( - host=RABBIT_HOST, virtual_host=RABBIT_VHOST, credentials=credentials - ) +async def mq_new_analysis(req: AnalysisRequest): + """将分析请求发送到 RabbitMQ 队列(异步版)""" + connection = await aio_pika.connect_robust( + f"amqp://{RABBIT_USER}:{RABBIT_PASSWORD}@{RABBIT_HOST}/{RABBIT_VHOST}" ) - channel = connection.channel() - channel.queue_declare(queue=QUEUE_NAME, durable=True) - message = json.dumps(req.dict()) - try: - channel.basic_publish( - exchange="", - routing_key=QUEUE_NAME, - body=message, - properties=pika.BasicProperties(delivery_mode=2), # 持久化消息 + async with connection: + channel = await connection.channel() + # 声明队列,确保队列存在 + queue = await channel.declare_queue(QUEUE_NAME, durable=True) + + message_body = json.dumps(req.dict()) + message = aio_pika.Message( + body=message_body.encode(), + delivery_mode=aio_pika.DeliveryMode.PERSISTENT, # 持久化 ) - finally: - channel.close() - connection.close() + + await channel.default_exchange.publish(message, routing_key=QUEUE_NAME) -def mq_pull_analysis(): +async def mq_pull_analysis_async(): """ 从队列拉取分析任务并处理 process_func: 一个函数,接收 AnalysisRequest 对象处理分析逻辑 """ - credentials = pika.PlainCredentials(RABBIT_USER, RABBIT_PASSWORD) - connection = pika.BlockingConnection( - pika.ConnectionParameters( - host=RABBIT_HOST, virtual_host=RABBIT_VHOST, credentials=credentials - ) + connection = await aio_pika.connect_robust( + f"amqp://{RABBIT_USER}:{RABBIT_PASSWORD}@{RABBIT_HOST}/{RABBIT_VHOST}" ) - channel = connection.channel() - channel.queue_declare(queue=QUEUE_NAME, durable=True) - channel.basic_qos(prefetch_count=1) # 保证一次只处理一条消息 + async with connection: + queue_name = QUEUE_NAME + channel = await connection.channel() + await channel.set_qos(prefetch_count=1) + queue = await channel.declare_queue(queue_name, durable=True) - def callback(ch, method, properties, body): - try: - data = json.loads(body) - req = AnalysisRequest(**data) - print(f"收到任务: {req}") - time.sleep(5) - print(f"完成任务: {req}") - except Exception as e: - print(f"处理任务出错: {e}") - finally: - ch.basic_ack(delivery_tag=method.delivery_tag) - - channel.basic_consume(queue=QUEUE_NAME, on_message_callback=callback) - print("开启队列消费者,等待分析任务...") - try: - channel.start_consuming() - finally: - channel.close() - connection.close() + async with queue.iterator() as queue_iter: + async for message in queue_iter: + async with message.process(): + data = json.loads(message.body) + req = AnalysisRequest(**data) + print(f"收到任务: {req}") + await asyncio.sleep(5) # 模拟处理 + print(f"完成任务: {req}")