Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions src/google/adk/tools/mcp_tool/mcp_thread_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Thread-isolated MCP helpers for environments with anyio cancel scope constraints.

Root cause
----------
``anyio``'s ``CancelScope`` binds to the ``asyncio.Task`` that *enters* it.
On Vertex AI Agent Engine, the scheduler can context-switch tasks between
entering and exiting the scope inside ``streamablehttp_client``'s
``anyio.create_task_group()``, which raises::

Attempted to exit cancel scope in a different task than it was entered in

Fix
---
Run each MCP operation inside a dedicated thread via ``asyncio.to_thread()``.
Inside that thread, ``asyncio.new_event_loop()`` creates an isolated event
loop. The ``anyio`` cancel scope is created *and* destroyed entirely within
that loop, so it never crosses task boundaries in the caller's scheduler.

Trade-offs
----------
* A new HTTP connection is opened per tool call (no session reuse).
* ``progress_callback`` and MCP sampling are not supported in this path.
* Auth headers are threaded through, so ``auth_scheme``/``auth_credential``
and ``header_provider`` on ``McpToolset`` remain functional.
"""

import asyncio
import json
from typing import Any
from typing import Optional

from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamablehttp_client


def _cancel_pending(loop: asyncio.AbstractEventLoop) -> None:
"""Cancel all pending tasks so the loop can be closed without warnings."""
pending = asyncio.all_tasks(loop)
if not pending:
return
for task in pending:
task.cancel()
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))


def list_tools_in_thread(
url: str,
headers: Optional[dict] = None,
) -> list:
"""Return tools/list results from an MCP server via an isolated event loop.

Opens a fresh connection for every call; must be invoked from a non-async
context (i.e. via ``asyncio.to_thread``).

Args:
url: The MCP server URL.
headers: Optional HTTP headers to include in the request.

Returns:
A list of ``mcp.types.Tool`` objects.
"""

async def _async():
kwargs = {"headers": headers} if headers else {}
async with streamablehttp_client(url, **kwargs) as (read, write, _):
async with ClientSession(read, write) as session:
await session.initialize()
result = await session.list_tools()
return result.tools

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(_async())
finally:
_cancel_pending(loop)
loop.close()


def call_tool_in_thread(
url: str,
tool_name: str,
arguments: dict,
headers: Optional[dict] = None,
) -> Any:
"""Call an MCP tool via an isolated event loop and return the parsed result.

Opens a fresh connection for every call; must be invoked from a non-async
context (i.e. via ``asyncio.to_thread``).

Args:
url: The MCP server URL.
tool_name: The name of the tool to call.
arguments: The arguments to pass to the tool.
headers: Optional HTTP headers to include in the request.

Returns:
The parsed tool result (JSON-decoded dict, or ``{"text": ...}`` fallback).

Raises:
RuntimeError: If the MCP server returns an error response.
"""

async def _async():
kwargs = {"headers": headers} if headers else {}
async with streamablehttp_client(url, **kwargs) as (read, write, _):
async with ClientSession(read, write) as session:
await session.initialize()
result = await session.call_tool(tool_name, arguments)
if result.isError:
raise RuntimeError(
f"MCP tool '{tool_name}' returned an error: {result.content}"
)
if result.content:
text = result.content[0].text
try:
return json.loads(text)
except (json.JSONDecodeError, AttributeError):
return {"text": text}
return {}

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(_async())
finally:
_cancel_pending(loop)
loop.close()
42 changes: 41 additions & 1 deletion src/google/adk/tools/mcp_tool/mcp_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from ..tool_context import ToolContext
from .mcp_session_manager import MCPSessionManager
from .mcp_session_manager import retry_on_errors
from .mcp_session_manager import StreamableHTTPConnectionParams
from .session_context import SessionContext

logger = logging.getLogger("google_adk." + __name__)
Expand Down Expand Up @@ -148,6 +149,7 @@ def __init__(
progress_callback: Optional[
Union[ProgressFnT, ProgressCallbackFactory]
] = None,
use_isolated_event_loop: bool = False,
):
"""Initializes an McpTool.

