From 59eceeddda8b28de947fc6ab54d8ec5bb1e7607c Mon Sep 17 00:00:00 2001 From: vipin-v-nair Date: Mon, 27 Apr 2026 20:47:17 +0000 Subject: [PATCH] fix(mcp): add use_isolated_event_loop to McpToolset for Agent Engine compatibility On Vertex AI Agent Engine, McpToolset with StreamableHTTPConnectionParams fails with: Attempted to exit cancel scope in a different task than it was entered in Root cause: anyio's CancelScope binds to the asyncio.Task that enters it. Agent Engine's scheduler can context-switch tasks between entering and exiting the scope inside streamablehttp_client's anyio.create_task_group(), causing the assertion to fire. Fix: add use_isolated_event_loop=True to McpToolset (and the underlying McpTool). When set, each MCP operation (tool discovery and tool calls) runs via asyncio.to_thread() in a dedicated thread with asyncio.new_event_loop(). The anyio cancel scope is created and destroyed entirely within that isolated loop, so it never crosses task boundaries in the caller's scheduler. The new mcp_thread_utils module contains the thread-safe helpers (list_tools_in_thread, call_tool_in_thread). auth_scheme, auth_credential, and header_provider are fully supported in this mode. progress_callback and MCP sampling are not invoked (documented limitation). The flag is opt-in and defaults to False, preserving all existing behaviour. It is restricted to StreamableHTTPConnectionParams; other transports raise ValueError. Verified against Vertex AI Agent Engine with three Cloud Run MCP servers. Co-Authored-By: vipin-v-nair --- .../adk/tools/mcp_tool/mcp_thread_utils.py | 142 ++++++++++++++++++ src/google/adk/tools/mcp_tool/mcp_tool.py | 42 +++++- src/google/adk/tools/mcp_tool/mcp_toolset.py | 55 +++++++ 3 files changed, 238 insertions(+), 1 deletion(-) create mode 100644 src/google/adk/tools/mcp_tool/mcp_thread_utils.py diff --git a/src/google/adk/tools/mcp_tool/mcp_thread_utils.py b/src/google/adk/tools/mcp_tool/mcp_thread_utils.py new file mode 100644 index 0000000000..42e35b8f61 --- /dev/null +++ b/src/google/adk/tools/mcp_tool/mcp_thread_utils.py @@ -0,0 +1,142 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Thread-isolated MCP helpers for environments with anyio cancel scope constraints. + +Root cause +---------- +``anyio``'s ``CancelScope`` binds to the ``asyncio.Task`` that *enters* it. +On Vertex AI Agent Engine, the scheduler can context-switch tasks between +entering and exiting the scope inside ``streamablehttp_client``'s +``anyio.create_task_group()``, which raises:: + + Attempted to exit cancel scope in a different task than it was entered in + +Fix +--- +Run each MCP operation inside a dedicated thread via ``asyncio.to_thread()``. +Inside that thread, ``asyncio.new_event_loop()`` creates an isolated event +loop. The ``anyio`` cancel scope is created *and* destroyed entirely within +that loop, so it never crosses task boundaries in the caller's scheduler. + +Trade-offs +---------- +* A new HTTP connection is opened per tool call (no session reuse). +* ``progress_callback`` and MCP sampling are not supported in this path. +* Auth headers are threaded through, so ``auth_scheme``/``auth_credential`` + and ``header_provider`` on ``McpToolset`` remain functional. +""" + +import asyncio +import json +from typing import Any +from typing import Optional + +from mcp.client.session import ClientSession +from mcp.client.streamable_http import streamablehttp_client + + +def _cancel_pending(loop: asyncio.AbstractEventLoop) -> None: + """Cancel all pending tasks so the loop can be closed without warnings.""" + pending = asyncio.all_tasks(loop) + if not pending: + return + for task in pending: + task.cancel() + loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + + +def list_tools_in_thread( + url: str, + headers: Optional[dict] = None, +) -> list: + """Return tools/list results from an MCP server via an isolated event loop. + + Opens a fresh connection for every call; must be invoked from a non-async + context (i.e. via ``asyncio.to_thread``). + + Args: + url: The MCP server URL. + headers: Optional HTTP headers to include in the request. + + Returns: + A list of ``mcp.types.Tool`` objects. + """ + + async def _async(): + kwargs = {"headers": headers} if headers else {} + async with streamablehttp_client(url, **kwargs) as (read, write, _): + async with ClientSession(read, write) as session: + await session.initialize() + result = await session.list_tools() + return result.tools + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(_async()) + finally: + _cancel_pending(loop) + loop.close() + + +def call_tool_in_thread( + url: str, + tool_name: str, + arguments: dict, + headers: Optional[dict] = None, +) -> Any: + """Call an MCP tool via an isolated event loop and return the parsed result. + + Opens a fresh connection for every call; must be invoked from a non-async + context (i.e. via ``asyncio.to_thread``). + + Args: + url: The MCP server URL. + tool_name: The name of the tool to call. + arguments: The arguments to pass to the tool. + headers: Optional HTTP headers to include in the request. + + Returns: + The parsed tool result (JSON-decoded dict, or ``{"text": ...}`` fallback). + + Raises: + RuntimeError: If the MCP server returns an error response. + """ + + async def _async(): + kwargs = {"headers": headers} if headers else {} + async with streamablehttp_client(url, **kwargs) as (read, write, _): + async with ClientSession(read, write) as session: + await session.initialize() + result = await session.call_tool(tool_name, arguments) + if result.isError: + raise RuntimeError( + f"MCP tool '{tool_name}' returned an error: {result.content}" + ) + if result.content: + text = result.content[0].text + try: + return json.loads(text) + except (json.JSONDecodeError, AttributeError): + return {"text": text} + return {} + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(_async()) + finally: + _cancel_pending(loop) + loop.close() diff --git a/src/google/adk/tools/mcp_tool/mcp_tool.py b/src/google/adk/tools/mcp_tool/mcp_tool.py index 6a24651f92..766fb763b7 100644 --- a/src/google/adk/tools/mcp_tool/mcp_tool.py +++ b/src/google/adk/tools/mcp_tool/mcp_tool.py @@ -57,6 +57,7 @@ from ..tool_context import ToolContext from .mcp_session_manager import MCPSessionManager from .mcp_session_manager import retry_on_errors +from .mcp_session_manager import StreamableHTTPConnectionParams from .session_context import SessionContext logger = logging.getLogger("google_adk." + __name__) @@ -148,6 +149,7 @@ def __init__( progress_callback: Optional[ Union[ProgressFnT, ProgressCallbackFactory] ] = None, + use_isolated_event_loop: bool = False, ): """Initializes an McpTool. @@ -175,8 +177,18 @@ def __init__( returns a ProgressFnT or None. This allows callbacks to access and modify runtime context like session state. + use_isolated_event_loop: When ``True``, each tool call runs in a + dedicated thread with an isolated ``asyncio`` event loop. This avoids + the anyio ``CancelScope`` cross-task error that occurs on Vertex AI + Agent Engine when using ``StreamableHTTPConnectionParams``. Only + supported with ``StreamableHTTPConnectionParams``; raises + ``ValueError`` for other transport types. Note: ``progress_callback`` + is not invoked in this mode. + Raises: - ValueError: If mcp_tool or mcp_session_manager is None. + ValueError: If mcp_tool or mcp_session_manager is None, or if + ``use_isolated_event_loop=True`` is combined with a non-streamable- + HTTP transport. """ # --- BEGIN BOUND TOKEN PATCH --- @@ -197,11 +209,25 @@ def __init__( if auth_scheme else None, ) + if use_isolated_event_loop and not isinstance( + mcp_session_manager.connection_params, StreamableHTTPConnectionParams + ): + raise ValueError( + "use_isolated_event_loop=True is only supported with" + " StreamableHTTPConnectionParams." + ) + self._mcp_tool = mcp_tool self._mcp_session_manager = mcp_session_manager self._require_confirmation = require_confirmation self._header_provider = header_provider self._progress_callback = progress_callback + self._use_isolated_event_loop = use_isolated_event_loop + self._server_url: Optional[str] = ( + mcp_session_manager.connection_params.url + if use_isolated_event_loop + else None + ) @override def _get_declaration(self) -> FunctionDeclaration: @@ -397,6 +423,20 @@ async def _run_async_impl( headers.update(dynamic_headers) final_headers = headers if headers else None + if self._use_isolated_event_loop: + # Run in a dedicated thread with an isolated event loop to avoid the + # anyio CancelScope cross-task error on Vertex AI Agent Engine. + # See mcp_thread_utils.py for the full explanation. + from .mcp_thread_utils import call_tool_in_thread # pylint: disable=g-import-not-at-top + + return await asyncio.to_thread( + call_tool_in_thread, + self._server_url, + self.name, + args, + final_headers, + ) + # Propagate trace context in the _meta field as sprcified by MCP protocol. # See https://agentclientprotocol.com/protocol/extensibility#the-meta-field trace_carrier: Dict[str, str] = {} diff --git a/src/google/adk/tools/mcp_tool/mcp_toolset.py b/src/google/adk/tools/mcp_tool/mcp_toolset.py index c566c52746..a5791cf3cb 100644 --- a/src/google/adk/tools/mcp_tool/mcp_toolset.py +++ b/src/google/adk/tools/mcp_tool/mcp_toolset.py @@ -118,6 +118,7 @@ def __init__( use_mcp_resources: Optional[bool] = False, sampling_callback: Optional[SamplingFnT] = None, sampling_capabilities: Optional[SamplingCapability] = None, + use_isolated_event_loop: bool = False, ): """Initializes the McpToolset. @@ -157,6 +158,14 @@ def __init__( sampling_callback: Optional callback to handle sampling requests from the MCP server. sampling_capabilities: Optional capabilities for sampling. + use_isolated_event_loop: When ``True``, each MCP operation (tool + discovery and tool calls) runs in a dedicated thread with an isolated + ``asyncio`` event loop. This avoids the anyio ``CancelScope`` cross-task + error that occurs on Vertex AI Agent Engine when using + ``StreamableHTTPConnectionParams``. Only supported with + ``StreamableHTTPConnectionParams``; raises ``ValueError`` for other + transport types. Note: ``progress_callback`` and MCP sampling are not + invoked in this mode. """ # --- BEGIN BOUND TOKEN PATCH --- @@ -176,6 +185,14 @@ def __init__( if not connection_params: raise ValueError("Missing connection params in McpToolset.") + if use_isolated_event_loop and not isinstance( + connection_params, StreamableHTTPConnectionParams + ): + raise ValueError( + "use_isolated_event_loop=True is only supported with" + " StreamableHTTPConnectionParams." + ) + self._connection_params = connection_params self._errlog = errlog self._header_provider = header_provider @@ -202,6 +219,7 @@ def __init__( else None ) self._use_mcp_resources = use_mcp_resources + self._use_isolated_event_loop = use_isolated_event_loop def _get_auth_headers( self, readonly_context: Optional[ReadonlyContext] = None @@ -340,6 +358,43 @@ async def get_tools( Returns: List[BaseTool]: A list of tools available under the specified context. """ + if self._use_isolated_event_loop: + # Discover tools via an isolated thread to avoid the anyio CancelScope + # cross-task error on Vertex AI Agent Engine. + # See mcp_thread_utils.py for the full explanation. + from .mcp_thread_utils import list_tools_in_thread # pylint: disable=g-import-not-at-top + + headers: Dict[str, str] = {} + auth_headers = self._get_auth_headers(readonly_context) + if auth_headers: + headers.update(auth_headers) + if self._header_provider and readonly_context: + provider_headers = self._header_provider(readonly_context) + if provider_headers: + headers.update(provider_headers) + + raw_tools = await asyncio.to_thread( + list_tools_in_thread, + self._connection_params.url, + headers or None, + ) + tools = [] + for tool in raw_tools: + mcp_tool = MCPTool( + mcp_tool=tool, + mcp_session_manager=self._mcp_session_manager, + auth_scheme=self._auth_scheme, + auth_credential=self._auth_credential, + require_confirmation=self._require_confirmation, + header_provider=self._header_provider, + use_isolated_event_loop=True, + ) + if self._is_tool_selected(mcp_tool, readonly_context): + tools.append(mcp_tool) + if self._use_mcp_resources: + tools.append(LoadMcpResourceTool(mcp_toolset=self)) + return tools + # Fetch available tools from the MCP server tools_response: ListToolsResult = await self._execute_with_session( lambda session: session.list_tools(),