Files
AILab/bbit_ai/app_mcp/utils/jsonrpc.py
T
2025-11-05 18:06:40 +08:00

225 lines
6.7 KiB
Python

"""
JSON-RPC 2.0 协议封装类
用于统一处理JSON-RPC消息的格式化和解析
"""
import json
from typing import Any, Dict, Optional, Union
from dataclasses import dataclass, asdict
@dataclass
class JSONRPCError:
"""JSON-RPC错误对象"""
code: int
message: str
data: Optional[Any] = None
@dataclass
class JSONRPCRequest:
"""JSON-RPC请求对象"""
method: str
params: Optional[Union[Dict, list]] = None
id: Optional[Union[str, int]] = None
jsonrpc: str = "2.0"
@dataclass
class JSONRPCResponse:
"""JSON-RPC响应对象"""
result: Optional[Any] = None
error: Optional[JSONRPCError] = None
id: Optional[Union[str, int]] = None
jsonrpc: str = "2.0"
class JSONRPCProtocol:
"""JSON-RPC 2.0 协议封装类"""
# 预定义错误码
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603
# 自定义错误码
TOOL_NOT_CONNECTED = -32001
FORWARD_FAILED = -32002
CONNECTION_ERROR = -32003
AUTHENTICATION_ERROR = -32004
@staticmethod
def create_request(
method: str,
params: Optional[Union[Dict, list]] = None,
request_id: Optional[Union[str, int]] = None,
) -> JSONRPCRequest:
"""创建JSON-RPC请求"""
return JSONRPCRequest(
method=method, params=params, id=request_id, jsonrpc="2.0"
)
@staticmethod
def create_success_response(
result: Any, request_id: Optional[Union[str, int]] = None
) -> JSONRPCResponse:
"""创建成功响应"""
return JSONRPCResponse(result=result, id=request_id, jsonrpc="2.0")
@staticmethod
def create_error_response(
error_code: int,
error_message: str,
error_data: Optional[Any] = None,
request_id: Optional[Union[str, int]] = None,
) -> JSONRPCResponse:
"""创建错误响应"""
error = JSONRPCError(code=error_code, message=error_message, data=error_data)
return JSONRPCResponse(error=error, id=request_id, jsonrpc="2.0")
@staticmethod
def create_notification(
method: str, params: Optional[Union[Dict, list]] = None
) -> JSONRPCRequest:
"""创建通知消息(无ID的请求)"""
return JSONRPCRequest(method=method, params=params, id=None, jsonrpc="2.0")
@staticmethod
def to_dict(obj: Union[JSONRPCRequest, JSONRPCResponse]) -> Dict:
"""将对象转换为字典"""
return asdict(obj)
@staticmethod
def to_json(
obj: Union[JSONRPCRequest, JSONRPCResponse], ensure_ascii: bool = False
) -> str:
"""将对象转换为JSON字符串"""
return json.dumps(asdict(obj), ensure_ascii=ensure_ascii)
@staticmethod
def parse_request(json_str: str) -> Optional[JSONRPCRequest]:
"""解析JSON-RPC请求"""
try:
data = json.loads(json_str)
if not isinstance(data, dict):
return None
# 验证必需字段
if "jsonrpc" not in data or data["jsonrpc"] != "2.0":
return None
if "method" not in data:
return None
return JSONRPCRequest(
method=data["method"],
params=data.get("params"),
id=data.get("id"),
jsonrpc=data["jsonrpc"],
)
except (json.JSONDecodeError, KeyError, TypeError):
return None
@staticmethod
def parse_response(json_str: str) -> Optional[JSONRPCResponse]:
"""解析JSON-RPC响应"""
try:
data = json.loads(json_str)
if not isinstance(data, dict):
return None
# 验证必需字段
if "jsonrpc" not in data or data["jsonrpc"] != "2.0":
return None
# 检查是否有result或error字段
has_result = "result" in data
has_error = "error" in data
if not has_result and not has_error:
return None
if has_result and has_error:
return None
response = JSONRPCResponse(id=data.get("id"), jsonrpc=data["jsonrpc"])
if has_result:
response.result = data["result"]
else:
error_data = data["error"]
response.error = JSONRPCError(
code=error_data["code"],
message=error_data["message"],
data=error_data.get("data"),
)
return response
except (json.JSONDecodeError, KeyError, TypeError):
return None
@staticmethod
def is_valid_request(json_str: str) -> bool:
"""验证是否为有效的JSON-RPC请求"""
return JSONRPCProtocol.parse_request(json_str) is not None
@staticmethod
def is_valid_response(json_str: str) -> bool:
"""验证是否为有效的JSON-RPC响应"""
return JSONRPCProtocol.parse_response(json_str) is not None
@staticmethod
def is_notification(json_str: str) -> bool:
"""检查是否为通知消息(无ID的请求)"""
request = JSONRPCProtocol.parse_request(json_str)
return request is not None and request.id is None
def create_tool_not_connected_error(
request_id: Optional[Union[str, int]] = None, agent_id: Optional[str] = None
) -> str:
"""创建工具端未连接的错误消息"""
error_data = (
{"agent_id": agent_id, "details": "请求的工具端连接不存在或已断开"}
if agent_id
else None
)
response = JSONRPCProtocol.create_error_response(
error_code=JSONRPCProtocol.TOOL_NOT_CONNECTED,
error_message="工具端未连接",
error_data=error_data,
request_id=request_id,
)
return JSONRPCProtocol.to_json(response, ensure_ascii=False)
def create_forward_failed_error(
request_id: Optional[Union[str, int]] = None, agent_id: Optional[str] = None
) -> str:
"""创建转发失败的错误消息"""
error_data = (
{"agent_id": agent_id, "details": "消息转发过程中发生错误"}
if agent_id
else None
)
response = JSONRPCProtocol.create_error_response(
error_code=JSONRPCProtocol.FORWARD_FAILED,
error_message="转发消息失败",
error_data=error_data,
request_id=request_id,
)
return JSONRPCProtocol.to_json(response, ensure_ascii=False)
def create_authentication_error(message: str = "认证失败") -> str:
"""创建认证错误消息"""
response = JSONRPCProtocol.create_error_response(
error_code=JSONRPCProtocol.AUTHENTICATION_ERROR, error_message=message
)
return JSONRPCProtocol.to_json(response, ensure_ascii=False)