Compare commits

...

5 Commits

Author SHA1 Message Date
Greyson LaLonde
cf51bf0d9f Merge branch 'main' into lg-mcp-event-loop 2026-03-02 12:05:06 -05:00
Greyson LaLonde
21224f2bc5 fix: conditionally pass plus header
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Empty strings are considered illegal values for bearer auth in `httpx`.
2026-03-02 09:27:54 -05:00
Giulio Leone
b76022c1e7 fix(telemetry): skip signal handler registration in non-main threads
* fix(telemetry): skip signal handler registration in non-main threads

When CrewAI is initialized from a non-main thread (e.g. Streamlit, Flask,
Django, Jupyter), the telemetry module attempted to register signal handlers
which only work in the main thread. This caused multiple noisy ValueError
tracebacks to be printed to stderr, confusing users even though the errors
were caught and non-fatal.

Check `threading.current_thread() is not threading.main_thread()` before
attempting signal registration, and skip silently with a debug-level log
message instead of printing full tracebacks.

Fixes crewAIInc/crewAI#4289

* fix(test): move Telemetry() inside signal.signal mock context

Refs: #4649

* fix(telemetry): move signal.signal mock inside thread to wrap Telemetry() construction

The patch context now activates inside init_in_thread so the mock
is guaranteed to be active before and during Telemetry.__init__,
addressing the Copilot review feedback.

Refs: #4289

* fix(test): mock logger.debug instead of capsys for deterministic assertion

Replace signal.signal-only mock with combined logger + signal mock.
Assert logger.debug was called with the skip message and signal.signal
was never invoked from the non-main thread.

Refs: #4289
2026-03-02 07:42:55 -05:00
Lucas Gomide
a96f114a36 Merge branch 'main' into lg-mcp-event-loop 2026-02-27 12:08:38 -03:00
Lucas Gomide
ca220cdc23 fix: use persistent event loop for MCP operations to prevent cancel scope errors
Replace per-call asyncio.run() with a single persistent background event
loop for all MCP operations. The MCP SDK's streamable HTTP transport uses
anyio task groups whose cancel scopes must be entered and exited on the
same event loop and task. Creating a throwaway loop per tool call caused
"Attempted to exit cancel scope in a different task" RuntimeErrors during
cleanup, preventing MCP tools from working reliably.
2026-02-27 11:37:50 -03:00
8 changed files with 137 additions and 69 deletions

View File

@@ -22,14 +22,15 @@ class PlusAPI:
EPHEMERAL_TRACING_RESOURCE = "/crewai_plus/api/v1/tracing/ephemeral"
INTEGRATIONS_RESOURCE = "/crewai_plus/api/v1/integrations"
def __init__(self, api_key: str) -> None:
def __init__(self, api_key: str | None = None) -> None:
self.api_key = api_key
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"User-Agent": f"CrewAI-CLI/{get_crewai_version()}",
"X-Crewai-Version": get_crewai_version(),
}
if api_key:
self.headers["Authorization"] = f"Bearer {api_key}"
settings = Settings()
if settings.org_uuid:
self.headers["X-Crewai-Organization-Id"] = settings.org_uuid

View File

