mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-02-28 00:38:13 +00:00
Compare commits
2 Commits
main
...
lg-mcp-eve
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a96f114a36 | ||
|
|
ca220cdc23 |
@@ -18,46 +18,77 @@ Composio is an integration platform that allows you to connect your AI agents to
|
||||
To incorporate Composio tools into your project, follow the instructions below:
|
||||
|
||||
```shell
|
||||
pip install composio composio-crewai
|
||||
pip install composio-crewai
|
||||
pip install crewai
|
||||
```
|
||||
|
||||
After the installation is complete, set your Composio API key as `COMPOSIO_API_KEY`. Get your Composio API key from [here](https://platform.composio.dev)
|
||||
After the installation is complete, either run `composio login` or export your composio API key as `COMPOSIO_API_KEY`. Get your Composio API key from [here](https://app.composio.dev)
|
||||
|
||||
## Example
|
||||
|
||||
The following example demonstrates how to initialize the tool and execute a github action:
|
||||
|
||||
1. Initialize Composio with CrewAI Provider
|
||||
1. Initialize Composio toolset
|
||||
|
||||
```python Code
|
||||
from composio_crewai import ComposioProvider
|
||||
from composio import Composio
|
||||
from composio_crewai import ComposioToolSet, App, Action
|
||||
from crewai import Agent, Task, Crew
|
||||
|
||||
composio = Composio(provider=ComposioProvider())
|
||||
toolset = ComposioToolSet()
|
||||
```
|
||||
|
||||
2. Create a new Composio Session and retrieve the tools
|
||||
2. Connect your GitHub account
|
||||
<CodeGroup>
|
||||
```python
|
||||
session = composio.create(
|
||||
user_id="your-user-id",
|
||||
toolkits=["gmail", "github"] # optional, default is all toolkits
|
||||
)
|
||||
tools = session.tools()
|
||||
```shell CLI
|
||||
composio add github
|
||||
```
|
||||
```python Code
|
||||
request = toolset.initiate_connection(app=App.GITHUB)
|
||||
print(f"Open this URL to authenticate: {request.redirectUrl}")
|
||||
```
|
||||
Read more about sessions and user management [here](https://docs.composio.dev/docs/configuring-sessions)
|
||||
</CodeGroup>
|
||||
|
||||
3. Authenticating users manually
|
||||
3. Get Tools
|
||||
|
||||
Composio automatically authenticates the users during the agent chat session. However, you can also authenticate the user manually by calling the `authorize` method.
|
||||
- Retrieving all the tools from an app (not recommended for production):
|
||||
```python Code
|
||||
connection_request = session.authorize("github")
|
||||
print(f"Open this URL to authenticate: {connection_request.redirect_url}")
|
||||
tools = toolset.get_tools(apps=[App.GITHUB])
|
||||
```
|
||||
|
||||
- Filtering tools based on tags:
|
||||
```python Code
|
||||
tag = "users"
|
||||
|
||||
filtered_action_enums = toolset.find_actions_by_tags(
|
||||
App.GITHUB,
|
||||
tags=[tag],
|
||||
)
|
||||
|
||||
tools = toolset.get_tools(actions=filtered_action_enums)
|
||||
```
|
||||
|
||||
- Filtering tools based on use case:
|
||||
```python Code
|
||||
use_case = "Star a repository on GitHub"
|
||||
|
||||
filtered_action_enums = toolset.find_actions_by_use_case(
|
||||
App.GITHUB, use_case=use_case, advanced=False
|
||||
)
|
||||
|
||||
tools = toolset.get_tools(actions=filtered_action_enums)
|
||||
```
|
||||
<Tip>Set `advanced` to True to get actions for complex use cases</Tip>
|
||||
|
||||
- Using specific tools:
|
||||
|
||||
In this demo, we will use the `GITHUB_STAR_A_REPOSITORY_FOR_THE_AUTHENTICATED_USER` action from the GitHub app.
|
||||
```python Code
|
||||
tools = toolset.get_tools(
|
||||
actions=[Action.GITHUB_STAR_A_REPOSITORY_FOR_THE_AUTHENTICATED_USER]
|
||||
)
|
||||
```
|
||||
Learn more about filtering actions [here](https://docs.composio.dev/patterns/tools/use-tools/use-specific-actions)
|
||||
|
||||
4. Define agent
|
||||
|
||||
```python Code
|
||||
@@ -85,4 +116,4 @@ crew = Crew(agents=[crewai_agent], tasks=[task])
|
||||
crew.kickoff()
|
||||
```
|
||||
|
||||
* More detailed list of tools can be found [here](https://docs.composio.dev/toolkits)
|
||||
* More detailed list of tools can be found [here](https://app.composio.dev)
|
||||
|
||||
@@ -18,46 +18,77 @@ Composio는 AI 에이전트를 250개 이상의 도구와 연결할 수 있는
|
||||
Composio 도구를 프로젝트에 통합하려면 아래 지침을 따르세요:
|
||||
|
||||
```shell
|
||||
pip install composio composio-crewai
|
||||
pip install composio-crewai
|
||||
pip install crewai
|
||||
```
|
||||
|
||||
설치가 완료되면 Composio API 키를 `COMPOSIO_API_KEY`로 설정하세요. Composio API 키는 [여기](https://platform.composio.dev)에서 받을 수 있습니다.
|
||||
설치가 완료된 후, `composio login`을 실행하거나 Composio API 키를 `COMPOSIO_API_KEY`로 export하세요. Composio API 키는 [여기](https://app.composio.dev)에서 받을 수 있습니다.
|
||||
|
||||
## 예시
|
||||
|
||||
다음 예시는 도구를 초기화하고 GitHub 액션을 실행하는 방법을 보여줍니다:
|
||||
다음 예시는 도구를 초기화하고 github action을 실행하는 방법을 보여줍니다:
|
||||
|
||||
1. CrewAI Provider와 함께 Composio 초기화
|
||||
1. Composio 도구 세트 초기화
|
||||
|
||||
```python Code
|
||||
from composio_crewai import ComposioProvider
|
||||
from composio import Composio
|
||||
from composio_crewai import ComposioToolSet, App, Action
|
||||
from crewai import Agent, Task, Crew
|
||||
|
||||
composio = Composio(provider=ComposioProvider())
|
||||
toolset = ComposioToolSet()
|
||||
```
|
||||
|
||||
2. 새 Composio 세션을 만들고 도구 가져오기
|
||||
2. GitHub 계정 연결
|
||||
<CodeGroup>
|
||||
```python
|
||||
session = composio.create(
|
||||
user_id="your-user-id",
|
||||
toolkits=["gmail", "github"] # optional, default is all toolkits
|
||||
)
|
||||
tools = session.tools()
|
||||
```shell CLI
|
||||
composio add github
|
||||
```
|
||||
```python Code
|
||||
request = toolset.initiate_connection(app=App.GITHUB)
|
||||
print(f"Open this URL to authenticate: {request.redirectUrl}")
|
||||
```
|
||||
세션 및 사용자 관리에 대한 자세한 내용은 [여기](https://docs.composio.dev/docs/configuring-sessions)를 참고하세요.
|
||||
</CodeGroup>
|
||||
|
||||
3. 사용자 수동 인증하기
|
||||
3. 도구 가져오기
|
||||
|
||||
Composio는 에이전트 채팅 세션 중에 사용자를 자동으로 인증합니다. 하지만 `authorize` 메서드를 호출해 사용자를 수동으로 인증할 수도 있습니다.
|
||||
- 앱에서 모든 도구를 가져오기 (프로덕션 환경에서는 권장하지 않음):
|
||||
```python Code
|
||||
connection_request = session.authorize("github")
|
||||
print(f"Open this URL to authenticate: {connection_request.redirect_url}")
|
||||
tools = toolset.get_tools(apps=[App.GITHUB])
|
||||
```
|
||||
|
||||
- 태그를 기반으로 도구 필터링:
|
||||
```python Code
|
||||
tag = "users"
|
||||
|
||||
filtered_action_enums = toolset.find_actions_by_tags(
|
||||
App.GITHUB,
|
||||
tags=[tag],
|
||||
)
|
||||
|
||||
tools = toolset.get_tools(actions=filtered_action_enums)
|
||||
```
|
||||
|
||||
- 사용 사례를 기반으로 도구 필터링:
|
||||
```python Code
|
||||
use_case = "Star a repository on GitHub"
|
||||
|
||||
filtered_action_enums = toolset.find_actions_by_use_case(
|
||||
App.GITHUB, use_case=use_case, advanced=False
|
||||
)
|
||||
|
||||
tools = toolset.get_tools(actions=filtered_action_enums)
|
||||
```
|
||||
<Tip>`advanced`를 True로 설정하면 복잡한 사용 사례를 위한 액션을 가져올 수 있습니다</Tip>
|
||||
|
||||
- 특정 도구 사용하기:
|
||||
|
||||
이 데모에서는 GitHub 앱의 `GITHUB_STAR_A_REPOSITORY_FOR_THE_AUTHENTICATED_USER` 액션을 사용합니다.
|
||||
```python Code
|
||||
tools = toolset.get_tools(
|
||||
actions=[Action.GITHUB_STAR_A_REPOSITORY_FOR_THE_AUTHENTICATED_USER]
|
||||
)
|
||||
```
|
||||
액션 필터링에 대해 더 자세한 내용을 보려면 [여기](https://docs.composio.dev/patterns/tools/use-tools/use-specific-actions)를 참고하세요.
|
||||
|
||||
4. 에이전트 정의
|
||||
|
||||
```python Code
|
||||
@@ -85,4 +116,4 @@ crew = Crew(agents=[crewai_agent], tasks=[task])
|
||||
crew.kickoff()
|
||||
```
|
||||
|
||||
* 더욱 자세한 도구 목록은 [여기](https://docs.composio.dev/toolkits)에서 확인할 수 있습니다.
|
||||
* 더욱 자세한 도구 리스트는 [여기](https://app.composio.dev)에서 확인하실 수 있습니다.
|
||||
@@ -11,53 +11,84 @@ mode: "wide"
|
||||
Composio é uma plataforma de integração que permite conectar seus agentes de IA a mais de 250 ferramentas. Os principais recursos incluem:
|
||||
|
||||
- **Autenticação de Nível Empresarial**: Suporte integrado para OAuth, Chaves de API, JWT com atualização automática de token
|
||||
- **Observabilidade Completa**: Logs detalhados de uso das ferramentas, carimbos de data/hora de execução e muito mais
|
||||
- **Observabilidade Completa**: Logs detalhados de uso das ferramentas, registros de execução, e muito mais
|
||||
|
||||
## Instalação
|
||||
|
||||
Para incorporar as ferramentas Composio em seu projeto, siga as instruções abaixo:
|
||||
|
||||
```shell
|
||||
pip install composio composio-crewai
|
||||
pip install composio-crewai
|
||||
pip install crewai
|
||||
```
|
||||
|
||||
Após concluir a instalação, defina sua chave de API do Composio como `COMPOSIO_API_KEY`. Obtenha sua chave de API do Composio [aqui](https://platform.composio.dev)
|
||||
Após a conclusão da instalação, execute `composio login` ou exporte sua chave de API do composio como `COMPOSIO_API_KEY`. Obtenha sua chave de API Composio [aqui](https://app.composio.dev)
|
||||
|
||||
## Exemplo
|
||||
|
||||
O exemplo a seguir demonstra como inicializar a ferramenta e executar uma ação do GitHub:
|
||||
O exemplo a seguir demonstra como inicializar a ferramenta e executar uma ação do github:
|
||||
|
||||
1. Inicialize o Composio com o Provider do CrewAI
|
||||
1. Inicialize o conjunto de ferramentas Composio
|
||||
|
||||
```python Code
|
||||
from composio_crewai import ComposioProvider
|
||||
from composio import Composio
|
||||
from composio_crewai import ComposioToolSet, App, Action
|
||||
from crewai import Agent, Task, Crew
|
||||
|
||||
composio = Composio(provider=ComposioProvider())
|
||||
toolset = ComposioToolSet()
|
||||
```
|
||||
|
||||
2. Crie uma nova sessão Composio e recupere as ferramentas
|
||||
2. Conecte sua conta do GitHub
|
||||
<CodeGroup>
|
||||
```python
|
||||
session = composio.create(
|
||||
user_id="your-user-id",
|
||||
toolkits=["gmail", "github"] # optional, default is all toolkits
|
||||
)
|
||||
tools = session.tools()
|
||||
```shell CLI
|
||||
composio add github
|
||||
```
|
||||
```python Code
|
||||
request = toolset.initiate_connection(app=App.GITHUB)
|
||||
print(f"Open this URL to authenticate: {request.redirectUrl}")
|
||||
```
|
||||
Leia mais sobre sessões e gerenciamento de usuários [aqui](https://docs.composio.dev/docs/configuring-sessions)
|
||||
</CodeGroup>
|
||||
|
||||
3. Autenticação manual dos usuários
|
||||
3. Obtenha ferramentas
|
||||
|
||||
O Composio autentica automaticamente os usuários durante a sessão de chat do agente. No entanto, você também pode autenticar o usuário manualmente chamando o método `authorize`.
|
||||
- Recuperando todas as ferramentas de um app (não recomendado em produção):
|
||||
```python Code
|
||||
connection_request = session.authorize("github")
|
||||
print(f"Open this URL to authenticate: {connection_request.redirect_url}")
|
||||
tools = toolset.get_tools(apps=[App.GITHUB])
|
||||
```
|
||||
|
||||
- Filtrando ferramentas com base em tags:
|
||||
```python Code
|
||||
tag = "users"
|
||||
|
||||
filtered_action_enums = toolset.find_actions_by_tags(
|
||||
App.GITHUB,
|
||||
tags=[tag],
|
||||
)
|
||||
|
||||
tools = toolset.get_tools(actions=filtered_action_enums)
|
||||
```
|
||||
|
||||
- Filtrando ferramentas com base no caso de uso:
|
||||
```python Code
|
||||
use_case = "Star a repository on GitHub"
|
||||
|
||||
filtered_action_enums = toolset.find_actions_by_use_case(
|
||||
App.GITHUB, use_case=use_case, advanced=False
|
||||
)
|
||||
|
||||
tools = toolset.get_tools(actions=filtered_action_enums)
|
||||
```
|
||||
<Tip>Defina `advanced` como True para obter ações para casos de uso complexos</Tip>
|
||||
|
||||
- Usando ferramentas específicas:
|
||||
|
||||
Neste exemplo, usaremos a ação `GITHUB_STAR_A_REPOSITORY_FOR_THE_AUTHENTICATED_USER` do app GitHub.
|
||||
```python Code
|
||||
tools = toolset.get_tools(
|
||||
actions=[Action.GITHUB_STAR_A_REPOSITORY_FOR_THE_AUTHENTICATED_USER]
|
||||
)
|
||||
```
|
||||
Saiba mais sobre como filtrar ações [aqui](https://docs.composio.dev/patterns/tools/use-tools/use-specific-actions)
|
||||
|
||||
4. Defina o agente
|
||||
|
||||
```python Code
|
||||
@@ -85,4 +116,4 @@ crew = Crew(agents=[crewai_agent], tasks=[task])
|
||||
crew.kickoff()
|
||||
```
|
||||
|
||||
* Uma lista mais detalhada de ferramentas pode ser encontrada [aqui](https://docs.composio.dev/toolkits)
|
||||
* Uma lista mais detalhada de ferramentas pode ser encontrada [aqui](https://app.composio.dev)
|
||||
@@ -95,7 +95,7 @@ class MCPClient:
|
||||
self.discovery_timeout = discovery_timeout
|
||||
self.max_retries = max_retries
|
||||
self.cache_tools_list = cache_tools_list
|
||||
# self._logger = logger or logging.getLogger(__name__)
|
||||
self._logger = logger or logging.getLogger(__name__)
|
||||
self._session: Any = None
|
||||
self._initialized = False
|
||||
self._exit_stack = AsyncExitStack()
|
||||
@@ -358,10 +358,12 @@ class MCPClient:
|
||||
"""Cleanup resources when an error occurs during connection."""
|
||||
try:
|
||||
await self._exit_stack.aclose()
|
||||
|
||||
except Exception as e:
|
||||
# Best effort cleanup - ignore all other errors
|
||||
raise RuntimeError(f"Error during MCP client cleanup: {e}") from e
|
||||
except (RuntimeError, BaseExceptionGroup) as e:
|
||||
error_msg = str(e).lower()
|
||||
if "cancel scope" not in error_msg and "task" not in error_msg:
|
||||
raise RuntimeError(f"Error during MCP client cleanup: {e}") from e
|
||||
except Exception:
|
||||
self._logger.debug("Suppressed error during MCP cleanup", exc_info=True)
|
||||
finally:
|
||||
self._session = None
|
||||
self._initialized = False
|
||||
@@ -374,8 +376,12 @@ class MCPClient:
|
||||
|
||||
try:
|
||||
await self._exit_stack.aclose()
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Error during MCP client disconnect: {e}") from e
|
||||
except (RuntimeError, BaseExceptionGroup) as e:
|
||||
error_msg = str(e).lower()
|
||||
if "cancel scope" not in error_msg and "task" not in error_msg:
|
||||
raise RuntimeError(f"Error during MCP client disconnect: {e}") from e
|
||||
except Exception:
|
||||
self._logger.debug("Suppressed error during MCP disconnect", exc_info=True)
|
||||
finally:
|
||||
self._session = None
|
||||
self._initialized = False
|
||||
|
||||
@@ -87,7 +87,12 @@ class MCPToolResolver:
|
||||
return all_tools
|
||||
|
||||
def cleanup(self) -> None:
|
||||
"""Disconnect all MCP client connections."""
|
||||
"""Disconnect all MCP client connections.
|
||||
|
||||
Submits the disconnect coroutines to the persistent MCP event loop
|
||||
so that transport context managers are exited on the same loop they
|
||||
were entered on.
|
||||
"""
|
||||
if not self._clients:
|
||||
return
|
||||
|
||||
@@ -97,7 +102,11 @@ class MCPToolResolver:
|
||||
await client.disconnect()
|
||||
|
||||
try:
|
||||
asyncio.run(_disconnect_all())
|
||||
from crewai.tools.mcp_native_tool import _get_mcp_event_loop
|
||||
|
||||
loop = _get_mcp_event_loop()
|
||||
future = asyncio.run_coroutine_threadsafe(_disconnect_all(), loop)
|
||||
future.result(timeout=30)
|
||||
except Exception as e:
|
||||
self._logger.log("error", f"Error during MCP client cleanup: {e}")
|
||||
finally:
|
||||
@@ -330,30 +339,27 @@ class MCPToolResolver:
|
||||
) from e
|
||||
|
||||
try:
|
||||
try:
|
||||
asyncio.get_running_loop()
|
||||
import concurrent.futures
|
||||
from crewai.tools.mcp_native_tool import _get_mcp_event_loop
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
future = executor.submit(
|
||||
asyncio.run, _setup_client_and_list_tools()
|
||||
)
|
||||
tools_list = future.result()
|
||||
except RuntimeError:
|
||||
try:
|
||||
tools_list = asyncio.run(_setup_client_and_list_tools())
|
||||
except RuntimeError as e:
|
||||
error_msg = str(e).lower()
|
||||
if "cancel scope" in error_msg or "task" in error_msg:
|
||||
raise ConnectionError(
|
||||
"MCP connection failed due to event loop cleanup issues. "
|
||||
"This may be due to authentication errors or server unavailability."
|
||||
) from e
|
||||
except asyncio.CancelledError as e:
|
||||
loop = _get_mcp_event_loop()
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
_setup_client_and_list_tools(), loop
|
||||
)
|
||||
try:
|
||||
tools_list = future.result(timeout=60)
|
||||
except RuntimeError as e:
|
||||
error_msg = str(e).lower()
|
||||
if "cancel scope" in error_msg or "task" in error_msg:
|
||||
raise ConnectionError(
|
||||
"MCP connection was cancelled. This may indicate an authentication "
|
||||
"error or server unavailability."
|
||||
"MCP connection failed due to event loop cleanup issues. "
|
||||
"This may be due to authentication errors or server unavailability."
|
||||
) from e
|
||||
raise
|
||||
except asyncio.CancelledError as e:
|
||||
raise ConnectionError(
|
||||
"MCP connection was cancelled. This may indicate an authentication "
|
||||
"error or server unavailability."
|
||||
) from e
|
||||
|
||||
if mcp_config.tool_filter:
|
||||
filtered_tools = []
|
||||
@@ -410,7 +416,13 @@ class MCPToolResolver:
|
||||
return cast(list[BaseTool], tools), client
|
||||
except Exception as e:
|
||||
if client.connected:
|
||||
asyncio.run(client.disconnect())
|
||||
try:
|
||||
fut = asyncio.run_coroutine_threadsafe(
|
||||
client.disconnect(), loop
|
||||
)
|
||||
fut.result(timeout=10)
|
||||
except Exception:
|
||||
self._logger.log("debug", "Suppressed error during MCP client disconnect on cleanup")
|
||||
|
||||
raise RuntimeError(f"Failed to get native MCP tools: {e}") from e
|
||||
|
||||
|
||||
@@ -5,11 +5,37 @@ for better performance and connection management.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import threading
|
||||
from typing import Any
|
||||
|
||||
from crewai.tools import BaseTool
|
||||
|
||||
|
||||
_mcp_loop: asyncio.AbstractEventLoop | None = None
|
||||
_mcp_loop_thread: threading.Thread | None = None
|
||||
_mcp_loop_lock = threading.Lock()
|
||||
|
||||
|
||||
def _get_mcp_event_loop() -> asyncio.AbstractEventLoop:
|
||||
"""Return (and lazily start) a persistent event loop for MCP operations.
|
||||
|
||||
All MCP SDK transports use anyio task groups whose cancel scopes must be
|
||||
entered and exited on the same event loop / task. By funnelling every
|
||||
MCP call through one long-lived loop we avoid the "exit cancel scope in
|
||||
a different task" errors that happen when asyncio.run() creates a
|
||||
throwaway loop per call.
|
||||
"""
|
||||
global _mcp_loop, _mcp_loop_thread
|
||||
with _mcp_loop_lock:
|
||||
if _mcp_loop is None or _mcp_loop.is_closed():
|
||||
_mcp_loop = asyncio.new_event_loop()
|
||||
_mcp_loop_thread = threading.Thread(
|
||||
target=_mcp_loop.run_forever, daemon=True, name="mcp-event-loop"
|
||||
)
|
||||
_mcp_loop_thread.start()
|
||||
return _mcp_loop
|
||||
|
||||
|
||||
class MCPNativeTool(BaseTool):
|
||||
"""Native MCP tool that reuses client sessions.
|
||||
|
||||
@@ -38,13 +64,10 @@ class MCPNativeTool(BaseTool):
|
||||
server_name: Name of the MCP server for prefixing.
|
||||
original_tool_name: Original name of the tool on the MCP server.
|
||||
"""
|
||||
# Create tool name with server prefix to avoid conflicts
|
||||
prefixed_name = f"{server_name}_{tool_name}"
|
||||
|
||||
# Handle args_schema properly - BaseTool expects a BaseModel subclass
|
||||
args_schema = tool_schema.get("args_schema")
|
||||
|
||||
# Only pass args_schema if it's provided
|
||||
kwargs = {
|
||||
"name": prefixed_name,
|
||||
"description": tool_schema.get(
|
||||
@@ -57,11 +80,9 @@ class MCPNativeTool(BaseTool):
|
||||
|
||||
super().__init__(**kwargs)
|
||||
|
||||
# Set instance attributes after super().__init__
|
||||
self._mcp_client = mcp_client
|
||||
self._original_tool_name = original_tool_name or tool_name
|
||||
self._server_name = server_name
|
||||
# self._logger = logging.getLogger(__name__)
|
||||
|
||||
@property
|
||||
def mcp_client(self) -> Any:
|
||||
@@ -81,25 +102,21 @@ class MCPNativeTool(BaseTool):
|
||||
def _run(self, **kwargs) -> str:
|
||||
"""Execute tool using the MCP client session.
|
||||
|
||||
Submits work to a persistent background event loop so that all MCP
|
||||
transport context managers (which rely on anyio cancel scopes) stay
|
||||
on the same loop and task throughout their lifecycle.
|
||||
|
||||
Args:
|
||||
**kwargs: Arguments to pass to the MCP tool.
|
||||
|
||||
Returns:
|
||||
Result from the MCP tool execution.
|
||||
"""
|
||||
loop = _get_mcp_event_loop()
|
||||
timeout = self._mcp_client.connect_timeout + self._mcp_client.execution_timeout
|
||||
try:
|
||||
try:
|
||||
asyncio.get_running_loop()
|
||||
|
||||
import concurrent.futures
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
coro = self._run_async(**kwargs)
|
||||
future = executor.submit(asyncio.run, coro)
|
||||
return future.result()
|
||||
except RuntimeError:
|
||||
return asyncio.run(self._run_async(**kwargs))
|
||||
|
||||
future = asyncio.run_coroutine_threadsafe(self._run_async(**kwargs), loop)
|
||||
return future.result(timeout=timeout)
|
||||
except Exception as e:
|
||||
raise RuntimeError(
|
||||
f"Error executing MCP tool {self.original_tool_name}: {e!s}"
|
||||
@@ -114,18 +131,11 @@ class MCPNativeTool(BaseTool):
|
||||
Returns:
|
||||
Result from the MCP tool execution.
|
||||
"""
|
||||
# Note: Since we use asyncio.run() which creates a new event loop each time,
|
||||
# Always reconnect on-demand because asyncio.run() creates new event loops per call
|
||||
# All MCP transport context managers (stdio, streamablehttp_client, sse_client)
|
||||
# use anyio.create_task_group() which can't span different event loops
|
||||
if self._mcp_client.connected:
|
||||
await self._mcp_client.disconnect()
|
||||
|
||||
await self._mcp_client.connect()
|
||||
if not self._mcp_client.connected:
|
||||
await self._mcp_client.connect()
|
||||
|
||||
try:
|
||||
result = await self._mcp_client.call_tool(self.original_tool_name, kwargs)
|
||||
|
||||
except Exception as e:
|
||||
error_str = str(e).lower()
|
||||
if (
|
||||
@@ -135,24 +145,15 @@ class MCPNativeTool(BaseTool):
|
||||
):
|
||||
await self._mcp_client.disconnect()
|
||||
await self._mcp_client.connect()
|
||||
# Retry the call
|
||||
result = await self._mcp_client.call_tool(
|
||||
self.original_tool_name, kwargs
|
||||
)
|
||||
else:
|
||||
raise
|
||||
|
||||
finally:
|
||||
# Always disconnect after tool call to ensure clean context manager lifecycle
|
||||
# This prevents "exit cancel scope in different task" errors
|
||||
# All transport context managers must be exited in the same event loop they were entered
|
||||
await self._mcp_client.disconnect()
|
||||
|
||||
# Extract result content
|
||||
if isinstance(result, str):
|
||||
return result
|
||||
|
||||
# Handle various result formats
|
||||
if hasattr(result, "content") and result.content:
|
||||
if isinstance(result.content, list) and len(result.content) > 0:
|
||||
content_item = result.content[0]
|
||||
|
||||
@@ -148,6 +148,8 @@ def test_mcp_tool_execution_in_sync_context(mock_tool_definitions):
|
||||
mock_client.connect = AsyncMock()
|
||||
mock_client.disconnect = AsyncMock()
|
||||
mock_client.call_tool = AsyncMock(return_value="test result")
|
||||
mock_client.connect_timeout = 30
|
||||
mock_client.execution_timeout = 30
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
agent = Agent(
|
||||
@@ -180,6 +182,8 @@ async def test_mcp_tool_execution_in_async_context(mock_tool_definitions):
|
||||
mock_client.connect = AsyncMock()
|
||||
mock_client.disconnect = AsyncMock()
|
||||
mock_client.call_tool = AsyncMock(return_value="test result")
|
||||
mock_client.connect_timeout = 30
|
||||
mock_client.execution_timeout = 30
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
agent = Agent(
|
||||
|
||||
Reference in New Issue
Block a user