Compare commits

...

2 Commits

Author SHA1 Message Date
Devin AI
1cf09ac7ce Address PR feedback: Fix ForwardRef issues, improve error messages, enhance docs
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-04-29 13:25:40 +00:00
Devin AI
a36e696a69 Add task decomposition feature (Issue #2717)
This PR implements task decomposition as requested in Issue #2717.
It allows complex tasks to be automatically split into sub-tasks
without manual intervention.

- Added parent_task and sub_tasks fields to Task class
- Implemented decompose() method to create sub-tasks
- Added combine_sub_task_results() method to aggregate results
- Updated execute_sync() to handle sub-task execution
- Added execute_sub_tasks_async() for asynchronous execution
- Created tests for the task decomposition functionality
- Added example script demonstrating usage

Co-Authored-By: Joe Moura <joao@crewai.com>
2025-04-29 13:19:17 +00:00
4 changed files with 518 additions and 1 deletions

View File

@@ -0,0 +1,47 @@
"""
Example of using task decomposition in CrewAI.
This example demonstrates how to use the task decomposition feature
to break down complex tasks into simpler sub-tasks.
Feature introduced in CrewAI v1.x.x
"""
from crewai import Agent, Task, Crew
researcher = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher with skills in breaking down complex topics.",
)
research_task = Task(
description="Research the impact of AI on various industries",
expected_output="A comprehensive report covering multiple industries",
agent=researcher,
)
sub_tasks = research_task.decompose(
descriptions=[
"Research AI impact on healthcare industry",
"Research AI impact on finance industry",
"Research AI impact on education industry",
],
expected_outputs=[
"A report on AI in healthcare",
"A report on AI in finance",
"A report on AI in education",
],
names=["Healthcare", "Finance", "Education"],
)
crew = Crew(
agents=[researcher],
tasks=[research_task],
)
result = crew.kickoff()
print("Final result:", result)
for i, sub_task in enumerate(research_task.sub_tasks):
print(f"Sub-task {i+1} result: {sub_task.output.raw if hasattr(sub_task, 'output') and sub_task.output else 'No output'}")

View File

