Compare commits

..

2 Commits

Author SHA1 Message Date
Lucas Gomide
a96f114a36 Merge branch 'main' into lg-mcp-event-loop 2026-02-27 12:08:38 -03:00
Lucas Gomide
ca220cdc23 fix: use persistent event loop for MCP operations to prevent cancel scope errors
Replace per-call asyncio.run() with a single persistent background event
loop for all MCP operations. The MCP SDK's streamable HTTP transport uses
anyio task groups whose cancel scopes must be entered and exited on the
same event loop and task. Creating a throwaway loop per tool call caused
"Attempted to exit cancel scope in a different task" RuntimeErrors during
cleanup, preventing MCP tools from working reliably
2026-02-27 11:37:50 -03:00
7 changed files with 242 additions and 126 deletions

View File

@@ -18,46 +18,77 @@ Composio is an integration platform that allows you to connect your AI agents to
To incorporate Composio tools into your project, follow the instructions below:
```shell
pip install composio composio-crewai
pip install composio-crewai
pip install crewai
```
After the installation is complete, set your Composio API key as `COMPOSIO_API_KEY`. Get your Composio API key from [here](https://platform.composio.dev)
After the installation is complete, either run `composio login` or export your composio API key as `COMPOSIO_API_KEY`. Get your Composio API key from [here](https://app.composio.dev)
## Example
The following example demonstrates how to initialize the tool and execute a github action:
1. Initialize Composio with CrewAI Provider
1. Initialize Composio toolset
```python Code
from composio_crewai import ComposioProvider
from composio import Composio
from composio_crewai import ComposioToolSet, App, Action
from crewai import Agent, Task, Crew
composio = Composio(provider=ComposioProvider())
toolset = ComposioToolSet()
```
2. Create a new Composio Session and retrieve the tools
2. Connect your GitHub account
<CodeGroup>
```python
session = composio.create(
user_id="your-user-id",
toolkits=["gmail", "github"] # optional, default is all toolkits
)
tools = session.tools()
```shell CLI
composio add github
```
```python Code
request = toolset.initiate_connection(app=App.GITHUB)
print(f"Open this URL to authenticate: {request.redirectUrl}")
```
Read more about sessions and user management [here](https://docs.composio.dev/docs/configuring-sessions)
</CodeGroup>
3. Authenticating users manually
3. Get Tools
Composio automatically authenticates the users during the agent chat session. However, you can also authenticate the user manually by calling the `authorize` method.
- Retrieving all the tools from an app (not recommended for production):
```python Code
connection_request = session.authorize("github")
print(f"Open this URL to authenticate: {connection_request.redirect_url}")
tools = toolset.get_tools(apps=[App.GITHUB])
```
- Filtering tools based on tags:
```python Code
tag = "users"
filtered_action_enums = toolset.find_actions_by_tags(
App.GITHUB,
tags=[tag],
)
tools = toolset.get_tools(actions=filtered_action_enums)
```
- Filtering tools based on use case:
```python Code
use_case = "Star a repository on GitHub"
filtered_action_enums = toolset.find_actions_by_use_case(
App.GITHUB, use_case=use_case, advanced=False
)
tools = toolset.get_tools(actions=filtered_action_enums)
```
<Tip>Set `advanced` to True to get actions for complex use cases</Tip>
- Using specific tools:
In this demo, we will use the `GITHUB_STAR_A_REPOSITORY_FOR_THE_AUTHENTICATED_USER` action from the GitHub app.
```python Code
tools = toolset.get_tools(
actions=[Action.GITHUB_STAR_A_REPOSITORY_FOR_THE_AUTHENTICATED_USER]
)
```
Learn more about filtering actions [here](https://docs.composio.dev/patterns/tools/use-tools/use-specific-actions)
4. Define agent
```python Code
@@ -85,4 +116,4 @@ crew = Crew(agents=[crewai_agent], tasks=[task])
crew.kickoff()
```
* More detailed list of tools can be found [here](https://docs.composio.dev/toolkits)
* More detailed list of tools can be found [here](https://app.composio.dev)

View File

@@ -18,46 +18,77 @@ Composio는 AI 에이전트를 250개 이상의 도구와 연결할 수 있는
Composio 도구를 프로젝트에 통합하려면 아래 지침을 따르세요:
```shell
pip install composio composio-crewai
pip install composio-crewai
pip install crewai
```
설치가 완료되면 Composio API 키를 `COMPOSIO_API_KEY`로 설정하세요. Composio API 키는 [여기](https://platform.composio.dev)에서 받을 수 있습니다.
설치가 완료된 후, `composio login`을 실행하거나 Composio API 키를 `COMPOSIO_API_KEY`로 export하세요. Composio API 키는 [여기](https://app.composio.dev)에서 받을 수 있습니다.
## 예시
다음 예시는 도구를 초기화하고 GitHub 액션을 실행하는 방법을 보여줍니다:
다음 예시는 도구를 초기화하고 github action을 실행하는 방법을 보여줍니다:
1. CrewAI Provider와 함께 Composio 초기화
1. Composio 도구 세트 초기화
```python Code
from composio_crewai import ComposioProvider
from composio import Composio
from composio_crewai import ComposioToolSet, App, Action
from crewai import Agent, Task, Crew
composio = Composio(provider=ComposioProvider())
toolset = ComposioToolSet()
```
2. 새 Composio 세션을 만들고 도구 가져오기
2. GitHub 계정 연결
<CodeGroup>
```python
session = composio.create(
user_id="your-user-id",
toolkits=["gmail", "github"] # optional, default is all toolkits
)
tools = session.tools()
```shell CLI
composio add github
```
```python Code
request = toolset.initiate_connection(app=App.GITHUB)
print(f"Open this URL to authenticate: {request.redirectUrl}")
```
세션 및 사용자 관리에 대한 자세한 내용은 [여기](https://docs.composio.dev/docs/configuring-sessions)를 참고하세요.
</CodeGroup>
3. 사용자 수동 인증하
3. 도구 가져오
Composio는 에이전트 채팅 세션 중에 사용자를 자동으로 인증합니다. 하지만 `authorize` 메서드를 호출해 사용자를 수동으로 인증할 수도 있습니다.
- 앱에서 모든 도구를 가져오기 (프로덕션 환경에서는 권장하지 않음):
```python Code
connection_request = session.authorize("github")
print(f"Open this URL to authenticate: {connection_request.redirect_url}")
tools = toolset.get_tools(apps=[App.GITHUB])
```
- 태그를 기반으로 도구 필터링:
```python Code
tag = "users"
filtered_action_enums = toolset.find_actions_by_tags(
App.GITHUB,
tags=[tag],
)
tools = toolset.get_tools(actions=filtered_action_enums)
```
- 사용 사례를 기반으로 도구 필터링:
```python Code
use_case = "Star a repository on GitHub"
filtered_action_enums = toolset.find_actions_by_use_case(
App.GITHUB, use_case=use_case, advanced=False
)
tools = toolset.get_tools(actions=filtered_action_enums)
```
<Tip>`advanced`를 True로 설정하면 복잡한 사용 사례를 위한 액션을 가져올 수 있습니다</Tip>
- 특정 도구 사용하기:
이 데모에서는 GitHub 앱의 `GITHUB_STAR_A_REPOSITORY_FOR_THE_AUTHENTICATED_USER` 액션을 사용합니다.
```python Code
tools = toolset.get_tools(
actions=[Action.GITHUB_STAR_A_REPOSITORY_FOR_THE_AUTHENTICATED_USER]
)
```
액션 필터링에 대해 더 자세한 내용을 보려면 [여기](https://docs.composio.dev/patterns/tools/use-tools/use-specific-actions)를 참고하세요.
4. 에이전트 정의
```python Code
@@ -85,4 +116,4 @@ crew = Crew(agents=[crewai_agent], tasks=[task])
crew.kickoff()
```
* 더욱 자세한 도구 목록은 [여기](https://docs.composio.dev/toolkits)에서 확인 수 있습니다.
* 더욱 자세한 도구 리스트는 [여기](https://app.composio.dev)에서 확인하실 수 있습니다.

View File

@@ -11,53 +11,84 @@ mode: "wide"
Composio é uma plataforma de integração que permite conectar seus agentes de IA a mais de 250 ferramentas. Os principais recursos incluem:
- **Autenticação de Nível Empresarial**: Suporte integrado para OAuth, Chaves de API, JWT com atualização automática de token
- **Observabilidade Completa**: Logs detalhados de uso das ferramentas, carimbos de data/hora de execução e muito mais
- **Observabilidade Completa**: Logs detalhados de uso das ferramentas, registros de execução, e muito mais
## Instalação
Para incorporar as ferramentas Composio em seu projeto, siga as instruções abaixo:
```shell
pip install composio composio-crewai
pip install composio-crewai
pip install crewai
```
Após concluir a instalação, defina sua chave de API do Composio como `COMPOSIO_API_KEY`. Obtenha sua chave de API do Composio [aqui](https://platform.composio.dev)
Após a conclusão da instalação, execute `composio login` ou exporte sua chave de API do composio como `COMPOSIO_API_KEY`. Obtenha sua chave de API Composio [aqui](https://app.composio.dev)
## Exemplo
O exemplo a seguir demonstra como inicializar a ferramenta e executar uma ação do GitHub:
O exemplo a seguir demonstra como inicializar a ferramenta e executar uma ação do github:
1. Inicialize o Composio com o Provider do CrewAI
1. Inicialize o conjunto de ferramentas Composio
```python Code
from composio_crewai import ComposioProvider
from composio import Composio
from composio_crewai import ComposioToolSet, App, Action
from crewai import Agent, Task, Crew
composio = Composio(provider=ComposioProvider())
toolset = ComposioToolSet()
```
2. Crie uma nova sessão Composio e recupere as ferramentas
2. Conecte sua conta do GitHub
<CodeGroup>
```python
session = composio.create(
user_id="your-user-id",
toolkits=["gmail", "github"] # optional, default is all toolkits
)
tools = session.tools()
```shell CLI
composio add github
```
```python Code
request = toolset.initiate_connection(app=App.GITHUB)
print(f"Open this URL to authenticate: {request.redirectUrl}")
```
Leia mais sobre sessões e gerenciamento de usuários [aqui](https://docs.composio.dev/docs/configuring-sessions)
</CodeGroup>
3. Autenticação manual dos usuários
3. Obtenha ferramentas
O Composio autentica automaticamente os usuários durante a sessão de chat do agente. No entanto, você também pode autenticar o usuário manualmente chamando o método `authorize`.
- Recuperando todas as ferramentas de um app (não recomendado em produção):
```python Code
connection_request = session.authorize("github")
print(f"Open this URL to authenticate: {connection_request.redirect_url}")
tools = toolset.get_tools(apps=[App.GITHUB])
```
- Filtrando ferramentas com base em tags:
```python Code
tag = "users"
filtered_action_enums = toolset.find_actions_by_tags(
App.GITHUB,
tags=[tag],
)
tools = toolset.get_tools(actions=filtered_action_enums)
```
- Filtrando ferramentas com base no caso de uso:
```python Code
use_case = "Star a repository on GitHub"
filtered_action_enums = toolset.find_actions_by_use_case(
App.GITHUB, use_case=use_case, advanced=False
)
tools = toolset.get_tools(actions=filtered_action_enums)
```
<Tip>Defina `advanced` como True para obter ações para casos de uso complexos</Tip>
- Usando ferramentas específicas:
Neste exemplo, usaremos a ação `GITHUB_STAR_A_REPOSITORY_FOR_THE_AUTHENTICATED_USER` do app GitHub.
```python Code
tools = toolset.get_tools(
actions=[Action.GITHUB_STAR_A_REPOSITORY_FOR_THE_AUTHENTICATED_USER]
)
```
Saiba mais sobre como filtrar ações [aqui](https://docs.composio.dev/patterns/tools/use-tools/use-specific-actions)
4. Defina o agente
```python Code
@@ -85,4 +116,4 @@ crew = Crew(agents=[crewai_agent], tasks=[task])
crew.kickoff()
```
* Uma lista mais detalhada de ferramentas pode ser encontrada [aqui](https://docs.composio.dev/toolkits)
* Uma lista mais detalhada de ferramentas pode ser encontrada [aqui](https://app.composio.dev)

View File

@@ -95,7 +95,7 @@ class MCPClient:
self.discovery_timeout = discovery_timeout
self.max_retries = max_retries
self.cache_tools_list = cache_tools_list
# self._logger = logger or logging.getLogger(__name__)
self._logger = logger or logging.getLogger(__name__)
self._session: Any = None
self._initialized = False
self._exit_stack = AsyncExitStack()
@@ -358,10 +358,12 @@ class MCPClient:
"""Cleanup resources when an error occurs during connection."""
try:
await self._exit_stack.aclose()
except Exception as e:
# Best effort cleanup - ignore all other errors
raise RuntimeError(f"Error during MCP client cleanup: {e}") from e
except (RuntimeError, BaseExceptionGroup) as e:
error_msg = str(e).lower()
if "cancel scope" not in error_msg and "task" not in error_msg:
raise RuntimeError(f"Error during MCP client cleanup: {e}") from e
except Exception:
self._logger.debug("Suppressed error during MCP cleanup", exc_info=True)
finally:
self._session = None
self._initialized = False
@@ -374,8 +376,12 @@ class MCPClient:
try:
await self._exit_stack.aclose()
except Exception as e:
raise RuntimeError(f"Error during MCP client disconnect: {e}") from e
except (RuntimeError, BaseExceptionGroup) as e:
error_msg = str(e).lower()
if "cancel scope" not in error_msg and "task" not in error_msg:
raise RuntimeError(f"Error during MCP client disconnect: {e}") from e
except Exception:
self._logger.debug("Suppressed error during MCP disconnect", exc_info=True)
finally:
self._session = None
self._initialized = False

View File

@@ -87,7 +87,12 @@ class MCPToolResolver:
return all_tools
def cleanup(self) -> None:
"""Disconnect all MCP client connections."""
"""Disconnect all MCP client connections.
Submits the disconnect coroutines to the persistent MCP event loop
so that transport context managers are exited on the same loop they
were entered on.
"""
if not self._clients:
return
@@ -97,7 +102,11 @@ class MCPToolResolver:
await client.disconnect()
try:
asyncio.run(_disconnect_all())
from crewai.tools.mcp_native_tool import _get_mcp_event_loop
loop = _get_mcp_event_loop()
future = asyncio.run_coroutine_threadsafe(_disconnect_all(), loop)
future.result(timeout=30)
except Exception as e:
self._logger.log("error", f"Error during MCP client cleanup: {e}")
finally:
@@ -330,30 +339,27 @@ class MCPToolResolver:
) from e
try:
try:
asyncio.get_running_loop()
import concurrent.futures
from crewai.tools.mcp_native_tool import _get_mcp_event_loop
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(
asyncio.run, _setup_client_and_list_tools()
)
tools_list = future.result()
except RuntimeError:
try:
tools_list = asyncio.run(_setup_client_and_list_tools())
except RuntimeError as e:
error_msg = str(e).lower()
if "cancel scope" in error_msg or "task" in error_msg:
raise ConnectionError(
"MCP connection failed due to event loop cleanup issues. "
"This may be due to authentication errors or server unavailability."
) from e
except asyncio.CancelledError as e:
loop = _get_mcp_event_loop()
future = asyncio.run_coroutine_threadsafe(
_setup_client_and_list_tools(), loop
)
try:
tools_list = future.result(timeout=60)
except RuntimeError as e:
error_msg = str(e).lower()
if "cancel scope" in error_msg or "task" in error_msg:
raise ConnectionError(
"MCP connection was cancelled. This may indicate an authentication "
"error or server unavailability."
"MCP connection failed due to event loop cleanup issues. "
"This may be due to authentication errors or server unavailability."
) from e
raise
except asyncio.CancelledError as e:
raise ConnectionError(
"MCP connection was cancelled. This may indicate an authentication "
"error or server unavailability."
) from e
if mcp_config.tool_filter:
filtered_tools = []
@@ -410,7 +416,13 @@ class MCPToolResolver:
return cast(list[BaseTool], tools), client
except Exception as e:
if client.connected:
asyncio.run(client.disconnect())
try:
fut = asyncio.run_coroutine_threadsafe(
client.disconnect(), loop
)
fut.result(timeout=10)
except Exception:
self._logger.log("debug", "Suppressed error during MCP client disconnect on cleanup")
raise RuntimeError(f"Failed to get native MCP tools: {e}") from e

View File

@@ -5,11 +5,37 @@ for better performance and connection management.
"""
import asyncio
import threading
from typing import Any
from crewai.tools import BaseTool
_mcp_loop: asyncio.AbstractEventLoop | None = None
_mcp_loop_thread: threading.Thread | None = None
_mcp_loop_lock = threading.Lock()
def _get_mcp_event_loop() -> asyncio.AbstractEventLoop:
"""Return (and lazily start) a persistent event loop for MCP operations.
All MCP SDK transports use anyio task groups whose cancel scopes must be
entered and exited on the same event loop / task. By funnelling every
MCP call through one long-lived loop we avoid the "exit cancel scope in
a different task" errors that happen when asyncio.run() creates a
throwaway loop per call.
"""
global _mcp_loop, _mcp_loop_thread
with _mcp_loop_lock:
if _mcp_loop is None or _mcp_loop.is_closed():
_mcp_loop = asyncio.new_event_loop()
_mcp_loop_thread = threading.Thread(
target=_mcp_loop.run_forever, daemon=True, name="mcp-event-loop"
)
_mcp_loop_thread.start()
return _mcp_loop
class MCPNativeTool(BaseTool):
"""Native MCP tool that reuses client sessions.
@@ -38,13 +64,10 @@ class MCPNativeTool(BaseTool):
server_name: Name of the MCP server for prefixing.
original_tool_name: Original name of the tool on the MCP server.
"""
# Create tool name with server prefix to avoid conflicts
prefixed_name = f"{server_name}_{tool_name}"
# Handle args_schema properly - BaseTool expects a BaseModel subclass
args_schema = tool_schema.get("args_schema")
# Only pass args_schema if it's provided
kwargs = {
"name": prefixed_name,
"description": tool_schema.get(
@@ -57,11 +80,9 @@ class MCPNativeTool(BaseTool):
super().__init__(**kwargs)
# Set instance attributes after super().__init__
self._mcp_client = mcp_client
self._original_tool_name = original_tool_name or tool_name
self._server_name = server_name
# self._logger = logging.getLogger(__name__)
@property
def mcp_client(self) -> Any:
@@ -81,25 +102,21 @@ class MCPNativeTool(BaseTool):
def _run(self, **kwargs) -> str:
"""Execute tool using the MCP client session.
Submits work to a persistent background event loop so that all MCP
transport context managers (which rely on anyio cancel scopes) stay
on the same loop and task throughout their lifecycle.
Args:
**kwargs: Arguments to pass to the MCP tool.
Returns:
Result from the MCP tool execution.
"""
loop = _get_mcp_event_loop()
timeout = self._mcp_client.connect_timeout + self._mcp_client.execution_timeout
try:
try:
asyncio.get_running_loop()
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
coro = self._run_async(**kwargs)
future = executor.submit(asyncio.run, coro)
return future.result()
except RuntimeError:
return asyncio.run(self._run_async(**kwargs))
future = asyncio.run_coroutine_threadsafe(self._run_async(**kwargs), loop)
return future.result(timeout=timeout)
except Exception as e:
raise RuntimeError(
f"Error executing MCP tool {self.original_tool_name}: {e!s}"
@@ -114,18 +131,11 @@ class MCPNativeTool(BaseTool):
Returns:
Result from the MCP tool execution.
"""
# Note: Since we use asyncio.run() which creates a new event loop each time,
# Always reconnect on-demand because asyncio.run() creates new event loops per call
# All MCP transport context managers (stdio, streamablehttp_client, sse_client)
# use anyio.create_task_group() which can't span different event loops
if self._mcp_client.connected:
await self._mcp_client.disconnect()
await self._mcp_client.connect()
if not self._mcp_client.connected:
await self._mcp_client.connect()
try:
result = await self._mcp_client.call_tool(self.original_tool_name, kwargs)
except Exception as e:
error_str = str(e).lower()
if (
@@ -135,24 +145,15 @@ class MCPNativeTool(BaseTool):
):
await self._mcp_client.disconnect()
await self._mcp_client.connect()
# Retry the call
result = await self._mcp_client.call_tool(
self.original_tool_name, kwargs
)
else:
raise
finally:
# Always disconnect after tool call to ensure clean context manager lifecycle
# This prevents "exit cancel scope in different task" errors
# All transport context managers must be exited in the same event loop they were entered
await self._mcp_client.disconnect()
# Extract result content
if isinstance(result, str):
return result
# Handle various result formats
if hasattr(result, "content") and result.content:
if isinstance(result.content, list) and len(result.content) > 0:
content_item = result.content[0]

View File

@@ -148,6 +148,8 @@ def test_mcp_tool_execution_in_sync_context(mock_tool_definitions):
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client.call_tool = AsyncMock(return_value="test result")
mock_client.connect_timeout = 30
mock_client.execution_timeout = 30
mock_client_class.return_value = mock_client
agent = Agent(
@@ -180,6 +182,8 @@ async def test_mcp_tool_execution_in_async_context(mock_tool_definitions):
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client.call_tool = AsyncMock(return_value="test result")
mock_client.connect_timeout = 30
mock_client.execution_timeout = 30
mock_client_class.return_value = mock_client
agent = Agent(