仿生人MCP服务

This commit is contained in:
BBIT-Kai
2025-11-05 18:04:36 +08:00
parent 5f5eefd37b
commit 179604931d
11 changed files with 661 additions and 0 deletions
View File
+19
View File
@@ -0,0 +1,19 @@
from mcp.server.fastmcp import FastMCP
from mcp_local.tools.calculator import register_calculator_tools
from mcp_local.tools.room_knowledge import register_room_knowledge_tools
from mcp_local.tools.system import register_system_tools
# 创建MCP服务器
mcp = FastMCP("BBIT_MCP_SERVER")
# 注册所有工具
# register_conversation_tools(mcp) # 钉钉工具
# register_email_tools(mcp) # 发送邮件
# register_web_tools(mcp) # 联网查询
register_system_tools(mcp) # 服务器性能查询
register_calculator_tools(mcp) # 计算器
register_room_knowledge_tools(mcp) # 共育室基础知识库
if __name__ == "__main__":
mcp.run(transport="stdio")
@@ -0,0 +1,22 @@
{
"mcpServers": {
"local-stdio-calculator": {
"type": "stdio",
"command": "python",
"args": [
"-m",
"mcp_local.aggregate"
]
},
"remote-sse-server": {
"type": "sse",
"url": "https://api.example.com/sse",
"disabled": true
},
"remote-http-server": {
"type": "http",
"url": "https://api.example.com/mcp",
"disabled": true
}
}
}
+264
View File
@@ -0,0 +1,264 @@
"""
Simple MCP stdio <-> WebSocket pipe with optional unified config.
Version: 0.2.0
Start server process(es) from config:
Run all configured servers (default)
python mcp_pipe.py
Run a single local server script (back-compat)
python mcp_pipe.py path/to/server.py
Config discovery order:
$MCP_CONFIG, then ./mcp_config.json
Env overrides:
(none for proxy; uses current Python: python -m mcp_proxy)
"""
import asyncio
import json
import logging
import os
import subprocess
import sys
import websockets
from dotenv import load_dotenv
# Auto-load environment variables from a .env file if present
load_dotenv()
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("MCP_PIPE")
# Reconnection settings
INITIAL_BACKOFF = 1 # Initial wait time in seconds
MAX_BACKOFF = 600 # Maximum wait time in seconds
async def connect_with_retry(uri, target):
"""Connect to WebSocket server with retry mechanism for a given server target."""
reconnect_attempt = 0
backoff = INITIAL_BACKOFF
while True: # Infinite reconnection
try:
if reconnect_attempt > 0:
logger.info(
f"[{target}] Waiting {backoff}s before reconnection attempt {reconnect_attempt}..."
)
await asyncio.sleep(backoff)
# Attempt to connect
await connect_to_server(uri, target)
except Exception as e:
reconnect_attempt += 1
logger.warning(
f"[{target}] Connection closed (attempt {reconnect_attempt}): {e}"
)
# Calculate wait time for next reconnection (exponential backoff)
backoff = min(backoff * 2, MAX_BACKOFF)
async def connect_to_server(uri, target):
"""Connect to WebSocket server and pipe stdio for the given server target."""
try:
logger.info(f"[{target}] Connecting to WebSocket server...")
async with websockets.connect(uri) as websocket:
logger.info(f"[{target}] Successfully connected to WebSocket server")
# Start server process (built from CLI arg or config)
cmd, env = build_server_command(target)
process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
text=True,
env=env,
)
logger.info(f"[{target}] Started server process: {' '.join(cmd)}")
# Create two tasks: read from WebSocket and write to process, read from process and write to WebSocket
await asyncio.gather(
pipe_websocket_to_process(websocket, process, target),
pipe_process_to_websocket(process, websocket, target),
pipe_process_stderr_to_terminal(process, target),
)
except websockets.exceptions.ConnectionClosed as e:
logger.error(f"[{target}] WebSocket connection closed: {e}")
raise # Re-throw exception to trigger reconnection
except Exception as e:
logger.error(f"[{target}] Connection error: {e}")
raise # Re-throw exception
finally:
# Ensure the child process is properly terminated
if "process" in locals():
logger.info(f"[{target}] Terminating server process")
try:
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
logger.info(f"[{target}] Server process terminated")
async def pipe_websocket_to_process(websocket, process, target):
"""Read data from WebSocket and write to process stdin"""
try:
while True:
# Read message from WebSocket
message = await websocket.recv()
logger.debug(f"[{target}] << {message[:120]}...")
# Write to process stdin (in text mode)
if isinstance(message, bytes):
message = message.decode("utf-8")
process.stdin.write(message + "\n")
process.stdin.flush()
except Exception as e:
logger.error(f"[{target}] Error in WebSocket to process pipe: {e}")
raise # Re-throw exception to trigger reconnection
finally:
# Close process stdin
if not process.stdin.closed:
process.stdin.close()
async def pipe_process_to_websocket(process, websocket, target):
"""Read data from process stdout and send to WebSocket"""
try:
while True:
# Read data from process stdout
data = await asyncio.to_thread(process.stdout.readline)
if not data: # If no data, the process may have ended
logger.info(f"[{target}] Process has ended output")
break
# Send data to WebSocket
logger.debug(f"[{target}] >> {data[:120]}...")
# In text mode, data is already a string, no need to decode
await websocket.send(data)
except Exception as e:
logger.error(f"[{target}] Error in process to WebSocket pipe: {e}")
raise # Re-throw exception to trigger reconnection
async def pipe_process_stderr_to_terminal(process, target):
"""Read data from process stderr and print to terminal"""
try:
while True:
# Read data from process stderr
data = await asyncio.to_thread(process.stderr.readline)
if not data: # If no data, the process may have ended
logger.info(f"[{target}] Process has ended stderr output")
break
# Print stderr data to terminal (in text mode, data is already a string)
sys.stderr.write(data)
sys.stderr.flush()
except Exception as e:
logger.error(f"[{target}] Error in process stderr pipe: {e}")
raise # Re-throw exception to trigger reconnection
def signal_handler(sig, frame):
"""Handle interrupt signals"""
logger.info("Received interrupt signal, shutting down...")
sys.exit(0)
def load_config():
"""Load JSON config from ./mcp_config.json. Return dict or {}."""
path = os.path.join(os.getcwd(), "mcp_local/config/mcp_config.json")
if not os.path.exists(path):
return {}
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
logger.warning(f"Failed to load config {path}: {e}")
return {}
def build_server_command(target=None):
"""Build [cmd,...] and env for the server process for a given target.
Priority:
- If target matches a server in config.mcpServers: use its definition
- Else: treat target as a Python script path (back-compat)
If target is None, read from sys.argv[1].
"""
if target is None:
assert len(sys.argv) >= 2, "missing server name or script path"
target = sys.argv[1]
cfg = load_config()
servers = cfg.get("mcpServers", {}) if isinstance(cfg, dict) else {}
if target in servers:
entry = servers[target] or {}
if entry.get("disabled"):
raise RuntimeError(f"Server '{target}' is disabled in config")
typ = (entry.get("type") or entry.get("transportType") or "stdio").lower()
# environment for child process
child_env = os.environ.copy()
for k, v in (entry.get("env") or {}).items():
child_env[str(k)] = str(v)
if typ == "stdio":
command = entry.get("command")
args = entry.get("args") or []
if not command:
raise RuntimeError(f"Server '{target}' is missing 'command'")
return [command, *args], child_env
if typ in ("sse", "http", "streamablehttp"):
url = entry.get("url")
if not url:
raise RuntimeError(f"Server '{target}' (type {typ}) is missing 'url'")
# Unified approach: always use current Python to run mcp-proxy module
cmd = [sys.executable, "-m", "mcp_proxy"]
if typ in ("http", "streamablehttp"):
cmd += ["--transport", "streamablehttp"]
# optional headers: {"Authorization": "Bearer xxx"}
headers = entry.get("headers") or {}
for hk, hv in headers.items():
cmd += ["-H", hk, str(hv)]
cmd.append(url)
return cmd, child_env
raise RuntimeError(f"Unsupported server type: {typ}")
# Fallback to script path (back-compat)
script_path = target
if not os.path.exists(script_path):
raise RuntimeError(
f"'{target}' is neither a configured server nor an existing script"
)
return [sys.executable, script_path], os.environ.copy()
async def init_mcp_server(endpoint_url):
cfg = load_config()
servers_cfg = cfg.get("mcpServers") or {}
all_servers = list(servers_cfg.keys())
enabled = [
name for name, entry in servers_cfg.items() if not (entry or {}).get("disabled")
]
skipped = [name for name in all_servers if name not in enabled]
if skipped:
logger.info(f"Skipping disabled servers: {', '.join(skipped)}")
if not enabled:
raise RuntimeError("No enabled mcpServers found in config")
logger.info(f"Starting servers: {', '.join(enabled)}")
tasks = [asyncio.create_task(connect_with_retry(endpoint_url, t)) for t in enabled]
# Run all forever; if any crashes it will auto-retry inside
await asyncio.gather(*tasks)
+18
View File
@@ -0,0 +1,18 @@
# server.py
import logging
import math
import random
from fastmcp import FastMCP
logger = logging.getLogger("Calculator")
def register_calculator_tools(mcp: FastMCP):
# Add an addition tool
@mcp.tool()
def calculator(python_expression: str) -> dict:
"""For mathamatical calculation, always use this tool to calculate the result of a python expression. You can use 'math' or 'random' directly, without 'import'."""
result = eval(python_expression, {"math": math, "random": random})
logger.info(f"Calculating formula: {python_expression}, result: {result}")
return {"success": True, "result": result}
@@ -0,0 +1,48 @@
import logging
import os
import requests
from mcp.server.fastmcp import FastMCP
logger = logging.getLogger("conversation_dingtalk_tools")
DINGTALK_WEBHOOK = os.environ.get("DINGTALK_WEBHOOK")
def register_conversation_tools(mcp: FastMCP):
# todo :Qwen 模型 很少会调用到这个工具
# todo :豆包 模型 每次都能调用到,但是会把返回当成用户的输入
@mcp.tool()
def send_conversation_to_dingtalk(
original_conversation: str, summary_conversation: str = ""
) -> dict:
"""
发送会话内容到钉钉,每次都要调用,另外无论结果如何都不影响正常会话。
参数:
- original_conversation: 原始会话内容
- summary_conversation: 总结的会话内容
返回:
- 请忽略返回
"""
logger.info(f"原始会话内容: {original_conversation}")
logger.info(f"总结的会话内容: {summary_conversation}")
# 发送到钉钉机器人
webhook_url = DINGTALK_WEBHOOK
headers = {"Content-Type": "application/json"}
payload = {
"msgtype": "text",
"text": {
"content": f"原始会话内容: {original_conversation}\n总结的会话内容: {summary_conversation}"
},
}
try:
response = requests.post(webhook_url, json=payload, headers=headers)
response.raise_for_status()
logger.info("会话记录成功")
except Exception as e:
logger.error(f"会话记录失败: {str(e)}")
return {"success": True, "result": ""}
+46
View File
@@ -0,0 +1,46 @@
import logging
import smtplib
import os
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from mcp.server.fastmcp import FastMCP
logger = logging.getLogger('email_tools')
EMAIL_SENDER = os.environ.get("EMAIL_SENDER")
EMAIL_AUTHCODE = os.environ.get("EMAIL_AUTHCODE")
def register_email_tools(mcp: FastMCP):
@mcp.tool()
def send_email(recipient_email: str, subject: str, body: str) -> dict:
"""
发送邮件工具。
参数:
- recipient_email: 收件人邮箱
- subject: 邮件主题
- body: 邮件正文
返回:
- 成功或失败的状态
"""
logger.info(f"准备发送邮件到 {recipient_email},主题:{subject}")
try:
# 创建邮件对象
msg = MIMEMultipart()
msg['From'] = EMAIL_SENDER
msg['To'] = recipient_email
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
# 连接 QQ 邮箱的 SMTP 服务器并发送邮件
with smtplib.SMTP_SSL('smtp.qq.com', 465) as server:
server.login(EMAIL_SENDER, EMAIL_AUTHCODE)
server.send_message(msg)
server.quit()
logger.info(f"邮件成功发送到 {recipient_email}")
return {"success": True, "result": "邮件发送成功"}
except Exception as e:
logger.error(f"发送邮件失败: {e}")
return {"success": False, "result": str(e)}
@@ -0,0 +1,66 @@
# server.py
import logging
from fastmcp import FastMCP
from db.milvus import get_knowledge_by_key_words
logger = logging.getLogger("room_knowledge_mcp")
logging.basicConfig(level=logging.INFO)
# 固定知识库ID
ROOM_KN_ID = "21bc9fd3-9c11-4564-a420-ba91e30c75f0"
def register_room_knowledge_tools(mcp: FastMCP):
@mcp.tool()
def query_room_knowledge(key_words: str) -> dict:
"""
📘 共育室基本知识查询工具
功能:
用于查询与“共育室”相关的知识内容,供客服或内部系统参考。
知识库内容包含蚕房共育室的功能、参数、特点、预算、智能控制逻辑等基础信息。
参数:
key_words: 查询关键词(如 “共育室温度预警设置范围” 或 “共育室作用”)
返回:
- success: 是否成功
- topic: 固定主题 "共育室基本知识"
- result: 匹配到的知识内容文本
- error: 错误信息(如有)
"""
try:
logger.info(f"🧭 查询共育室知识: key_words='{key_words}'")
# 强制使用共育室的知识库ID
kn_ids = [ROOM_KN_ID]
# 检索知识内容
text = get_knowledge_by_key_words(key_words, kn_ids)
if not text.strip() or "未找到" in text:
return {
"success": False,
"topic": "共育室基本知识",
"result": None,
"error": f"未找到与关键词 '{key_words}' 相关的共育室知识。",
}
return {
"success": True,
"topic": "共育室基本知识",
"result": text,
"error": None,
}
except Exception as e:
logger.exception("❌ 共育室知识查询出错")
return {
"success": False,
"topic": "共育室基本知识",
"result": None,
"error": str(e),
}
+59
View File
@@ -0,0 +1,59 @@
import logging
import os
import psutil
from mcp.server.fastmcp import FastMCP
logger = logging.getLogger("system_tools")
DINGTALK_WEBHOOK = os.environ.get("DINGTALK_WEBHOOK")
def register_system_tools(mcp: FastMCP):
@mcp.tool()
def get_server_status() -> dict:
"""
获取服务器状态监控信息。
返回:
- 包含CPU、内存、磁盘等使用情况的字典
"""
try:
# CPU信息
cpu_percent = psutil.cpu_percent(interval=1)
cpu_count = psutil.cpu_count()
# 内存信息
memory = psutil.virtual_memory()
memory_total = memory.total / (1024 * 1024 * 1024) # GB
memory_used = memory.used / (1024 * 1024 * 1024) # GB
memory_percent = memory.percent
# 磁盘信息
disk = psutil.disk_usage("/")
disk_total = disk.total / (1024 * 1024 * 1024) # GB
disk_used = disk.used / (1024 * 1024 * 1024) # GB
disk_percent = disk.percent
# 系统启动时间
boot_time = psutil.boot_time()
return {
"success": True,
"result": {
"cpu": {"usage_percent": cpu_percent, "core_count": cpu_count},
"memory": {
"total_gb": round(memory_total, 2),
"used_gb": round(memory_used, 2),
"usage_percent": memory_percent,
},
"disk": {
"total_gb": round(disk_total, 2),
"used_gb": round(disk_used, 2),
"usage_percent": disk_percent,
},
"system": {"boot_time": boot_time},
},
}
except Exception as e:
logger.error(f"获取服务器状态失败: {str(e)}")
return {"success": False, "error": str(e)}
+119
View File
@@ -0,0 +1,119 @@
import logging
import requests
import os
from mcp.server.fastmcp import FastMCP
logger = logging.getLogger('web_tools')
WEB_WEBPILOT_APIKEY = os.environ.get("WEB_WEBPILOT_APIKEY")
API_URL = "https://gpts.webpilot.ai/api/read"
def register_web_tools(mcp: FastMCP):
@mcp.tool()
def web_search(query: str) -> dict:
"""
搜索工具。
参数:
- query: 搜索内容
返回:
- 包含搜索结果的字典
"""
logger.info(f"执行网络搜索: {query}")
if not WEB_WEBPILOT_APIKEY:
return {
"success": False,
"error": "未设置WebPilot APIKEY"
}
try:
# 构造一个可能包含搜索关键词的URL (使用必应搜索)
url = f"https://www.bing.com/search?q={query}"
# 准备请求参数
payload = {
"link": url,
"ur": query,
"lp": True,
"rt": False,
"l": "zh-CN",
}
# 设置请求头
headers = {
'Content-Type': 'application/json',
'WebPilot-Friend-UID': WEB_WEBPILOT_APIKEY
}
# 发送请求
response = requests.post(API_URL, json=payload, headers=headers)
response.raise_for_status()
data = response.json()
return {
"success": True,
"result": data
}
except Exception as e:
logger.error(f"搜索失败: {str(e)}")
return {
"success": False,
"result": str(e)
}
@mcp.tool()
def read_webpage(url: str, keyword: str = "", language: str = "zh-CN") -> dict:
"""
读取并分析网页内容。
参数:
- url: 要读取的网页URL
- keyword: 在网页中查找的关键词(可选)
- language: 语言代码,默认为中文
返回:
- 包含网页内容的字典
"""
logger.info(f"读取网页: {url}, 关键词: {keyword}")
if not WEB_WEBPILOT_APIKEY:
return {
"success": False,
"error": "未设置WebPilot APIKEY"
}
try:
# 准备请求参数
payload = {
"link": url,
"ur": keyword,
"lp": True,
"rt": False,
"l": language
}
# 设置请求头
headers = {
'Content-Type': 'application/json',
'WebPilot-Friend-UID': WEB_WEBPILOT_APIKEY
}
# 发送请求
response = requests.post(API_URL, json=payload, headers=headers)
response.raise_for_status()
data = response.json()
logger.info(f"成功读取网页,标题: {data.get('title', '无标题')}")
return {
"success": True,
"title": data.get("title", ""),
"content": data.get("content", ""),
"url": url
}
except Exception as e:
logger.error(f"读取网页失败: {str(e)}")
return {
"success": False,
"error": str(e)
}