@@ -67,7 +67,7 @@ class TraceBatchManager:
api_key=get_auth_token(),
)
except AuthError:
self.plus_api = PlusAPI(api_key="")
self.plus_api = PlusAPI()
self.ephemeral_trace_url = None
def initialize_batch(

View File

@@ -95,7 +95,7 @@ class MCPClient:
self.discovery_timeout = discovery_timeout
self.max_retries = max_retries
self.cache_tools_list = cache_tools_list
# self._logger = logger or logging.getLogger(__name__)
self._logger = logger or logging.getLogger(__name__)
self._session: Any = None
self._initialized = False
self._exit_stack = AsyncExitStack()
@@ -358,10 +358,12 @@ class MCPClient:
"""Cleanup resources when an error occurs during connection."""
try:
await self._exit_stack.aclose()
except Exception as e:
# Best effort cleanup - ignore all other errors
raise RuntimeError(f"Error during MCP client cleanup: {e}") from e
except (RuntimeError, BaseExceptionGroup) as e:
error_msg = str(e).lower()
if "cancel scope" not in error_msg and "task" not in error_msg:
raise RuntimeError(f"Error during MCP client cleanup: {e}") from e
except Exception:
self._logger.debug("Suppressed error during MCP cleanup", exc_info=True)
finally:
self._session = None
self._initialized = False
@@ -374,8 +376,12 @@ class MCPClient:
try:
await self._exit_stack.aclose()
except Exception as e:
raise RuntimeError(f"Error during MCP client disconnect: {e}") from e
except (RuntimeError, BaseExceptionGroup) as e:
error_msg = str(e).lower()
if "cancel scope" not in error_msg and "task" not in error_msg:
raise RuntimeError(f"Error during MCP client disconnect: {e}") from e
except Exception:
self._logger.debug("Suppressed error during MCP disconnect", exc_info=True)
finally:
self._session = None
self._initialized = False

View File

@@ -87,7 +87,12 @@ class MCPToolResolver:
return all_tools
def cleanup(self) -> None:
"""Disconnect all MCP client connections."""
"""Disconnect all MCP client connections.
Submits the disconnect coroutines to the persistent MCP event loop
so that transport context managers are exited on the same loop they
were entered on.
"""
if not self._clients:
return
@@ -97,7 +102,11 @@ class MCPToolResolver:
await client.disconnect()
try:
asyncio.run(_disconnect_all())
from crewai.tools.mcp_native_tool import _get_mcp_event_loop
loop = _get_mcp_event_loop()
future = asyncio.run_coroutine_threadsafe(_disconnect_all(), loop)
future.result(timeout=30)
except Exception as e:
self._logger.log("error", f"Error during MCP client cleanup: {e}")
finally:
@@ -330,30 +339,27 @@ class MCPToolResolver:
) from e
try:
try:
asyncio.get_running_loop()
import concurrent.futures
from crewai.tools.mcp_native_tool import _get_mcp_event_loop
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(
asyncio.run, _setup_client_and_list_tools()
)
tools_list = future.result()
except RuntimeError:
try:
tools_list = asyncio.run(_setup_client_and_list_tools())
except RuntimeError as e:
error_msg = str(e).lower()
if "cancel scope" in error_msg or "task" in error_msg:
raise ConnectionError(
"MCP connection failed due to event loop cleanup issues. "
"This may be due to authentication errors or server unavailability."
) from e
except asyncio.CancelledError as e:
loop = _get_mcp_event_loop()
future = asyncio.run_coroutine_threadsafe(
_setup_client_and_list_tools(), loop
)
try:
tools_list = future.result(timeout=60)
except RuntimeError as e:
error_msg = str(e).lower()
if "cancel scope" in error_msg or "task" in error_msg:
raise ConnectionError(
"MCP connection was cancelled. This may indicate an authentication "
"error or server unavailability."
"MCP connection failed due to event loop cleanup issues. "
"This may be due to authentication errors or server unavailability."
) from e
raise
except asyncio.CancelledError as e:
raise ConnectionError(
"MCP connection was cancelled. This may indicate an authentication "
"error or server unavailability."
) from e
if mcp_config.tool_filter:
filtered_tools = []
@@ -410,7 +416,13 @@ class MCPToolResolver:
return cast(list[BaseTool], tools), client
except Exception as e:
if client.connected:
asyncio.run(client.disconnect())
try:
fut = asyncio.run_coroutine_threadsafe(
client.disconnect(), loop
)
fut.result(timeout=10)
except Exception:
self._logger.log("debug", "Suppressed error during MCP client disconnect on cleanup")
raise RuntimeError(f"Failed to get native MCP tools: {e}") from e

View File

@@ -173,6 +173,12 @@ class Telemetry:
self._original_handlers: dict[int, Any] = {}
if threading.current_thread() is not threading.main_thread():
logger.debug(
"Skipping signal handler registration: not running in main thread"
)
return
self._register_signal_handler(signal.SIGTERM, SigTermEvent, shutdown=True)
self._register_signal_handler(signal.SIGINT, SigIntEvent, shutdown=True)
if hasattr(signal, "SIGHUP"):

View File

