Skip to content

Commit ef1fe1c

Browse files
committed
feat(agent): unify tool calling around MCP and add /api/mcp endpoint
1 parent a5d6063 commit ef1fe1c

13 files changed

Lines changed: 958 additions & 354 deletions

File tree

domain/agent/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
from .service import AgentService
2-
from .types import AgentChatContext, AgentChatRequest, PendingToolCall
2+
from .types import AgentChatContext, AgentChatRequest, McpCall, PendingMcpCall
33

44
__all__ = [
55
"AgentService",
66
"AgentChatContext",
77
"AgentChatRequest",
8-
"PendingToolCall",
8+
"McpCall",
9+
"PendingMcpCall",
910
]

domain/agent/api.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515

1616
@router.post("/chat")
17-
@audit(action=AuditAction.CREATE, description="Agent 对话", body_fields=["auto_execute"])
17+
@audit(action=AuditAction.CREATE, description="Agent 对话", body_fields=["auto_execute", "approved_mcp_call_ids", "rejected_mcp_call_ids"])
1818
async def chat(
1919
request: Request,
2020
payload: AgentChatRequest,
@@ -25,7 +25,7 @@ async def chat(
2525

2626

2727
@router.post("/chat/stream")
28-
@audit(action=AuditAction.CREATE, description="Agent 对话(SSE)", body_fields=["auto_execute"])
28+
@audit(action=AuditAction.CREATE, description="Agent 对话(SSE)", body_fields=["auto_execute", "approved_mcp_call_ids", "rejected_mcp_call_ids"])
2929
async def chat_stream(
3030
request: Request,
3131
payload: AgentChatRequest,

domain/agent/mcp.py

Lines changed: 334 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,334 @@
1+
import inspect
2+
import json
3+
from contextlib import asynccontextmanager
4+
from datetime import timedelta
5+
from typing import Annotated, Any, Literal
6+
from urllib.parse import quote, unquote
7+
8+
import httpx
9+
from mcp import ClientSession
10+
from mcp.client.streamable_http import streamablehttp_client
11+
from mcp.server.auth.provider import AccessToken
12+
from mcp.server.fastmcp import Context, FastMCP
13+
from mcp.server.fastmcp.server import AuthSettings
14+
from mcp.types import ToolAnnotations
15+
from pydantic import Field
16+
17+
from domain.auth import AuthService, User
18+
from domain.processors import ProcessorService
19+
20+
from .tools import get_tool, mcp_tool_descriptors
21+
from .tools.base import McpToolDescriptor, normalize_tool_result, tool_result_to_content
22+
23+
INTERNAL_MCP_BASE_URL = "http://127.0.0.1:8000/"
24+
CURRENT_PATH_HEADER = "x-foxel-current-path"
25+
26+
27+
def _normalize_path(path: str | None) -> str | None:
28+
if not path:
29+
return None
30+
value = str(path).strip().replace("\\", "/")
31+
if not value:
32+
return None
33+
if not value.startswith("/"):
34+
value = "/" + value
35+
return value.rstrip("/") or "/"
36+
37+
38+
def _header_current_path(ctx: Context | None) -> str | None:
39+
request = ctx.request_context.request if ctx and ctx.request_context else None
40+
if request is None:
41+
return None
42+
return _normalize_path(request.headers.get(CURRENT_PATH_HEADER))
43+
44+
45+
def _field_annotation(schema: dict[str, Any], required: bool) -> tuple[Any, Any]:
46+
raw_type = schema.get("type")
47+
enum_values = schema.get("enum")
48+
description = str(schema.get("description") or "").strip() or None
49+
default = schema.get("default", inspect.Parameter.empty if required else None)
50+
51+
annotation: Any
52+
if isinstance(enum_values, list) and enum_values:
53+
annotation = Literal.__getitem__(tuple(enum_values))
54+
elif raw_type == "string":
55+
annotation = str
56+
elif raw_type == "integer":
57+
annotation = int
58+
elif raw_type == "number":
59+
annotation = float
60+
elif raw_type == "boolean":
61+
annotation = bool
62+
elif raw_type == "array":
63+
annotation = list[Any]
64+
elif raw_type == "object":
65+
annotation = dict[str, Any]
66+
else:
67+
annotation = Any
68+
69+
if not required and default is None:
70+
annotation = annotation | None
71+
72+
if description:
73+
annotation = Annotated[annotation, Field(description=description)]
74+
return annotation, default
75+
76+
77+
def _build_tool_signature(descriptor: McpToolDescriptor) -> inspect.Signature:
78+
schema = descriptor.input_schema if isinstance(descriptor.input_schema, dict) else {}
79+
properties = schema.get("properties") if isinstance(schema.get("properties"), dict) else {}
80+
required = set(schema.get("required") or [])
81+
parameters: list[inspect.Parameter] = []
82+
for key, value in properties.items():
83+
prop_schema = value if isinstance(value, dict) else {}
84+
annotation, default = _field_annotation(prop_schema, key in required)
85+
parameters.append(
86+
inspect.Parameter(
87+
str(key),
88+
inspect.Parameter.POSITIONAL_OR_KEYWORD,
89+
default=default,
90+
annotation=annotation,
91+
)
92+
)
93+
return inspect.Signature(parameters=parameters, return_annotation=dict[str, Any])
94+
95+
96+
def _build_tool_wrapper(descriptor: McpToolDescriptor):
97+
async def wrapper(**kwargs: Any) -> dict[str, Any]:
98+
spec = get_tool(descriptor.name)
99+
if not spec:
100+
return normalize_tool_result({"error": f"unknown_tool: {descriptor.name}"})
101+
try:
102+
result = await spec.handler(kwargs)
103+
return normalize_tool_result(result)
104+
except Exception as exc: # noqa: BLE001
105+
return normalize_tool_result({"error": str(exc)})
106+
107+
wrapper.__name__ = descriptor.name
108+
wrapper.__doc__ = descriptor.description
109+
wrapper.__signature__ = _build_tool_signature(descriptor)
110+
return wrapper
111+
112+
113+
class FoxelMcpTokenVerifier:
114+
async def verify_token(self, token: str) -> AccessToken | None:
115+
try:
116+
user = await AuthService.get_current_active_user(await AuthService.get_current_user(token))
117+
except Exception: # noqa: BLE001
118+
return None
119+
return AccessToken(token=token, client_id=user.username, scopes=[])
120+
121+
122+
MCP_SERVER = FastMCP(
123+
name="Foxel MCP",
124+
instructions="Foxel 内置 MCP 服务,提供文件系统、网页抓取、时间与处理器相关能力。",
125+
streamable_http_path="/",
126+
token_verifier=FoxelMcpTokenVerifier(),
127+
auth=AuthSettings(
128+
issuer_url="http://127.0.0.1:8000",
129+
resource_server_url=None,
130+
required_scopes=[],
131+
),
132+
)
133+
134+
135+
for descriptor in mcp_tool_descriptors():
136+
MCP_SERVER.add_tool(
137+
_build_tool_wrapper(descriptor),
138+
name=descriptor.name,
139+
description=descriptor.description,
140+
annotations=ToolAnnotations.model_validate(descriptor.annotations),
141+
meta=descriptor.meta,
142+
structured_output=False,
143+
)
144+
145+
146+
@MCP_SERVER.resource(
147+
"foxel://context/current-path",
148+
name="current_path",
149+
title="Current Path",
150+
description="返回当前请求上下文里的文件管理目录。",
151+
mime_type="application/json",
152+
)
153+
def current_path_resource() -> dict[str, Any]:
154+
return {"current_path": None}
155+
156+
157+
@MCP_SERVER.resource(
158+
"foxel://policy/tool-confirmation",
159+
name="tool_confirmation_policy",
160+
title="Tool Confirmation Policy",
161+
description="返回 Foxel agent 对工具审批的策略。",
162+
mime_type="application/json",
163+
)
164+
def tool_confirmation_policy_resource() -> dict[str, Any]:
165+
return {
166+
"read_tools": [tool.name for tool in mcp_tool_descriptors() if not tool.requires_confirmation],
167+
"write_tools": [tool.name for tool in mcp_tool_descriptors() if tool.requires_confirmation],
168+
"rule": "直接调用 MCP tool 时不额外审批;通过 agent 代表用户执行写操作时需要审批。",
169+
}
170+
171+
172+
@MCP_SERVER.resource(
173+
"foxel://processors/index",
174+
name="processors_index",
175+
title="Processors Index",
176+
description="返回当前可用处理器列表。",
177+
mime_type="application/json",
178+
)
179+
def processors_index_resource() -> dict[str, Any]:
180+
return {"processors": ProcessorService.list_processors()}
181+
182+
183+
async def _tool_resource(tool_name: str, arguments: dict[str, Any]) -> dict[str, Any]:
184+
spec = get_tool(tool_name)
185+
if not spec:
186+
return normalize_tool_result({"error": f"unknown_tool: {tool_name}"})
187+
try:
188+
result = await spec.handler(arguments)
189+
return normalize_tool_result(result)
190+
except Exception as exc: # noqa: BLE001
191+
return normalize_tool_result({"error": str(exc)})
192+
193+
194+
@MCP_SERVER.resource(
195+
"foxel://vfs/stat/{path}",
196+
name="vfs_stat_resource",
197+
title="VFS Stat",
198+
description="读取指定路径的文件或目录元信息;path 需要 URL 编码。",
199+
mime_type="application/json",
200+
)
201+
async def vfs_stat_resource(path: str) -> dict[str, Any]:
202+
return await _tool_resource("vfs_stat", {"path": "/" + unquote(path).lstrip("/")})
203+
204+
205+
@MCP_SERVER.resource(
206+
"foxel://vfs/text/{path}",
207+
name="vfs_text_resource",
208+
title="VFS Text",
209+
description="读取文本文件内容;path 需要 URL 编码。",
210+
mime_type="application/json",
211+
)
212+
async def vfs_text_resource(path: str) -> dict[str, Any]:
213+
return await _tool_resource("vfs_read_text", {"path": "/" + unquote(path).lstrip("/")})
214+
215+
216+
@MCP_SERVER.resource(
217+
"foxel://vfs/dir/{path}",
218+
name="vfs_dir_resource",
219+
title="VFS Directory",
220+
description="列出目录内容;path 需要 URL 编码。",
221+
mime_type="application/json",
222+
)
223+
async def vfs_dir_resource(path: str) -> dict[str, Any]:
224+
return await _tool_resource("vfs_list_dir", {"path": "/" + unquote(path).lstrip("/")})
225+
226+
227+
@MCP_SERVER.resource(
228+
"foxel://vfs/search/{query}",
229+
name="vfs_search_resource",
230+
title="VFS Search",
231+
description="搜索文件;query 需要 URL 编码。",
232+
mime_type="application/json",
233+
)
234+
async def vfs_search_resource(query: str) -> dict[str, Any]:
235+
return await _tool_resource("vfs_search", {"q": unquote(query)})
236+
237+
238+
@MCP_SERVER.prompt(name="browse_path", title="Browse Path", description="生成浏览目录的推荐提示词。")
239+
def browse_path_prompt(path: Annotated[str, Field(description="目标目录路径")]) -> list[dict[str, Any]]:
240+
return [{"role": "user", "content": f"请先浏览目录 `{path}`,总结结构与关键文件。必要时调用 vfs_list_dir 与 vfs_stat。"}]
241+
242+
243+
@MCP_SERVER.prompt(name="inspect_file", title="Inspect File", description="生成查看文件的推荐提示词。")
244+
def inspect_file_prompt(path: Annotated[str, Field(description="目标文件路径")]) -> list[dict[str, Any]]:
245+
return [{"role": "user", "content": f"请检查文件 `{path}` 的内容与用途。必要时调用 vfs_read_text。"}]
246+
247+
248+
@MCP_SERVER.prompt(name="search_files", title="Search Files", description="生成搜索文件的推荐提示词。")
249+
def search_files_prompt(query: Annotated[str, Field(description="搜索关键词")]) -> list[dict[str, Any]]:
250+
return [{"role": "user", "content": f"请搜索与 `{query}` 相关的文件,并按相关性总结。必要时调用 vfs_search。"}]
251+
252+
253+
@MCP_SERVER.prompt(name="edit_file_safely", title="Edit File Safely", description="生成安全修改文件的推荐提示词。")
254+
def edit_file_safely_prompt(path: Annotated[str, Field(description="目标文件路径")]) -> list[dict[str, Any]]:
255+
return [{"role": "user", "content": f"请先读取 `{path}`,解释拟修改点,再等待我确认后执行写入。"}]
256+
257+
258+
@MCP_SERVER.prompt(name="run_processor", title="Run Processor", description="生成运行处理器的推荐提示词。")
259+
def run_processor_prompt(
260+
path: Annotated[str, Field(description="目标文件或目录路径")],
261+
processor_type: Annotated[str, Field(description="处理器类型")],
262+
) -> list[dict[str, Any]]:
263+
return [{"role": "user", "content": f"请检查 `{path}` 是否适合运行处理器 `{processor_type}`,确认参数后再执行 processors_run。"}]
264+
265+
266+
@MCP_SERVER.prompt(name="fetch_web_page", title="Fetch Web Page", description="生成抓取网页的推荐提示词。")
267+
def fetch_web_page_prompt(url: Annotated[str, Field(description="目标网址")]) -> list[dict[str, Any]]:
268+
return [{"role": "user", "content": f"请抓取网页 `{url}`,并总结标题、正文与关键链接。必要时调用 web_fetch。"}]
269+
270+
271+
MCP_HTTP_APP = MCP_SERVER.streamable_http_app()
272+
273+
274+
def loopback_httpx_client_factory(app):
275+
def factory(headers: dict[str, str] | None = None, timeout=None, auth=None) -> httpx.AsyncClient:
276+
return httpx.AsyncClient(
277+
transport=httpx.ASGITransport(app=app),
278+
base_url=INTERNAL_MCP_BASE_URL.rstrip("/"),
279+
headers=headers,
280+
timeout=timeout,
281+
auth=auth,
282+
follow_redirects=True,
283+
)
284+
285+
return factory
286+
287+
288+
async def create_loopback_mcp_headers(user: User | None, current_path: str | None = None) -> dict[str, str]:
289+
headers: dict[str, str] = {}
290+
if user is not None:
291+
token = await AuthService.create_access_token(
292+
{"sub": user.username},
293+
expires_delta=timedelta(minutes=5),
294+
)
295+
headers["Authorization"] = f"Bearer {token}"
296+
if current_path:
297+
headers[CURRENT_PATH_HEADER] = current_path
298+
return headers
299+
300+
301+
@asynccontextmanager
302+
async def mcp_client_session(user: User | None, current_path: str | None = None):
303+
headers = await create_loopback_mcp_headers(user, current_path)
304+
async with streamablehttp_client(
305+
INTERNAL_MCP_BASE_URL,
306+
headers=headers,
307+
httpx_client_factory=loopback_httpx_client_factory(MCP_HTTP_APP),
308+
) as (read_stream, write_stream, _):
309+
async with ClientSession(read_stream, write_stream) as session:
310+
await session.initialize()
311+
yield session
312+
313+
314+
def mcp_content_to_text(content: list[Any], structured_content: dict[str, Any] | None = None) -> str:
315+
if structured_content is not None:
316+
try:
317+
return json.dumps(structured_content, ensure_ascii=False)
318+
except TypeError:
319+
pass
320+
321+
text_parts: list[str] = []
322+
for item in content:
323+
item_type = getattr(item, "type", None)
324+
if item_type == "text":
325+
text = getattr(item, "text", None)
326+
if isinstance(text, str) and text:
327+
text_parts.append(text)
328+
if text_parts:
329+
return "\n".join(text_parts)
330+
return tool_result_to_content({"error": "empty_mcp_content"})
331+
332+
333+
def encode_resource_path(path: str) -> str:
334+
return quote(path.lstrip("/"), safe="")

0 commit comments

Comments
 (0)