Expand Down Expand Up @@ -175,8 +177,18 @@ def __init__(
returns a ProgressFnT or None. This allows callbacks to access
and modify runtime context like session state.

use_isolated_event_loop: When ``True``, each tool call runs in a
dedicated thread with an isolated ``asyncio`` event loop. This avoids
the anyio ``CancelScope`` cross-task error that occurs on Vertex AI
Agent Engine when using ``StreamableHTTPConnectionParams``. Only
supported with ``StreamableHTTPConnectionParams``; raises
``ValueError`` for other transport types. Note: ``progress_callback``
is not invoked in this mode.

Raises:
ValueError: If mcp_tool or mcp_session_manager is None.
ValueError: If mcp_tool or mcp_session_manager is None, or if
``use_isolated_event_loop=True`` is combined with a non-streamable-
HTTP transport.
"""

# --- BEGIN BOUND TOKEN PATCH ---
Expand All @@ -197,11 +209,25 @@ def __init__(
if auth_scheme
else None,
)
if use_isolated_event_loop and not isinstance(
mcp_session_manager.connection_params, StreamableHTTPConnectionParams
):
raise ValueError(
"use_isolated_event_loop=True is only supported with"
" StreamableHTTPConnectionParams."
)

self._mcp_tool = mcp_tool
self._mcp_session_manager = mcp_session_manager
self._require_confirmation = require_confirmation
self._header_provider = header_provider
self._progress_callback = progress_callback
self._use_isolated_event_loop = use_isolated_event_loop
self._server_url: Optional[str] = (
mcp_session_manager.connection_params.url
if use_isolated_event_loop
else None
)

@override
def _get_declaration(self) -> FunctionDeclaration:
Expand Down Expand Up @@ -397,6 +423,20 @@ async def _run_async_impl(
headers.update(dynamic_headers)
final_headers = headers if headers else None

if self._use_isolated_event_loop:
# Run in a dedicated thread with an isolated event loop to avoid the
# anyio CancelScope cross-task error on Vertex AI Agent Engine.
# See mcp_thread_utils.py for the full explanation.
from .mcp_thread_utils import call_tool_in_thread # pylint: disable=g-import-not-at-top

return await asyncio.to_thread(
call_tool_in_thread,
self._server_url,
self.name,
args,
final_headers,
)

# Propagate trace context in the _meta field as sprcified by MCP protocol.
# See https://agentclientprotocol.com/protocol/extensibility#the-meta-field
trace_carrier: Dict[str, str] = {}
Expand Down
55 changes: 55 additions & 0 deletions src/google/adk/tools/mcp_tool/mcp_toolset.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def __init__(
use_mcp_resources: Optional[bool] = False,
sampling_callback: Optional[SamplingFnT] = None,
sampling_capabilities: Optional[SamplingCapability] = None,
use_isolated_event_loop: bool = False,
):
"""Initializes the McpToolset.

Expand Down Expand Up @@ -157,6 +158,14 @@ def __init__(
sampling_callback: Optional callback to handle sampling requests from the
MCP server.
sampling_capabilities: Optional capabilities for sampling.
use_isolated_event_loop: When ``True``, each MCP operation (tool
discovery and tool calls) runs in a dedicated thread with an isolated
``asyncio`` event loop. This avoids the anyio ``CancelScope`` cross-task
error that occurs on Vertex AI Agent Engine when using
``StreamableHTTPConnectionParams``. Only supported with
``StreamableHTTPConnectionParams``; raises ``ValueError`` for other
transport types. Note: ``progress_callback`` and MCP sampling are not
invoked in this mode.
"""

# --- BEGIN BOUND TOKEN PATCH ---
Expand All @@ -176,6 +185,14 @@ def __init__(
if not connection_params:
raise ValueError("Missing connection params in McpToolset.")

if use_isolated_event_loop and not isinstance(
connection_params, StreamableHTTPConnectionParams
):
raise ValueError(
"use_isolated_event_loop=True is only supported with"
" StreamableHTTPConnectionParams."
)

self._connection_params = connection_params
self._errlog = errlog
self._header_provider = header_provider
Expand All @@ -202,6 +219,7 @@ def __init__(
else None
)
self._use_mcp_resources = use_mcp_resources
self._use_isolated_event_loop = use_isolated_event_loop

def _get_auth_headers(
self, readonly_context: Optional[ReadonlyContext] = None
Expand Down Expand Up @@ -340,6 +358,43 @@ async def get_tools(
Returns:
List[BaseTool]: A list of tools available under the specified context.
"""
if self._use_isolated_event_loop:
# Discover tools via an isolated thread to avoid the anyio CancelScope
# cross-task error on Vertex AI Agent Engine.
# See mcp_thread_utils.py for the full explanation.
from .mcp_thread_utils import list_tools_in_thread # pylint: disable=g-import-not-at-top

headers: Dict[str, str] = {}
auth_headers = self._get_auth_headers(readonly_context)
if auth_headers:
headers.update(auth_headers)
if self._header_provider and readonly_context:
provider_headers = self._header_provider(readonly_context)
if provider_headers:
headers.update(provider_headers)

raw_tools = await asyncio.to_thread(
list_tools_in_thread,
self._connection_params.url,
headers or None,
)
tools = []
for tool in raw_tools:
mcp_tool = MCPTool(
mcp_tool=tool,
mcp_session_manager=self._mcp_session_manager,
auth_scheme=self._auth_scheme,
auth_credential=self._auth_credential,
require_confirmation=self._require_confirmation,
header_provider=self._header_provider,
use_isolated_event_loop=True,
)
if self._is_tool_selected(mcp_tool, readonly_context):
tools.append(mcp_tool)
if self._use_mcp_resources:
tools.append(LoadMcpResourceTool(mcp_toolset=self))
return tools

# Fetch available tools from the MCP server
tools_response: ListToolsResult = await self._execute_with_session(
lambda session: session.list_tools(),
Expand Down
Loading