mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-11 00:58:30 +00:00
Add task decomposition feature (Issue #2717)
This PR implements task decomposition as requested in Issue #2717. It allows complex tasks to be automatically split into sub-tasks without manual intervention. - Added parent_task and sub_tasks fields to Task class - Implemented decompose() method to create sub-tasks - Added combine_sub_task_results() method to aggregate results - Updated execute_sync() to handle sub-task execution - Added execute_sub_tasks_async() for asynchronous execution - Created tests for the task decomposition functionality - Added example script demonstrating usage Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
48
examples/task_decomposition_example.py
Normal file
48
examples/task_decomposition_example.py
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
"""
|
||||||
|
Example of using task decomposition in CrewAI.
|
||||||
|
|
||||||
|
This example demonstrates how to use the task decomposition feature
|
||||||
|
to break down complex tasks into simpler sub-tasks.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from crewai import Agent, Task, Crew
|
||||||
|
|
||||||
|
researcher = Agent(
|
||||||
|
role="Researcher",
|
||||||
|
goal="Research effectively",
|
||||||
|
backstory="You're an expert researcher with skills in breaking down complex topics.",
|
||||||
|
)
|
||||||
|
|
||||||
|
research_task = Task(
|
||||||
|
description="Research the impact of AI on various industries",
|
||||||
|
expected_output="A comprehensive report covering multiple industries",
|
||||||
|
agent=researcher,
|
||||||
|
)
|
||||||
|
|
||||||
|
sub_tasks = research_task.decompose(
|
||||||
|
descriptions=[
|
||||||
|
"Research AI impact on healthcare industry",
|
||||||
|
"Research AI impact on finance industry",
|
||||||
|
"Research AI impact on education industry",
|
||||||
|
],
|
||||||
|
expected_outputs=[
|
||||||
|
"A report on AI in healthcare",
|
||||||
|
"A report on AI in finance",
|
||||||
|
"A report on AI in education",
|
||||||
|
],
|
||||||
|
names=["Healthcare", "Finance", "Education"],
|
||||||
|
)
|
||||||
|
|
||||||
|
crew = Crew(
|
||||||
|
agents=[researcher],
|
||||||
|
tasks=[research_task],
|
||||||
|
)
|
||||||
|
|
||||||
|
result = crew.kickoff()
|
||||||
|
print(result)
|
||||||
|
|
||||||
|
|
||||||
|
#
|
||||||
|
#
|
||||||
|
#
|
||||||
|
#
|
||||||
@@ -19,6 +19,7 @@ from typing import (
|
|||||||
Tuple,
|
Tuple,
|
||||||
Type,
|
Type,
|
||||||
Union,
|
Union,
|
||||||
|
ForwardRef,
|
||||||
)
|
)
|
||||||
|
|
||||||
from opentelemetry.trace import Span
|
from opentelemetry.trace import Span
|
||||||
@@ -137,6 +138,16 @@ class Task(BaseModel):
|
|||||||
default=0,
|
default=0,
|
||||||
description="Current number of retries"
|
description="Current number of retries"
|
||||||
)
|
)
|
||||||
|
parent_task: Optional['Task'] = Field(
|
||||||
|
default=None,
|
||||||
|
description="Parent task that this task was decomposed from.",
|
||||||
|
exclude=True,
|
||||||
|
)
|
||||||
|
sub_tasks: List['Task'] = Field(
|
||||||
|
default_factory=list,
|
||||||
|
description="Sub-tasks that this task was decomposed into.",
|
||||||
|
exclude=True,
|
||||||
|
)
|
||||||
|
|
||||||
@field_validator("guardrail")
|
@field_validator("guardrail")
|
||||||
@classmethod
|
@classmethod
|
||||||
@@ -246,6 +257,81 @@ class Task(BaseModel):
|
|||||||
)
|
)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
def decompose(
|
||||||
|
self,
|
||||||
|
descriptions: List[str],
|
||||||
|
expected_outputs: Optional[List[str]] = None,
|
||||||
|
names: Optional[List[str]] = None
|
||||||
|
) -> List['Task']:
|
||||||
|
"""
|
||||||
|
Decompose a complex task into simpler sub-tasks.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
descriptions: List of descriptions for each sub-task.
|
||||||
|
expected_outputs: Optional list of expected outputs for each sub-task.
|
||||||
|
names: Optional list of names for each sub-task.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of created sub-tasks.
|
||||||
|
"""
|
||||||
|
if not descriptions:
|
||||||
|
raise ValueError("At least one sub-task description is required.")
|
||||||
|
|
||||||
|
if expected_outputs and len(expected_outputs) != len(descriptions):
|
||||||
|
raise ValueError("If provided, expected_outputs must have the same length as descriptions.")
|
||||||
|
|
||||||
|
if names and len(names) != len(descriptions):
|
||||||
|
raise ValueError("If provided, names must have the same length as descriptions.")
|
||||||
|
|
||||||
|
for i, description in enumerate(descriptions):
|
||||||
|
sub_task = Task(
|
||||||
|
description=description,
|
||||||
|
expected_output=expected_outputs[i] if expected_outputs else self.expected_output,
|
||||||
|
name=names[i] if names else None,
|
||||||
|
agent=self.agent, # Inherit the agent from the parent task
|
||||||
|
tools=self.tools, # Inherit the tools from the parent task
|
||||||
|
context=[self], # Set the parent task as context for the sub-task
|
||||||
|
parent_task=self, # Reference back to the parent task
|
||||||
|
)
|
||||||
|
self.sub_tasks.append(sub_task)
|
||||||
|
|
||||||
|
return self.sub_tasks
|
||||||
|
|
||||||
|
def combine_sub_task_results(self) -> str:
|
||||||
|
"""
|
||||||
|
Combine the results from all sub-tasks into a single result for this task.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The combined result as a string.
|
||||||
|
"""
|
||||||
|
if not self.sub_tasks:
|
||||||
|
raise ValueError("Task has no sub-tasks to combine results from.")
|
||||||
|
|
||||||
|
if not self.agent:
|
||||||
|
raise ValueError("Task has no agent to combine sub-task results.")
|
||||||
|
|
||||||
|
sub_task_results = "\n\n".join([
|
||||||
|
f"Sub-task: {sub_task.description}\nResult: {sub_task.output.raw if sub_task.output else 'No result'}"
|
||||||
|
for sub_task in self.sub_tasks
|
||||||
|
])
|
||||||
|
|
||||||
|
combine_prompt = f"""
|
||||||
|
You have completed the following sub-tasks for the main task: "{self.description}"
|
||||||
|
|
||||||
|
{sub_task_results}
|
||||||
|
|
||||||
|
Based on all these sub-tasks, please provide a consolidated final answer for the main task.
|
||||||
|
Expected output format: {self.expected_output if self.expected_output else 'Not specified'}
|
||||||
|
"""
|
||||||
|
|
||||||
|
result = self.agent.execute_task(
|
||||||
|
task=self,
|
||||||
|
context=combine_prompt,
|
||||||
|
tools=self.tools or []
|
||||||
|
)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
def execute_sync(
|
def execute_sync(
|
||||||
self,
|
self,
|
||||||
agent: Optional[BaseAgent] = None,
|
agent: Optional[BaseAgent] = None,
|
||||||
@@ -253,6 +339,17 @@ class Task(BaseModel):
|
|||||||
tools: Optional[List[BaseTool]] = None,
|
tools: Optional[List[BaseTool]] = None,
|
||||||
) -> TaskOutput:
|
) -> TaskOutput:
|
||||||
"""Execute the task synchronously."""
|
"""Execute the task synchronously."""
|
||||||
|
if self.sub_tasks and not self.output:
|
||||||
|
for sub_task in self.sub_tasks:
|
||||||
|
sub_task.execute_sync(
|
||||||
|
agent=sub_task.agent or agent,
|
||||||
|
context=context,
|
||||||
|
tools=sub_task.tools or tools or [],
|
||||||
|
)
|
||||||
|
|
||||||
|
result = self.combine_sub_task_results()
|
||||||
|
return self._execute_core(agent, context, tools)
|
||||||
|
|
||||||
return self._execute_core(agent, context, tools)
|
return self._execute_core(agent, context, tools)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@@ -278,6 +375,36 @@ class Task(BaseModel):
|
|||||||
).start()
|
).start()
|
||||||
return future
|
return future
|
||||||
|
|
||||||
|
def execute_sub_tasks_async(
|
||||||
|
self,
|
||||||
|
agent: Optional[BaseAgent] = None,
|
||||||
|
context: Optional[str] = None,
|
||||||
|
tools: Optional[List[BaseTool]] = None,
|
||||||
|
) -> List[Future[TaskOutput]]:
|
||||||
|
"""Execute all sub-tasks asynchronously.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
agent: Optional agent to execute the sub-tasks with.
|
||||||
|
context: Optional context to pass to the sub-tasks.
|
||||||
|
tools: Optional tools to pass to the sub-tasks.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of futures for the sub-task executions.
|
||||||
|
"""
|
||||||
|
if not self.sub_tasks:
|
||||||
|
return []
|
||||||
|
|
||||||
|
futures = []
|
||||||
|
for sub_task in self.sub_tasks:
|
||||||
|
future = sub_task.execute_async(
|
||||||
|
agent=sub_task.agent or agent,
|
||||||
|
context=context,
|
||||||
|
tools=sub_task.tools or tools or [],
|
||||||
|
)
|
||||||
|
futures.append(future)
|
||||||
|
|
||||||
|
return futures
|
||||||
|
|
||||||
def _execute_task_async(
|
def _execute_task_async(
|
||||||
self,
|
self,
|
||||||
agent: Optional[BaseAgent],
|
agent: Optional[BaseAgent],
|
||||||
@@ -434,6 +561,8 @@ class Task(BaseModel):
|
|||||||
"agent",
|
"agent",
|
||||||
"context",
|
"context",
|
||||||
"tools",
|
"tools",
|
||||||
|
"parent_task",
|
||||||
|
"sub_tasks",
|
||||||
}
|
}
|
||||||
|
|
||||||
copied_data = self.model_dump(exclude=exclude)
|
copied_data = self.model_dump(exclude=exclude)
|
||||||
@@ -457,6 +586,7 @@ class Task(BaseModel):
|
|||||||
agent=cloned_agent,
|
agent=cloned_agent,
|
||||||
tools=cloned_tools,
|
tools=cloned_tools,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
return copied_task
|
return copied_task
|
||||||
|
|
||||||
|
|||||||
157
tests/test_task_decomposition.py
Normal file
157
tests/test_task_decomposition.py
Normal file
@@ -0,0 +1,157 @@
|
|||||||
|
import pytest
|
||||||
|
from unittest.mock import Mock, patch
|
||||||
|
|
||||||
|
from crewai import Agent, Task
|
||||||
|
|
||||||
|
|
||||||
|
def test_task_decomposition_structure():
|
||||||
|
"""Test that task decomposition creates the proper parent-child relationship."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Researcher",
|
||||||
|
goal="Research effectively",
|
||||||
|
backstory="You're an expert researcher",
|
||||||
|
)
|
||||||
|
|
||||||
|
parent_task = Task(
|
||||||
|
description="Research the impact of AI on various industries",
|
||||||
|
expected_output="A comprehensive report",
|
||||||
|
agent=agent,
|
||||||
|
)
|
||||||
|
|
||||||
|
sub_task_descriptions = [
|
||||||
|
"Research AI impact on healthcare",
|
||||||
|
"Research AI impact on finance",
|
||||||
|
"Research AI impact on education",
|
||||||
|
]
|
||||||
|
|
||||||
|
sub_tasks = parent_task.decompose(
|
||||||
|
descriptions=sub_task_descriptions,
|
||||||
|
expected_outputs=["Healthcare report", "Finance report", "Education report"],
|
||||||
|
names=["Healthcare", "Finance", "Education"],
|
||||||
|
)
|
||||||
|
|
||||||
|
assert len(sub_tasks) == 3
|
||||||
|
assert len(parent_task.sub_tasks) == 3
|
||||||
|
|
||||||
|
for sub_task in sub_tasks:
|
||||||
|
assert sub_task.parent_task == parent_task
|
||||||
|
assert parent_task in sub_task.context
|
||||||
|
|
||||||
|
|
||||||
|
def test_task_execution_with_sub_tasks():
|
||||||
|
"""Test that executing a task with sub-tasks executes the sub-tasks first."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Researcher",
|
||||||
|
goal="Research effectively",
|
||||||
|
backstory="You're an expert researcher",
|
||||||
|
)
|
||||||
|
|
||||||
|
parent_task = Task(
|
||||||
|
description="Research the impact of AI on various industries",
|
||||||
|
expected_output="A comprehensive report",
|
||||||
|
agent=agent,
|
||||||
|
)
|
||||||
|
|
||||||
|
sub_task_descriptions = [
|
||||||
|
"Research AI impact on healthcare",
|
||||||
|
"Research AI impact on finance",
|
||||||
|
"Research AI impact on education",
|
||||||
|
]
|
||||||
|
|
||||||
|
parent_task.decompose(
|
||||||
|
descriptions=sub_task_descriptions,
|
||||||
|
expected_outputs=["Healthcare report", "Finance report", "Education report"],
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch.object(Agent, 'execute_task', return_value="Mock result") as mock_execute_task:
|
||||||
|
result = parent_task.execute_sync()
|
||||||
|
|
||||||
|
assert mock_execute_task.call_count >= 3
|
||||||
|
|
||||||
|
for sub_task in parent_task.sub_tasks:
|
||||||
|
assert sub_task.output is not None
|
||||||
|
|
||||||
|
assert result is not None
|
||||||
|
assert result.raw is not None
|
||||||
|
|
||||||
|
|
||||||
|
def test_combine_sub_task_results():
|
||||||
|
"""Test that combining sub-task results works correctly."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Researcher",
|
||||||
|
goal="Research effectively",
|
||||||
|
backstory="You're an expert researcher",
|
||||||
|
)
|
||||||
|
|
||||||
|
parent_task = Task(
|
||||||
|
description="Research the impact of AI on various industries",
|
||||||
|
expected_output="A comprehensive report",
|
||||||
|
agent=agent,
|
||||||
|
)
|
||||||
|
|
||||||
|
sub_tasks = parent_task.decompose([
|
||||||
|
"Research AI impact on healthcare",
|
||||||
|
"Research AI impact on finance",
|
||||||
|
])
|
||||||
|
|
||||||
|
for sub_task in sub_tasks:
|
||||||
|
sub_task.output = Mock()
|
||||||
|
sub_task.output.raw = f"Result for {sub_task.description}"
|
||||||
|
|
||||||
|
with patch.object(Agent, 'execute_task', return_value="Combined result") as mock_execute_task:
|
||||||
|
result = parent_task.combine_sub_task_results()
|
||||||
|
|
||||||
|
assert mock_execute_task.called
|
||||||
|
assert result == "Combined result"
|
||||||
|
|
||||||
|
|
||||||
|
def test_task_decomposition_validation():
|
||||||
|
"""Test that task decomposition validates inputs correctly."""
|
||||||
|
parent_task = Task(
|
||||||
|
description="Research the impact of AI",
|
||||||
|
expected_output="A report",
|
||||||
|
)
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="At least one sub-task description is required"):
|
||||||
|
parent_task.decompose([])
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="expected_outputs must have the same length"):
|
||||||
|
parent_task.decompose(
|
||||||
|
["Task 1", "Task 2"],
|
||||||
|
expected_outputs=["Output 1"]
|
||||||
|
)
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="names must have the same length"):
|
||||||
|
parent_task.decompose(
|
||||||
|
["Task 1", "Task 2"],
|
||||||
|
names=["Name 1"]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_execute_sub_tasks_async():
|
||||||
|
"""Test that executing sub-tasks asynchronously works correctly."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Researcher",
|
||||||
|
goal="Research effectively",
|
||||||
|
backstory="You're an expert researcher",
|
||||||
|
)
|
||||||
|
|
||||||
|
parent_task = Task(
|
||||||
|
description="Research the impact of AI on various industries",
|
||||||
|
expected_output="A comprehensive report",
|
||||||
|
agent=agent,
|
||||||
|
)
|
||||||
|
|
||||||
|
sub_tasks = parent_task.decompose([
|
||||||
|
"Research AI impact on healthcare",
|
||||||
|
"Research AI impact on finance",
|
||||||
|
])
|
||||||
|
|
||||||
|
with patch.object(Task, 'execute_async') as mock_execute_async:
|
||||||
|
mock_future = Mock()
|
||||||
|
mock_execute_async.return_value = mock_future
|
||||||
|
|
||||||
|
futures = parent_task.execute_sub_tasks_async()
|
||||||
|
|
||||||
|
assert mock_execute_async.call_count == 2
|
||||||
|
assert len(futures) == 2
|
||||||
Reference in New Issue
Block a user