Address PR feedback: Fix ForwardRef issues, improve error messages, enhance docs

Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
Devin AI
2025-04-29 13:25:40 +00:00
parent a36e696a69
commit 1cf09ac7ce
3 changed files with 193 additions and 11 deletions

View File

@@ -3,6 +3,8 @@ Example of using task decomposition in CrewAI.
This example demonstrates how to use the task decomposition feature This example demonstrates how to use the task decomposition feature
to break down complex tasks into simpler sub-tasks. to break down complex tasks into simpler sub-tasks.
Feature introduced in CrewAI v1.x.x
""" """
from crewai import Agent, Task, Crew from crewai import Agent, Task, Crew
@@ -39,10 +41,7 @@ crew = Crew(
) )
result = crew.kickoff() result = crew.kickoff()
print(result) print("Final result:", result)
for i, sub_task in enumerate(research_task.sub_tasks):
# print(f"Sub-task {i+1} result: {sub_task.output.raw if hasattr(sub_task, 'output') and sub_task.output else 'No output'}")
#
#
#

View File

@@ -273,15 +273,28 @@ class Task(BaseModel):
Returns: Returns:
List of created sub-tasks. List of created sub-tasks.
Raises:
ValueError: If descriptions is empty, or if expected_outputs or names
have different lengths than descriptions.
Side Effects:
Modifies self.sub_tasks by adding newly created sub-tasks.
""" """
if not descriptions: if not descriptions:
raise ValueError("At least one sub-task description is required.") raise ValueError("At least one sub-task description is required.")
if expected_outputs and len(expected_outputs) != len(descriptions): if expected_outputs and len(expected_outputs) != len(descriptions):
raise ValueError("If provided, expected_outputs must have the same length as descriptions.") raise ValueError(
f"If provided, expected_outputs must have the same length as descriptions. "
f"Got {len(expected_outputs)} expected outputs and {len(descriptions)} descriptions."
)
if names and len(names) != len(descriptions): if names and len(names) != len(descriptions):
raise ValueError("If provided, names must have the same length as descriptions.") raise ValueError(
f"If provided, names must have the same length as descriptions. "
f"Got {len(names)} names and {len(descriptions)} descriptions."
)
for i, description in enumerate(descriptions): for i, description in enumerate(descriptions):
sub_task = Task( sub_task = Task(
@@ -301,8 +314,18 @@ class Task(BaseModel):
""" """
Combine the results from all sub-tasks into a single result for this task. Combine the results from all sub-tasks into a single result for this task.
This method uses the task's agent to intelligently combine the results from
all sub-tasks. It requires an agent capable of coherent text summarization
and is designed for stateless prompt execution.
Returns: Returns:
The combined result as a string. The combined result as a string.
Raises:
ValueError: If the task has no sub-tasks or no agent assigned.
Side Effects:
None. This method does not modify the task's state.
""" """
if not self.sub_tasks: if not self.sub_tasks:
raise ValueError("Task has no sub-tasks to combine results from.") raise ValueError("Task has no sub-tasks to combine results from.")
@@ -338,7 +361,25 @@ class Task(BaseModel):
context: Optional[str] = None, context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None, tools: Optional[List[BaseTool]] = None,
) -> TaskOutput: ) -> TaskOutput:
"""Execute the task synchronously.""" """
Execute the task synchronously.
If the task has sub-tasks and no output yet, this method will:
1. Execute all sub-tasks first
2. Combine their results using the agent
3. Set the combined result as this task's output
Args:
agent: Optional agent to execute the task with.
context: Optional context to pass to the task.
tools: Optional tools to pass to the task.
Returns:
TaskOutput: The result of the task execution.
Side Effects:
Sets self.output with the execution result.
"""
if self.sub_tasks and not self.output: if self.sub_tasks and not self.output:
for sub_task in self.sub_tasks: for sub_task in self.sub_tasks:
sub_task.execute_sync( sub_task.execute_sync(
@@ -347,8 +388,19 @@ class Task(BaseModel):
tools=sub_task.tools or tools or [], tools=sub_task.tools or tools or [],
) )
# Combine the results from sub-tasks
result = self.combine_sub_task_results() result = self.combine_sub_task_results()
return self._execute_core(agent, context, tools)
self.output = TaskOutput(
description=self.description,
name=self.name,
expected_output=self.expected_output,
raw=result,
agent=self.agent.role if self.agent else None,
output_format=self.output_format,
)
return self.output
return self._execute_core(agent, context, tools) return self._execute_core(agent, context, tools)
@@ -381,7 +433,23 @@ class Task(BaseModel):
context: Optional[str] = None, context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None, tools: Optional[List[BaseTool]] = None,
) -> List[Future[TaskOutput]]: ) -> List[Future[TaskOutput]]:
"""Execute all sub-tasks asynchronously. """
Execute all sub-tasks asynchronously.
This method starts the execution of all sub-tasks in parallel and returns
futures that can be awaited. After all futures are complete, you should call
combine_sub_task_results() to aggregate the results.
Example:
```python
futures = task.execute_sub_tasks_async()
for future in futures:
future.result()
# Combine the results
result = task.combine_sub_task_results()
```
Args: Args:
agent: Optional agent to execute the sub-tasks with. agent: Optional agent to execute the sub-tasks with.
@@ -390,6 +458,9 @@ class Task(BaseModel):
Returns: Returns:
List of futures for the sub-task executions. List of futures for the sub-task executions.
Raises:
ValueError: If the task has no sub-tasks.
""" """
if not self.sub_tasks: if not self.sub_tasks:
return [] return []
@@ -656,3 +727,6 @@ class Task(BaseModel):
def __repr__(self): def __repr__(self):
return f"Task(description={self.description}, expected_output={self.expected_output})" return f"Task(description={self.description}, expected_output={self.expected_output})"
Task.model_rebuild()

