Support async tool executions (#2983)

* test: fix structured tool tests

No tests were being executed from this file

* feat: support to run async tool

Some Tool requires async execution. This commit allow us to collect tool result from coroutines

* docs: add docs about asynchronous tool support
This commit is contained in:
Lucas Gomide
2025-06-10 13:17:06 -03:00
committed by GitHub
parent 5b740467cb
commit 5c51349a85
7 changed files with 1316 additions and 95 deletions

View File

@@ -32,6 +32,7 @@ The Enterprise Tools Repository includes:
- **Customizability**: Provides the flexibility to develop custom tools or utilize existing ones, catering to the specific needs of agents.
- **Error Handling**: Incorporates robust error handling mechanisms to ensure smooth operation.
- **Caching Mechanism**: Features intelligent caching to optimize performance and reduce redundant operations.
- **Asynchronous Support**: Handles both synchronous and asynchronous tools, enabling non-blocking operations.
## Using CrewAI Tools
@@ -177,6 +178,62 @@ class MyCustomTool(BaseTool):
return "Tool's result"
```
## Asynchronous Tool Support
CrewAI supports asynchronous tools, allowing you to implement tools that perform non-blocking operations like network requests, file I/O, or other async operations without blocking the main execution thread.
### Creating Async Tools
You can create async tools in two ways:
#### 1. Using the `tool` Decorator with Async Functions
```python Code
from crewai.tools import tool
@tool("fetch_data_async")
async def fetch_data_async(query: str) -> str:
"""Asynchronously fetch data based on the query."""
# Simulate async operation
await asyncio.sleep(1)
return f"Data retrieved for {query}"
```
#### 2. Implementing Async Methods in Custom Tool Classes
```python Code
from crewai.tools import BaseTool
class AsyncCustomTool(BaseTool):
name: str = "async_custom_tool"
description: str = "An asynchronous custom tool"
async def _run(self, query: str = "") -> str:
"""Asynchronously run the tool"""
# Your async implementation here
await asyncio.sleep(1)
return f"Processed {query} asynchronously"
```
### Using Async Tools
Async tools work seamlessly in both standard Crew workflows and Flow-based workflows:
```python Code
# In standard Crew
agent = Agent(role="researcher", tools=[async_custom_tool])
# In Flow
class MyFlow(Flow):
@start()
async def begin(self):
crew = Crew(agents=[agent])
result = await crew.kickoff_async()
return result
```
The CrewAI framework automatically handles the execution of both synchronous and asynchronous tools, so you don't need to worry about how to call them differently.
### Utilizing the `tool` Decorator
```python Code

View File

@@ -1,5 +1,7 @@
from __future__ import annotations
import asyncio
import inspect
import textwrap
from typing import Any, Callable, Optional, Union, get_type_hints
@@ -239,7 +241,17 @@ class CrewStructuredTool:
) -> Any:
"""Main method for tool execution."""
parsed_args = self._parse_args(input)
return self.func(**parsed_args, **kwargs)
if inspect.iscoroutinefunction(self.func):
result = asyncio.run(self.func(**parsed_args, **kwargs))
return result
result = self.func(**parsed_args, **kwargs)
if asyncio.iscoroutine(result):
return asyncio.run(result)
return result
@property
def args(self) -> dict:

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -25,8 +25,7 @@ def schema_class():
return TestSchema
class InternalCrewStructuredTool:
def test_initialization(self, basic_function, schema_class):
def test_initialization(basic_function, schema_class):
"""Test basic initialization of CrewStructuredTool"""
tool = CrewStructuredTool(
name="test_tool",
@@ -40,7 +39,7 @@ class InternalCrewStructuredTool:
assert tool.func == basic_function
assert tool.args_schema == schema_class
def test_from_function(self, basic_function):
def test_from_function(basic_function):
"""Test creating tool from function"""
tool = CrewStructuredTool.from_function(
func=basic_function, name="test_tool", description="Test description"
@@ -51,7 +50,7 @@ class InternalCrewStructuredTool:
assert tool.func == basic_function
assert isinstance(tool.args_schema, type(BaseModel))
def test_validate_function_signature(self, basic_function, schema_class):
def test_validate_function_signature(basic_function, schema_class):
"""Test function signature validation"""
tool = CrewStructuredTool(
name="test_tool",
@@ -64,14 +63,14 @@ class InternalCrewStructuredTool:
tool._validate_function_signature()
@pytest.mark.asyncio
async def test_ainvoke(self, basic_function):
async def test_ainvoke(basic_function):
"""Test asynchronous invocation"""
tool = CrewStructuredTool.from_function(func=basic_function, name="test_tool")
result = await tool.ainvoke(input={"param1": "test"})
assert result == "test 0"
def test_parse_args_dict(self, basic_function):
def test_parse_args_dict(basic_function):
"""Test parsing dictionary arguments"""
tool = CrewStructuredTool.from_function(func=basic_function, name="test_tool")
@@ -79,7 +78,7 @@ class InternalCrewStructuredTool:
assert parsed["param1"] == "test"
assert parsed["param2"] == 42
def test_parse_args_string(self, basic_function):
def test_parse_args_string(basic_function):
"""Test parsing string arguments"""
tool = CrewStructuredTool.from_function(func=basic_function, name="test_tool")
@@ -87,7 +86,7 @@ class InternalCrewStructuredTool:
assert parsed["param1"] == "test"
assert parsed["param2"] == 42
def test_complex_types(self):
def test_complex_types():
"""Test handling of complex parameter types"""
def complex_func(nested: dict, items: list) -> str:
@@ -100,7 +99,7 @@ class InternalCrewStructuredTool:
result = tool.invoke({"nested": {"key": "value"}, "items": [1, 2, 3]})
assert result == "Processed 3 items with 1 nested keys"
def test_schema_inheritance(self):
def test_schema_inheritance():
"""Test tool creation with inherited schema"""
def extended_func(base_param: str, extra_param: int) -> str:
@@ -120,7 +119,7 @@ class InternalCrewStructuredTool:
result = tool.invoke({"base_param": "test", "extra_param": 42})
assert result == "test 42"
def test_default_values_in_schema(self):
def test_default_values_in_schema():
"""Test handling of default values in schema"""
def default_func(
@@ -144,3 +143,88 @@ class InternalCrewStructuredTool:
{"required_param": "test", "optional_param": "custom", "nullable_param": 42}
)
assert result == "test custom 42"
@pytest.fixture
def custom_tool_decorator():
from crewai.tools import tool
@tool("custom_tool", result_as_answer=True)
async def custom_tool():
"""This is a tool that does something"""
return "Hello World from Custom Tool"
return custom_tool
@pytest.fixture
def custom_tool():
from crewai.tools import BaseTool
class CustomTool(BaseTool):
name: str = "my_tool"
description: str = "This is a tool that does something"
result_as_answer: bool = True
async def _run(self):
return "Hello World from Custom Tool"
return CustomTool()
def build_simple_crew(tool):
from crewai import Agent, Task, Crew
agent1 = Agent(role="Simple role", goal="Simple goal", backstory="Simple backstory", tools=[tool])
say_hi_task = Task(
description="Use the custom tool result as answer.", agent=agent1, expected_output="Use the tool result"
)
crew = Crew(agents=[agent1], tasks=[say_hi_task])
return crew
@pytest.mark.vcr(filter_headers=["authorization"])
def test_async_tool_using_within_isolated_crew(custom_tool):
crew = build_simple_crew(custom_tool)
result = crew.kickoff()
assert result.raw == "Hello World from Custom Tool"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_async_tool_using_decorator_within_isolated_crew(custom_tool_decorator):
crew = build_simple_crew(custom_tool_decorator)
result = crew.kickoff()
assert result.raw == "Hello World from Custom Tool"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_async_tool_within_flow(custom_tool):
from crewai.flow.flow import Flow
class StructuredExampleFlow(Flow):
from crewai.flow.flow import start
@start()
async def start(self):
crew = build_simple_crew(custom_tool)
result = await crew.kickoff_async()
return result
flow = StructuredExampleFlow()
result = flow.kickoff()
assert result.raw == "Hello World from Custom Tool"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_async_tool_using_decorator_within_flow(custom_tool_decorator):
from crewai.flow.flow import Flow
class StructuredExampleFlow(Flow):
from crewai.flow.flow import start
@start()
async def start(self):
crew = build_simple_crew(custom_tool_decorator)
result = await crew.kickoff_async()
return result
flow = StructuredExampleFlow()
result = flow.kickoff()
assert result.raw == "Hello World from Custom Tool"