@@ -19,6 +19,7 @@ from typing import (
Tuple,
Type,
Union,
ForwardRef,
)
from opentelemetry.trace import Span
@@ -137,6 +138,16 @@ class Task(BaseModel):
default=0,
description="Current number of retries"
)
parent_task: Optional['Task'] = Field(
default=None,
description="Parent task that this task was decomposed from.",
exclude=True,
)
sub_tasks: List['Task'] = Field(
default_factory=list,
description="Sub-tasks that this task was decomposed into.",
exclude=True,
)
@field_validator("guardrail")
@classmethod
@@ -246,13 +257,151 @@ class Task(BaseModel):
)
return self
def decompose(
self,
descriptions: List[str],
expected_outputs: Optional[List[str]] = None,
names: Optional[List[str]] = None
) -> List['Task']:
"""
Decompose a complex task into simpler sub-tasks.
Args:
descriptions: List of descriptions for each sub-task.
expected_outputs: Optional list of expected outputs for each sub-task.
names: Optional list of names for each sub-task.
Returns:
List of created sub-tasks.
Raises:
ValueError: If descriptions is empty, or if expected_outputs or names
have different lengths than descriptions.
Side Effects:
Modifies self.sub_tasks by adding newly created sub-tasks.
"""
if not descriptions:
raise ValueError("At least one sub-task description is required.")
if expected_outputs and len(expected_outputs) != len(descriptions):
raise ValueError(
f"If provided, expected_outputs must have the same length as descriptions. "
f"Got {len(expected_outputs)} expected outputs and {len(descriptions)} descriptions."
)
if names and len(names) != len(descriptions):
raise ValueError(
f"If provided, names must have the same length as descriptions. "
f"Got {len(names)} names and {len(descriptions)} descriptions."
)
for i, description in enumerate(descriptions):
sub_task = Task(
description=description,
expected_output=expected_outputs[i] if expected_outputs else self.expected_output,
name=names[i] if names else None,
agent=self.agent, # Inherit the agent from the parent task
tools=self.tools, # Inherit the tools from the parent task
context=[self], # Set the parent task as context for the sub-task
parent_task=self, # Reference back to the parent task
)
self.sub_tasks.append(sub_task)
return self.sub_tasks
def combine_sub_task_results(self) -> str:
"""
Combine the results from all sub-tasks into a single result for this task.
This method uses the task's agent to intelligently combine the results from
all sub-tasks. It requires an agent capable of coherent text summarization
and is designed for stateless prompt execution.
Returns:
The combined result as a string.
Raises:
ValueError: If the task has no sub-tasks or no agent assigned.
Side Effects:
None. This method does not modify the task's state.
"""
if not self.sub_tasks:
raise ValueError("Task has no sub-tasks to combine results from.")
if not self.agent:
raise ValueError("Task has no agent to combine sub-task results.")
sub_task_results = "\n\n".join([
f"Sub-task: {sub_task.description}\nResult: {sub_task.output.raw if sub_task.output else 'No result'}"
for sub_task in self.sub_tasks
])
combine_prompt = f"""
You have completed the following sub-tasks for the main task: "{self.description}"
{sub_task_results}
Based on all these sub-tasks, please provide a consolidated final answer for the main task.
Expected output format: {self.expected_output if self.expected_output else 'Not specified'}
"""
result = self.agent.execute_task(
task=self,
context=combine_prompt,
tools=self.tools or []
)
return result
def execute_sync(
self,
agent: Optional[BaseAgent] = None,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> TaskOutput:
"""Execute the task synchronously."""
"""
Execute the task synchronously.
If the task has sub-tasks and no output yet, this method will:
1. Execute all sub-tasks first
2. Combine their results using the agent
3. Set the combined result as this task's output
Args:
agent: Optional agent to execute the task with.
context: Optional context to pass to the task.
tools: Optional tools to pass to the task.
Returns:
TaskOutput: The result of the task execution.
Side Effects:
Sets self.output with the execution result.
"""
if self.sub_tasks and not self.output:
for sub_task in self.sub_tasks:
sub_task.execute_sync(
agent=sub_task.agent or agent,
context=context,
tools=sub_task.tools or tools or [],
)
# Combine the results from sub-tasks
result = self.combine_sub_task_results()
self.output = TaskOutput(
description=self.description,
name=self.name,
expected_output=self.expected_output,
raw=result,
agent=self.agent.role if self.agent else None,
output_format=self.output_format,
)
return self.output
return self._execute_core(agent, context, tools)
@property
@@ -278,6 +427,55 @@ class Task(BaseModel):
).start()
return future
def execute_sub_tasks_async(
self,
agent: Optional[BaseAgent] = None,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> List[Future[TaskOutput]]:
"""
Execute all sub-tasks asynchronously.
This method starts the execution of all sub-tasks in parallel and returns
futures that can be awaited. After all futures are complete, you should call
combine_sub_task_results() to aggregate the results.
Example:
```python
futures = task.execute_sub_tasks_async()
for future in futures:
future.result()
# Combine the results
result = task.combine_sub_task_results()
```
Args:
agent: Optional agent to execute the sub-tasks with.
context: Optional context to pass to the sub-tasks.
tools: Optional tools to pass to the sub-tasks.
Returns:
List of futures for the sub-task executions.
Raises:
ValueError: If the task has no sub-tasks.
"""
if not self.sub_tasks:
return []
futures = []
for sub_task in self.sub_tasks:
future = sub_task.execute_async(
agent=sub_task.agent or agent,
context=context,
tools=sub_task.tools or tools or [],
)
futures.append(future)
return futures
def _execute_task_async(
self,
agent: Optional[BaseAgent],
@@ -434,6 +632,8 @@ class Task(BaseModel):
"agent",
"context",
"tools",
"parent_task",
"sub_tasks",
}
copied_data = self.model_dump(exclude=exclude)
@@ -457,6 +657,7 @@ class Task(BaseModel):
agent=cloned_agent,
tools=cloned_tools,
)
return copied_task
@@ -526,3 +727,6 @@ class Task(BaseModel):
def __repr__(self):
return f"Task(description={self.description}, expected_output={self.expected_output})"
Task.model_rebuild()

View File

