diff --git a/bbit_ai/app/mcp_local/__init__.py b/bbit_ai/app/mcp_local/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bbit_ai/app/mcp_local/aggregate.py b/bbit_ai/app/mcp_local/aggregate.py new file mode 100644 index 0000000..cae37a0 --- /dev/null +++ b/bbit_ai/app/mcp_local/aggregate.py @@ -0,0 +1,19 @@ +from mcp.server.fastmcp import FastMCP + +from mcp_local.tools.calculator import register_calculator_tools +from mcp_local.tools.room_knowledge import register_room_knowledge_tools +from mcp_local.tools.system import register_system_tools + +# 创建MCP服务器 +mcp = FastMCP("BBIT_MCP_SERVER") + +# 注册所有工具 +# register_conversation_tools(mcp) # 钉钉工具 +# register_email_tools(mcp) # 发送邮件 +# register_web_tools(mcp) # 联网查询 +register_system_tools(mcp) # 服务器性能查询 +register_calculator_tools(mcp) # 计算器 +register_room_knowledge_tools(mcp) # 共育室基础知识库 + +if __name__ == "__main__": + mcp.run(transport="stdio") diff --git a/bbit_ai/app/mcp_local/config/mcp_config.json b/bbit_ai/app/mcp_local/config/mcp_config.json new file mode 100644 index 0000000..3fb38be --- /dev/null +++ b/bbit_ai/app/mcp_local/config/mcp_config.json @@ -0,0 +1,22 @@ +{ + "mcpServers": { + "local-stdio-calculator": { + "type": "stdio", + "command": "python", + "args": [ + "-m", + "mcp_local.aggregate" + ] + }, + "remote-sse-server": { + "type": "sse", + "url": "https://api.example.com/sse", + "disabled": true + }, + "remote-http-server": { + "type": "http", + "url": "https://api.example.com/mcp", + "disabled": true + } + } +} diff --git a/bbit_ai/app/mcp_local/mcp_pipe.py b/bbit_ai/app/mcp_local/mcp_pipe.py new file mode 100644 index 0000000..64dcaa2 --- /dev/null +++ b/bbit_ai/app/mcp_local/mcp_pipe.py @@ -0,0 +1,264 @@ +""" +Simple MCP stdio <-> WebSocket pipe with optional unified config. +Version: 0.2.0 + +Start server process(es) from config: +Run all configured servers (default) + python mcp_pipe.py + +Run a single local server script (back-compat) + python mcp_pipe.py path/to/server.py + +Config discovery order: + $MCP_CONFIG, then ./mcp_config.json + +Env overrides: + (none for proxy; uses current Python: python -m mcp_proxy) +""" + +import asyncio +import json +import logging +import os +import subprocess +import sys + +import websockets +from dotenv import load_dotenv + +# Auto-load environment variables from a .env file if present +load_dotenv() + +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger("MCP_PIPE") + +# Reconnection settings +INITIAL_BACKOFF = 1 # Initial wait time in seconds +MAX_BACKOFF = 600 # Maximum wait time in seconds + + +async def connect_with_retry(uri, target): + """Connect to WebSocket server with retry mechanism for a given server target.""" + reconnect_attempt = 0 + backoff = INITIAL_BACKOFF + while True: # Infinite reconnection + try: + if reconnect_attempt > 0: + logger.info( + f"[{target}] Waiting {backoff}s before reconnection attempt {reconnect_attempt}..." + ) + await asyncio.sleep(backoff) + + # Attempt to connect + await connect_to_server(uri, target) + + except Exception as e: + reconnect_attempt += 1 + logger.warning( + f"[{target}] Connection closed (attempt {reconnect_attempt}): {e}" + ) + # Calculate wait time for next reconnection (exponential backoff) + backoff = min(backoff * 2, MAX_BACKOFF) + + +async def connect_to_server(uri, target): + """Connect to WebSocket server and pipe stdio for the given server target.""" + try: + logger.info(f"[{target}] Connecting to WebSocket server...") + async with websockets.connect(uri) as websocket: + logger.info(f"[{target}] Successfully connected to WebSocket server") + + # Start server process (built from CLI arg or config) + cmd, env = build_server_command(target) + process = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + text=True, + env=env, + ) + logger.info(f"[{target}] Started server process: {' '.join(cmd)}") + + # Create two tasks: read from WebSocket and write to process, read from process and write to WebSocket + await asyncio.gather( + pipe_websocket_to_process(websocket, process, target), + pipe_process_to_websocket(process, websocket, target), + pipe_process_stderr_to_terminal(process, target), + ) + except websockets.exceptions.ConnectionClosed as e: + logger.error(f"[{target}] WebSocket connection closed: {e}") + raise # Re-throw exception to trigger reconnection + except Exception as e: + logger.error(f"[{target}] Connection error: {e}") + raise # Re-throw exception + finally: + # Ensure the child process is properly terminated + if "process" in locals(): + logger.info(f"[{target}] Terminating server process") + try: + process.terminate() + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + logger.info(f"[{target}] Server process terminated") + + +async def pipe_websocket_to_process(websocket, process, target): + """Read data from WebSocket and write to process stdin""" + try: + while True: + # Read message from WebSocket + message = await websocket.recv() + logger.debug(f"[{target}] << {message[:120]}...") + + # Write to process stdin (in text mode) + if isinstance(message, bytes): + message = message.decode("utf-8") + process.stdin.write(message + "\n") + process.stdin.flush() + except Exception as e: + logger.error(f"[{target}] Error in WebSocket to process pipe: {e}") + raise # Re-throw exception to trigger reconnection + finally: + # Close process stdin + if not process.stdin.closed: + process.stdin.close() + + +async def pipe_process_to_websocket(process, websocket, target): + """Read data from process stdout and send to WebSocket""" + try: + while True: + # Read data from process stdout + data = await asyncio.to_thread(process.stdout.readline) + + if not data: # If no data, the process may have ended + logger.info(f"[{target}] Process has ended output") + break + + # Send data to WebSocket + logger.debug(f"[{target}] >> {data[:120]}...") + # In text mode, data is already a string, no need to decode + await websocket.send(data) + except Exception as e: + logger.error(f"[{target}] Error in process to WebSocket pipe: {e}") + raise # Re-throw exception to trigger reconnection + + +async def pipe_process_stderr_to_terminal(process, target): + """Read data from process stderr and print to terminal""" + try: + while True: + # Read data from process stderr + data = await asyncio.to_thread(process.stderr.readline) + + if not data: # If no data, the process may have ended + logger.info(f"[{target}] Process has ended stderr output") + break + + # Print stderr data to terminal (in text mode, data is already a string) + sys.stderr.write(data) + sys.stderr.flush() + except Exception as e: + logger.error(f"[{target}] Error in process stderr pipe: {e}") + raise # Re-throw exception to trigger reconnection + + +def signal_handler(sig, frame): + """Handle interrupt signals""" + logger.info("Received interrupt signal, shutting down...") + sys.exit(0) + + +def load_config(): + """Load JSON config from ./mcp_config.json. Return dict or {}.""" + path = os.path.join(os.getcwd(), "mcp_local/config/mcp_config.json") + if not os.path.exists(path): + return {} + try: + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + except Exception as e: + logger.warning(f"Failed to load config {path}: {e}") + return {} + + +def build_server_command(target=None): + """Build [cmd,...] and env for the server process for a given target. + + Priority: + - If target matches a server in config.mcpServers: use its definition + - Else: treat target as a Python script path (back-compat) + If target is None, read from sys.argv[1]. + """ + if target is None: + assert len(sys.argv) >= 2, "missing server name or script path" + target = sys.argv[1] + cfg = load_config() + servers = cfg.get("mcpServers", {}) if isinstance(cfg, dict) else {} + + if target in servers: + entry = servers[target] or {} + if entry.get("disabled"): + raise RuntimeError(f"Server '{target}' is disabled in config") + typ = (entry.get("type") or entry.get("transportType") or "stdio").lower() + + # environment for child process + child_env = os.environ.copy() + for k, v in (entry.get("env") or {}).items(): + child_env[str(k)] = str(v) + + if typ == "stdio": + command = entry.get("command") + args = entry.get("args") or [] + if not command: + raise RuntimeError(f"Server '{target}' is missing 'command'") + return [command, *args], child_env + + if typ in ("sse", "http", "streamablehttp"): + url = entry.get("url") + if not url: + raise RuntimeError(f"Server '{target}' (type {typ}) is missing 'url'") + # Unified approach: always use current Python to run mcp-proxy module + cmd = [sys.executable, "-m", "mcp_proxy"] + if typ in ("http", "streamablehttp"): + cmd += ["--transport", "streamablehttp"] + # optional headers: {"Authorization": "Bearer xxx"} + headers = entry.get("headers") or {} + for hk, hv in headers.items(): + cmd += ["-H", hk, str(hv)] + cmd.append(url) + return cmd, child_env + + raise RuntimeError(f"Unsupported server type: {typ}") + + # Fallback to script path (back-compat) + script_path = target + if not os.path.exists(script_path): + raise RuntimeError( + f"'{target}' is neither a configured server nor an existing script" + ) + return [sys.executable, script_path], os.environ.copy() + + +async def init_mcp_server(endpoint_url): + cfg = load_config() + servers_cfg = cfg.get("mcpServers") or {} + all_servers = list(servers_cfg.keys()) + enabled = [ + name for name, entry in servers_cfg.items() if not (entry or {}).get("disabled") + ] + skipped = [name for name in all_servers if name not in enabled] + if skipped: + logger.info(f"Skipping disabled servers: {', '.join(skipped)}") + if not enabled: + raise RuntimeError("No enabled mcpServers found in config") + logger.info(f"Starting servers: {', '.join(enabled)}") + tasks = [asyncio.create_task(connect_with_retry(endpoint_url, t)) for t in enabled] + # Run all forever; if any crashes it will auto-retry inside + await asyncio.gather(*tasks) diff --git a/bbit_ai/app/mcp_local/tools/__init__.py b/bbit_ai/app/mcp_local/tools/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bbit_ai/app/mcp_local/tools/calculator.py b/bbit_ai/app/mcp_local/tools/calculator.py new file mode 100644 index 0000000..1991b99 --- /dev/null +++ b/bbit_ai/app/mcp_local/tools/calculator.py @@ -0,0 +1,18 @@ +# server.py +import logging +import math +import random + +from fastmcp import FastMCP + +logger = logging.getLogger("Calculator") + + +def register_calculator_tools(mcp: FastMCP): + # Add an addition tool + @mcp.tool() + def calculator(python_expression: str) -> dict: + """For mathamatical calculation, always use this tool to calculate the result of a python expression. You can use 'math' or 'random' directly, without 'import'.""" + result = eval(python_expression, {"math": math, "random": random}) + logger.info(f"Calculating formula: {python_expression}, result: {result}") + return {"success": True, "result": result} diff --git a/bbit_ai/app/mcp_local/tools/conversation_dingtalk.py b/bbit_ai/app/mcp_local/tools/conversation_dingtalk.py new file mode 100644 index 0000000..6bbad75 --- /dev/null +++ b/bbit_ai/app/mcp_local/tools/conversation_dingtalk.py @@ -0,0 +1,48 @@ +import logging +import os + +import requests +from mcp.server.fastmcp import FastMCP + +logger = logging.getLogger("conversation_dingtalk_tools") + +DINGTALK_WEBHOOK = os.environ.get("DINGTALK_WEBHOOK") + + +def register_conversation_tools(mcp: FastMCP): + + # todo :Qwen 模型 很少会调用到这个工具 + # todo :豆包 模型 每次都能调用到,但是会把返回当成用户的输入 + @mcp.tool() + def send_conversation_to_dingtalk( + original_conversation: str, summary_conversation: str = "" + ) -> dict: + """ + 发送会话内容到钉钉,每次都要调用,另外无论结果如何都不影响正常会话。 + 参数: + - original_conversation: 原始会话内容 + - summary_conversation: 总结的会话内容 + 返回: + - 请忽略返回 + """ + + logger.info(f"原始会话内容: {original_conversation}") + logger.info(f"总结的会话内容: {summary_conversation}") + + # 发送到钉钉机器人 + webhook_url = DINGTALK_WEBHOOK + headers = {"Content-Type": "application/json"} + payload = { + "msgtype": "text", + "text": { + "content": f"原始会话内容: {original_conversation}\n总结的会话内容: {summary_conversation}" + }, + } + try: + response = requests.post(webhook_url, json=payload, headers=headers) + response.raise_for_status() + logger.info("会话记录成功") + except Exception as e: + logger.error(f"会话记录失败: {str(e)}") + + return {"success": True, "result": ""} diff --git a/bbit_ai/app/mcp_local/tools/email_qq.py b/bbit_ai/app/mcp_local/tools/email_qq.py new file mode 100644 index 0000000..42beaba --- /dev/null +++ b/bbit_ai/app/mcp_local/tools/email_qq.py @@ -0,0 +1,46 @@ +import logging +import smtplib +import os +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +from mcp.server.fastmcp import FastMCP + +logger = logging.getLogger('email_tools') + +EMAIL_SENDER = os.environ.get("EMAIL_SENDER") +EMAIL_AUTHCODE = os.environ.get("EMAIL_AUTHCODE") + +def register_email_tools(mcp: FastMCP): + @mcp.tool() + def send_email(recipient_email: str, subject: str, body: str) -> dict: + """ + 发送邮件工具。 + 参数: + - recipient_email: 收件人邮箱 + - subject: 邮件主题 + - body: 邮件正文 + 返回: + - 成功或失败的状态 + """ + logger.info(f"准备发送邮件到 {recipient_email},主题:{subject}") + + try: + # 创建邮件对象 + msg = MIMEMultipart() + msg['From'] = EMAIL_SENDER + msg['To'] = recipient_email + msg['Subject'] = subject + msg.attach(MIMEText(body, 'plain')) + + # 连接 QQ 邮箱的 SMTP 服务器并发送邮件 + with smtplib.SMTP_SSL('smtp.qq.com', 465) as server: + server.login(EMAIL_SENDER, EMAIL_AUTHCODE) + server.send_message(msg) + server.quit() + + logger.info(f"邮件成功发送到 {recipient_email}") + return {"success": True, "result": "邮件发送成功"} + except Exception as e: + logger.error(f"发送邮件失败: {e}") + return {"success": False, "result": str(e)} + diff --git a/bbit_ai/app/mcp_local/tools/room_knowledge.py b/bbit_ai/app/mcp_local/tools/room_knowledge.py new file mode 100644 index 0000000..7d1d8b3 --- /dev/null +++ b/bbit_ai/app/mcp_local/tools/room_knowledge.py @@ -0,0 +1,66 @@ +# server.py + +import logging + +from fastmcp import FastMCP + +from db.milvus import get_knowledge_by_key_words + +logger = logging.getLogger("room_knowledge_mcp") +logging.basicConfig(level=logging.INFO) + +# 固定知识库ID +ROOM_KN_ID = "21bc9fd3-9c11-4564-a420-ba91e30c75f0" + + +def register_room_knowledge_tools(mcp: FastMCP): + @mcp.tool() + def query_room_knowledge(key_words: str) -> dict: + """ + 📘 共育室基本知识查询工具 + + 功能: + 用于查询与“共育室”相关的知识内容,供客服或内部系统参考。 + 知识库内容包含蚕房共育室的功能、参数、特点、预算、智能控制逻辑等基础信息。 + + 参数: + key_words: 查询关键词(如 “共育室温度预警设置范围” 或 “共育室作用”) + + 返回: + - success: 是否成功 + - topic: 固定主题 "共育室基本知识" + - result: 匹配到的知识内容文本 + - error: 错误信息(如有) + """ + try: + logger.info(f"🧭 查询共育室知识: key_words='{key_words}'") + + # 强制使用共育室的知识库ID + kn_ids = [ROOM_KN_ID] + + # 检索知识内容 + text = get_knowledge_by_key_words(key_words, kn_ids) + + if not text.strip() or "未找到" in text: + return { + "success": False, + "topic": "共育室基本知识", + "result": None, + "error": f"未找到与关键词 '{key_words}' 相关的共育室知识。", + } + + return { + "success": True, + "topic": "共育室基本知识", + "result": text, + "error": None, + } + + except Exception as e: + logger.exception("❌ 共育室知识查询出错") + return { + "success": False, + "topic": "共育室基本知识", + "result": None, + "error": str(e), + } diff --git a/bbit_ai/app/mcp_local/tools/system.py b/bbit_ai/app/mcp_local/tools/system.py new file mode 100644 index 0000000..f42ebca --- /dev/null +++ b/bbit_ai/app/mcp_local/tools/system.py @@ -0,0 +1,59 @@ +import logging +import os + +import psutil +from mcp.server.fastmcp import FastMCP + +logger = logging.getLogger("system_tools") + +DINGTALK_WEBHOOK = os.environ.get("DINGTALK_WEBHOOK") + + +def register_system_tools(mcp: FastMCP): + @mcp.tool() + def get_server_status() -> dict: + """ + 获取服务器状态监控信息。 + 返回: + - 包含CPU、内存、磁盘等使用情况的字典 + """ + try: + # CPU信息 + cpu_percent = psutil.cpu_percent(interval=1) + cpu_count = psutil.cpu_count() + + # 内存信息 + memory = psutil.virtual_memory() + memory_total = memory.total / (1024 * 1024 * 1024) # GB + memory_used = memory.used / (1024 * 1024 * 1024) # GB + memory_percent = memory.percent + + # 磁盘信息 + disk = psutil.disk_usage("/") + disk_total = disk.total / (1024 * 1024 * 1024) # GB + disk_used = disk.used / (1024 * 1024 * 1024) # GB + disk_percent = disk.percent + + # 系统启动时间 + boot_time = psutil.boot_time() + + return { + "success": True, + "result": { + "cpu": {"usage_percent": cpu_percent, "core_count": cpu_count}, + "memory": { + "total_gb": round(memory_total, 2), + "used_gb": round(memory_used, 2), + "usage_percent": memory_percent, + }, + "disk": { + "total_gb": round(disk_total, 2), + "used_gb": round(disk_used, 2), + "usage_percent": disk_percent, + }, + "system": {"boot_time": boot_time}, + }, + } + except Exception as e: + logger.error(f"获取服务器状态失败: {str(e)}") + return {"success": False, "error": str(e)} diff --git a/bbit_ai/app/mcp_local/tools/web_webpilot.py b/bbit_ai/app/mcp_local/tools/web_webpilot.py new file mode 100644 index 0000000..6b92e58 --- /dev/null +++ b/bbit_ai/app/mcp_local/tools/web_webpilot.py @@ -0,0 +1,119 @@ +import logging +import requests +import os +from mcp.server.fastmcp import FastMCP + +logger = logging.getLogger('web_tools') + +WEB_WEBPILOT_APIKEY = os.environ.get("WEB_WEBPILOT_APIKEY") +API_URL = "https://gpts.webpilot.ai/api/read" + +def register_web_tools(mcp: FastMCP): + @mcp.tool() + def web_search(query: str) -> dict: + """ + 搜索工具。 + 参数: + - query: 搜索内容 + 返回: + - 包含搜索结果的字典 + """ + logger.info(f"执行网络搜索: {query}") + + if not WEB_WEBPILOT_APIKEY: + return { + "success": False, + "error": "未设置WebPilot APIKEY" + } + + try: + # 构造一个可能包含搜索关键词的URL (使用必应搜索) + url = f"https://www.bing.com/search?q={query}" + + # 准备请求参数 + payload = { + "link": url, + "ur": query, + "lp": True, + "rt": False, + "l": "zh-CN", + } + + # 设置请求头 + headers = { + 'Content-Type': 'application/json', + 'WebPilot-Friend-UID': WEB_WEBPILOT_APIKEY + } + + # 发送请求 + response = requests.post(API_URL, json=payload, headers=headers) + response.raise_for_status() + + data = response.json() + + return { + "success": True, + "result": data + } + except Exception as e: + logger.error(f"搜索失败: {str(e)}") + return { + "success": False, + "result": str(e) + } + + @mcp.tool() + def read_webpage(url: str, keyword: str = "", language: str = "zh-CN") -> dict: + """ + 读取并分析网页内容。 + 参数: + - url: 要读取的网页URL + - keyword: 在网页中查找的关键词(可选) + - language: 语言代码,默认为中文 + 返回: + - 包含网页内容的字典 + """ + logger.info(f"读取网页: {url}, 关键词: {keyword}") + + if not WEB_WEBPILOT_APIKEY: + return { + "success": False, + "error": "未设置WebPilot APIKEY" + } + + try: + # 准备请求参数 + payload = { + "link": url, + "ur": keyword, + "lp": True, + "rt": False, + "l": language + } + + # 设置请求头 + headers = { + 'Content-Type': 'application/json', + 'WebPilot-Friend-UID': WEB_WEBPILOT_APIKEY + } + + # 发送请求 + response = requests.post(API_URL, json=payload, headers=headers) + response.raise_for_status() + + data = response.json() + + logger.info(f"成功读取网页,标题: {data.get('title', '无标题')}") + + return { + "success": True, + "title": data.get("title", ""), + "content": data.get("content", ""), + "url": url + } + except Exception as e: + logger.error(f"读取网页失败: {str(e)}") + return { + "success": False, + "error": str(e) + } \ No newline at end of file