View File

@@ -0,0 +1,109 @@
import pytest
from unittest.mock import Mock, patch
from crewai import Agent, Task, TaskOutput
def test_combine_sub_task_results_no_sub_tasks():
"""Test that combining sub-task results raises an error when there are no sub-tasks."""
agent = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher",
)
parent_task = Task(
description="Research the impact of AI",
expected_output="A report",
agent=agent,
)
with pytest.raises(ValueError, match="Task has no sub-tasks to combine results from"):
parent_task.combine_sub_task_results()
def test_combine_sub_task_results_no_agent():
"""Test that combining sub-task results raises an error when there is no agent."""
parent_task = Task(
description="Research the impact of AI",
expected_output="A report",
)
sub_task = Task(
description="Research AI impact on healthcare",
expected_output="Healthcare report",
parent_task=parent_task,
)
parent_task.sub_tasks.append(sub_task)
with pytest.raises(ValueError, match="Task has no agent to combine sub-task results"):
parent_task.combine_sub_task_results()
def test_execute_sync_sets_output_after_combining():
"""Test that execute_sync sets the output after combining sub-task results."""
agent = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher",
)
parent_task = Task(
description="Research the impact of AI",
expected_output="A report",
agent=agent,
)
sub_tasks = parent_task.decompose([
"Research AI impact on healthcare",
"Research AI impact on finance",
])
with patch.object(Agent, 'execute_task', return_value="Combined result") as mock_execute_task:
result = parent_task.execute_sync()
assert parent_task.output is not None
assert parent_task.output.raw == "Combined result"
assert result.raw == "Combined result"
assert mock_execute_task.call_count >= 3
def test_deep_cloning_prevents_shared_state():
"""Test that deep cloning prevents shared mutable state between tasks."""
agent = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher",
)
parent_task = Task(
description="Research the impact of AI",
expected_output="A report",
agent=agent,
)
copied_task = parent_task.copy()
copied_task.description = "Modified description"
assert parent_task.description == "Research the impact of AI"
assert copied_task.description == "Modified description"
parent_task.decompose(["Sub-task 1", "Sub-task 2"])
assert len(parent_task.sub_tasks) == 2
assert len(copied_task.sub_tasks) == 0
def test_execute_sub_tasks_async_empty_sub_tasks():
"""Test that execute_sub_tasks_async returns an empty list when there are no sub-tasks."""
parent_task = Task(
description="Research the impact of AI",
expected_output="A report",
)
futures = parent_task.execute_sub_tasks_async()
assert isinstance(futures, list)
assert len(futures) == 0