@@ -0,0 +1,157 @@
import pytest
from unittest.mock import Mock, patch
from crewai import Agent, Task
def test_task_decomposition_structure():
"""Test that task decomposition creates the proper parent-child relationship."""
agent = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher",
)
parent_task = Task(
description="Research the impact of AI on various industries",
expected_output="A comprehensive report",
agent=agent,
)
sub_task_descriptions = [
"Research AI impact on healthcare",
"Research AI impact on finance",
"Research AI impact on education",
]
sub_tasks = parent_task.decompose(
descriptions=sub_task_descriptions,
expected_outputs=["Healthcare report", "Finance report", "Education report"],
names=["Healthcare", "Finance", "Education"],
)
assert len(sub_tasks) == 3
assert len(parent_task.sub_tasks) == 3
for sub_task in sub_tasks:
assert sub_task.parent_task == parent_task
assert parent_task in sub_task.context
def test_task_execution_with_sub_tasks():
"""Test that executing a task with sub-tasks executes the sub-tasks first."""
agent = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher",
)
parent_task = Task(
description="Research the impact of AI on various industries",
expected_output="A comprehensive report",
agent=agent,
)
sub_task_descriptions = [
"Research AI impact on healthcare",
"Research AI impact on finance",
"Research AI impact on education",
]
parent_task.decompose(
descriptions=sub_task_descriptions,
expected_outputs=["Healthcare report", "Finance report", "Education report"],
)
with patch.object(Agent, 'execute_task', return_value="Mock result") as mock_execute_task:
result = parent_task.execute_sync()
assert mock_execute_task.call_count >= 3
for sub_task in parent_task.sub_tasks:
assert sub_task.output is not None
assert result is not None
assert result.raw is not None
def test_combine_sub_task_results():
"""Test that combining sub-task results works correctly."""
agent = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher",
)
parent_task = Task(
description="Research the impact of AI on various industries",
expected_output="A comprehensive report",
agent=agent,
)
sub_tasks = parent_task.decompose([
"Research AI impact on healthcare",
"Research AI impact on finance",
])
for sub_task in sub_tasks:
sub_task.output = Mock()
sub_task.output.raw = f"Result for {sub_task.description}"
with patch.object(Agent, 'execute_task', return_value="Combined result") as mock_execute_task:
result = parent_task.combine_sub_task_results()
assert mock_execute_task.called
assert result == "Combined result"
def test_task_decomposition_validation():
"""Test that task decomposition validates inputs correctly."""
parent_task = Task(
description="Research the impact of AI",
expected_output="A report",
)
with pytest.raises(ValueError, match="At least one sub-task description is required"):
parent_task.decompose([])
with pytest.raises(ValueError, match="expected_outputs must have the same length"):
parent_task.decompose(
["Task 1", "Task 2"],
expected_outputs=["Output 1"]
)
with pytest.raises(ValueError, match="names must have the same length"):
parent_task.decompose(
["Task 1", "Task 2"],
names=["Name 1"]
)
def test_execute_sub_tasks_async():
"""Test that executing sub-tasks asynchronously works correctly."""
agent = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher",
)
parent_task = Task(
description="Research the impact of AI on various industries",
expected_output="A comprehensive report",
agent=agent,
)
sub_tasks = parent_task.decompose([
"Research AI impact on healthcare",
"Research AI impact on finance",
])
with patch.object(Task, 'execute_async') as mock_execute_async:
mock_future = Mock()
mock_execute_async.return_value = mock_future
futures = parent_task.execute_sub_tasks_async()
assert mock_execute_async.call_count == 2
assert len(futures) == 2

View File

@@ -0,0 +1,109 @@
import pytest
from unittest.mock import Mock, patch
from crewai import Agent, Task, TaskOutput
def test_combine_sub_task_results_no_sub_tasks():
"""Test that combining sub-task results raises an error when there are no sub-tasks."""
agent = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher",
)
parent_task = Task(
description="Research the impact of AI",
expected_output="A report",
agent=agent,
)
with pytest.raises(ValueError, match="Task has no sub-tasks to combine results from"):
parent_task.combine_sub_task_results()
def test_combine_sub_task_results_no_agent():
"""Test that combining sub-task results raises an error when there is no agent."""
parent_task = Task(
description="Research the impact of AI",
expected_output="A report",
)
sub_task = Task(
description="Research AI impact on healthcare",
expected_output="Healthcare report",
parent_task=parent_task,
)
parent_task.sub_tasks.append(sub_task)
with pytest.raises(ValueError, match="Task has no agent to combine sub-task results"):
parent_task.combine_sub_task_results()
def test_execute_sync_sets_output_after_combining():
"""Test that execute_sync sets the output after combining sub-task results."""
agent = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher",
)
parent_task = Task(
description="Research the impact of AI",
expected_output="A report",
agent=agent,
)
sub_tasks = parent_task.decompose([
"Research AI impact on healthcare",
"Research AI impact on finance",
])
with patch.object(Agent, 'execute_task', return_value="Combined result") as mock_execute_task:
result = parent_task.execute_sync()
assert parent_task.output is not None
assert parent_task.output.raw == "Combined result"
assert result.raw == "Combined result"
assert mock_execute_task.call_count >= 3
def test_deep_cloning_prevents_shared_state():
"""Test that deep cloning prevents shared mutable state between tasks."""
agent = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher",
)
parent_task = Task(
description="Research the impact of AI",
expected_output="A report",
agent=agent,
)
copied_task = parent_task.copy()
copied_task.description = "Modified description"
assert parent_task.description == "Research the impact of AI"
assert copied_task.description == "Modified description"
parent_task.decompose(["Sub-task 1", "Sub-task 2"])
assert len(parent_task.sub_tasks) == 2
assert len(copied_task.sub_tasks) == 0
def test_execute_sub_tasks_async_empty_sub_tasks():
"""Test that execute_sub_tasks_async returns an empty list when there are no sub-tasks."""
parent_task = Task(
description="Research the impact of AI",
expected_output="A report",
)
futures = parent_task.execute_sub_tasks_async()
assert isinstance(futures, list)
assert len(futures) == 0