@@ -5,11 +5,37 @@ for better performance and connection management.
"""
import asyncio
import threading
from typing import Any
from crewai.tools import BaseTool
_mcp_loop: asyncio.AbstractEventLoop | None = None
_mcp_loop_thread: threading.Thread | None = None
_mcp_loop_lock = threading.Lock()
def _get_mcp_event_loop() -> asyncio.AbstractEventLoop:
"""Return (and lazily start) a persistent event loop for MCP operations.
All MCP SDK transports use anyio task groups whose cancel scopes must be
entered and exited on the same event loop / task. By funnelling every
MCP call through one long-lived loop we avoid the "exit cancel scope in
a different task" errors that happen when asyncio.run() creates a
throwaway loop per call.
"""
global _mcp_loop, _mcp_loop_thread
with _mcp_loop_lock:
if _mcp_loop is None or _mcp_loop.is_closed():
_mcp_loop = asyncio.new_event_loop()
_mcp_loop_thread = threading.Thread(
target=_mcp_loop.run_forever, daemon=True, name="mcp-event-loop"
)
_mcp_loop_thread.start()
return _mcp_loop
class MCPNativeTool(BaseTool):
"""Native MCP tool that reuses client sessions.
@@ -38,13 +64,10 @@ class MCPNativeTool(BaseTool):
server_name: Name of the MCP server for prefixing.
original_tool_name: Original name of the tool on the MCP server.
"""
# Create tool name with server prefix to avoid conflicts
prefixed_name = f"{server_name}_{tool_name}"
# Handle args_schema properly - BaseTool expects a BaseModel subclass
args_schema = tool_schema.get("args_schema")
# Only pass args_schema if it's provided
kwargs = {
"name": prefixed_name,
"description": tool_schema.get(
@@ -57,11 +80,9 @@ class MCPNativeTool(BaseTool):
super().__init__(**kwargs)
# Set instance attributes after super().__init__
self._mcp_client = mcp_client
self._original_tool_name = original_tool_name or tool_name
self._server_name = server_name
# self._logger = logging.getLogger(__name__)
@property
def mcp_client(self) -> Any:
@@ -81,25 +102,21 @@ class MCPNativeTool(BaseTool):
def _run(self, **kwargs) -> str:
"""Execute tool using the MCP client session.
Submits work to a persistent background event loop so that all MCP
transport context managers (which rely on anyio cancel scopes) stay
on the same loop and task throughout their lifecycle.
Args:
**kwargs: Arguments to pass to the MCP tool.
Returns:
Result from the MCP tool execution.
"""
loop = _get_mcp_event_loop()
timeout = self._mcp_client.connect_timeout + self._mcp_client.execution_timeout
try:
try:
asyncio.get_running_loop()
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
coro = self._run_async(**kwargs)
future = executor.submit(asyncio.run, coro)
return future.result()
except RuntimeError:
return asyncio.run(self._run_async(**kwargs))
future = asyncio.run_coroutine_threadsafe(self._run_async(**kwargs), loop)
return future.result(timeout=timeout)
except Exception as e:
raise RuntimeError(
f"Error executing MCP tool {self.original_tool_name}: {e!s}"
@@ -114,18 +131,11 @@ class MCPNativeTool(BaseTool):
Returns:
Result from the MCP tool execution.
"""
# Note: Since we use asyncio.run() which creates a new event loop each time,
# Always reconnect on-demand because asyncio.run() creates new event loops per call
# All MCP transport context managers (stdio, streamablehttp_client, sse_client)
# use anyio.create_task_group() which can't span different event loops
if self._mcp_client.connected:
await self._mcp_client.disconnect()
await self._mcp_client.connect()
if not self._mcp_client.connected:
await self._mcp_client.connect()
try:
result = await self._mcp_client.call_tool(self.original_tool_name, kwargs)
except Exception as e:
error_str = str(e).lower()
if (
@@ -135,24 +145,15 @@ class MCPNativeTool(BaseTool):
):
await self._mcp_client.disconnect()
await self._mcp_client.connect()
# Retry the call
result = await self._mcp_client.call_tool(
self.original_tool_name, kwargs
)
else:
raise
finally:
# Always disconnect after tool call to ensure clean context manager lifecycle
# This prevents "exit cancel scope in different task" errors
# All transport context managers must be exited in the same event loop they were entered
await self._mcp_client.disconnect()
# Extract result content
if isinstance(result, str):
return result
# Handle various result formats
if hasattr(result, "content") and result.content:
if isinstance(result.content, list) and len(result.content) > 0:
content_item = result.content[0]

View File

@@ -148,6 +148,8 @@ def test_mcp_tool_execution_in_sync_context(mock_tool_definitions):
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client.call_tool = AsyncMock(return_value="test result")
mock_client.connect_timeout = 30
mock_client.execution_timeout = 30
mock_client_class.return_value = mock_client
agent = Agent(
@@ -180,6 +182,8 @@ async def test_mcp_tool_execution_in_async_context(mock_tool_definitions):
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client.call_tool = AsyncMock(return_value="test result")
mock_client.connect_timeout = 30
mock_client.execution_timeout = 30
mock_client_class.return_value = mock_client
agent = Agent(

View File

@@ -121,3 +121,41 @@ def test_telemetry_singleton_pattern():
thread.join()
assert all(instance is telemetry1 for instance in instances)
def test_no_signal_handler_traceback_in_non_main_thread():
    """Telemetry built off the main thread must skip signal registration quietly.

    Regression test for https://github.com/crewAIInc/crewAI/issues/4289
    """
    errors: list[Exception] = []
    captured: dict = {}
    env_overrides = {"CREWAI_DISABLE_TELEMETRY": "false", "OTEL_SDK_DISABLED": "false"}

    def init_in_thread():
        # The patches are entered inside the worker so they are active while
        # Telemetry() is constructed on the non-main thread.
        try:
            Telemetry._instance = None
            with (
                patch.dict(os.environ, env_overrides),
                patch("crewai.telemetry.telemetry.TracerProvider"),
                patch("signal.signal") as signal_mock,
                patch("crewai.telemetry.telemetry.logger") as logger_mock,
            ):
                Telemetry()
                captured["signal"] = signal_mock
                captured["logger"] = logger_mock
        except Exception as exc:
            errors.append(exc)

    worker = threading.Thread(target=init_in_thread)
    worker.start()
    worker.join()

    assert not errors, f"Unexpected error: {errors}"
    assert captured, "Thread did not execute"

    # No signal handler may be installed from the worker thread ...
    captured["signal"].assert_not_called()
    # ... and the skip has to be reported at debug level.
    captured["logger"].debug.assert_any_call(
        "Skipping signal handler registration: not running in main thread"
    )