mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-24 16:28:29 +00:00
Compare commits
2 Commits
bugfix-pyt
...
devin/1745
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1cf09ac7ce | ||
|
|
a36e696a69 |
47
examples/task_decomposition_example.py
Normal file
47
examples/task_decomposition_example.py
Normal file
@@ -0,0 +1,47 @@
|
||||
"""
|
||||
Example of using task decomposition in CrewAI.
|
||||
|
||||
This example demonstrates how to use the task decomposition feature
|
||||
to break down complex tasks into simpler sub-tasks.
|
||||
|
||||
Feature introduced in CrewAI v1.x.x
|
||||
"""
|
||||
|
||||
from crewai import Agent, Task, Crew
|
||||
|
||||
researcher = Agent(
|
||||
role="Researcher",
|
||||
goal="Research effectively",
|
||||
backstory="You're an expert researcher with skills in breaking down complex topics.",
|
||||
)
|
||||
|
||||
research_task = Task(
|
||||
description="Research the impact of AI on various industries",
|
||||
expected_output="A comprehensive report covering multiple industries",
|
||||
agent=researcher,
|
||||
)
|
||||
|
||||
sub_tasks = research_task.decompose(
|
||||
descriptions=[
|
||||
"Research AI impact on healthcare industry",
|
||||
"Research AI impact on finance industry",
|
||||
"Research AI impact on education industry",
|
||||
],
|
||||
expected_outputs=[
|
||||
"A report on AI in healthcare",
|
||||
"A report on AI in finance",
|
||||
"A report on AI in education",
|
||||
],
|
||||
names=["Healthcare", "Finance", "Education"],
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[researcher],
|
||||
tasks=[research_task],
|
||||
)
|
||||
|
||||
result = crew.kickoff()
|
||||
print("Final result:", result)
|
||||
|
||||
for i, sub_task in enumerate(research_task.sub_tasks):
|
||||
print(f"Sub-task {i+1} result: {sub_task.output.raw if hasattr(sub_task, 'output') and sub_task.output else 'No output'}")
|
||||
@@ -19,6 +19,7 @@ from typing import (
|
||||
Tuple,
|
||||
Type,
|
||||
Union,
|
||||
ForwardRef,
|
||||
)
|
||||
|
||||
from opentelemetry.trace import Span
|
||||
@@ -137,6 +138,16 @@ class Task(BaseModel):
|
||||
default=0,
|
||||
description="Current number of retries"
|
||||
)
|
||||
parent_task: Optional['Task'] = Field(
|
||||
default=None,
|
||||
description="Parent task that this task was decomposed from.",
|
||||
exclude=True,
|
||||
)
|
||||
sub_tasks: List['Task'] = Field(
|
||||
default_factory=list,
|
||||
description="Sub-tasks that this task was decomposed into.",
|
||||
exclude=True,
|
||||
)
|
||||
|
||||
@field_validator("guardrail")
|
||||
@classmethod
|
||||
@@ -246,13 +257,151 @@ class Task(BaseModel):
|
||||
)
|
||||
return self
|
||||
|
||||
def decompose(
|
||||
self,
|
||||
descriptions: List[str],
|
||||
expected_outputs: Optional[List[str]] = None,
|
||||
names: Optional[List[str]] = None
|
||||
) -> List['Task']:
|
||||
"""
|
||||
Decompose a complex task into simpler sub-tasks.
|
||||
|
||||
Args:
|
||||
descriptions: List of descriptions for each sub-task.
|
||||
expected_outputs: Optional list of expected outputs for each sub-task.
|
||||
names: Optional list of names for each sub-task.
|
||||
|
||||
Returns:
|
||||
List of created sub-tasks.
|
||||
|
||||
Raises:
|
||||
ValueError: If descriptions is empty, or if expected_outputs or names
|
||||
have different lengths than descriptions.
|
||||
|
||||
Side Effects:
|
||||
Modifies self.sub_tasks by adding newly created sub-tasks.
|
||||
"""
|
||||
if not descriptions:
|
||||
raise ValueError("At least one sub-task description is required.")
|
||||
|
||||
if expected_outputs and len(expected_outputs) != len(descriptions):
|
||||
raise ValueError(
|
||||
f"If provided, expected_outputs must have the same length as descriptions. "
|
||||
f"Got {len(expected_outputs)} expected outputs and {len(descriptions)} descriptions."
|
||||
)
|
||||
|
||||
if names and len(names) != len(descriptions):
|
||||
raise ValueError(
|
||||
f"If provided, names must have the same length as descriptions. "
|
||||
f"Got {len(names)} names and {len(descriptions)} descriptions."
|
||||
)
|
||||
|
||||
for i, description in enumerate(descriptions):
|
||||
sub_task = Task(
|
||||
description=description,
|
||||
expected_output=expected_outputs[i] if expected_outputs else self.expected_output,
|
||||
name=names[i] if names else None,
|
||||
agent=self.agent, # Inherit the agent from the parent task
|
||||
tools=self.tools, # Inherit the tools from the parent task
|
||||
context=[self], # Set the parent task as context for the sub-task
|
||||
parent_task=self, # Reference back to the parent task
|
||||
)
|
||||
self.sub_tasks.append(sub_task)
|
||||
|
||||
return self.sub_tasks
|
||||
|
||||
def combine_sub_task_results(self) -> str:
|
||||
"""
|
||||
Combine the results from all sub-tasks into a single result for this task.
|
||||
|
||||
This method uses the task's agent to intelligently combine the results from
|
||||
all sub-tasks. It requires an agent capable of coherent text summarization
|
||||
and is designed for stateless prompt execution.
|
||||
|
||||
Returns:
|
||||
The combined result as a string.
|
||||
|
||||
Raises:
|
||||
ValueError: If the task has no sub-tasks or no agent assigned.
|
||||
|
||||
Side Effects:
|
||||
None. This method does not modify the task's state.
|
||||
"""
|
||||
if not self.sub_tasks:
|
||||
raise ValueError("Task has no sub-tasks to combine results from.")
|
||||
|
||||
if not self.agent:
|
||||
raise ValueError("Task has no agent to combine sub-task results.")
|
||||
|
||||
sub_task_results = "\n\n".join([
|
||||
f"Sub-task: {sub_task.description}\nResult: {sub_task.output.raw if sub_task.output else 'No result'}"
|
||||
for sub_task in self.sub_tasks
|
||||
])
|
||||
|
||||
combine_prompt = f"""
|
||||
You have completed the following sub-tasks for the main task: "{self.description}"
|
||||
|
||||
{sub_task_results}
|
||||
|
||||
Based on all these sub-tasks, please provide a consolidated final answer for the main task.
|
||||
Expected output format: {self.expected_output if self.expected_output else 'Not specified'}
|
||||
"""
|
||||
|
||||
result = self.agent.execute_task(
|
||||
task=self,
|
||||
context=combine_prompt,
|
||||
tools=self.tools or []
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
def execute_sync(
|
||||
self,
|
||||
agent: Optional[BaseAgent] = None,
|
||||
context: Optional[str] = None,
|
||||
tools: Optional[List[BaseTool]] = None,
|
||||
) -> TaskOutput:
|
||||
"""Execute the task synchronously."""
|
||||
"""
|
||||
Execute the task synchronously.
|
||||
|
||||
If the task has sub-tasks and no output yet, this method will:
|
||||
1. Execute all sub-tasks first
|
||||
2. Combine their results using the agent
|
||||
3. Set the combined result as this task's output
|
||||
|
||||
Args:
|
||||
agent: Optional agent to execute the task with.
|
||||
context: Optional context to pass to the task.
|
||||
tools: Optional tools to pass to the task.
|
||||
|
||||
Returns:
|
||||
TaskOutput: The result of the task execution.
|
||||
|
||||
Side Effects:
|
||||
Sets self.output with the execution result.
|
||||
"""
|
||||
if self.sub_tasks and not self.output:
|
||||
for sub_task in self.sub_tasks:
|
||||
sub_task.execute_sync(
|
||||
agent=sub_task.agent or agent,
|
||||
context=context,
|
||||
tools=sub_task.tools or tools or [],
|
||||
)
|
||||
|
||||
# Combine the results from sub-tasks
|
||||
result = self.combine_sub_task_results()
|
||||
|
||||
self.output = TaskOutput(
|
||||
description=self.description,
|
||||
name=self.name,
|
||||
expected_output=self.expected_output,
|
||||
raw=result,
|
||||
agent=self.agent.role if self.agent else None,
|
||||
output_format=self.output_format,
|
||||
)
|
||||
|
||||
return self.output
|
||||
|
||||
return self._execute_core(agent, context, tools)
|
||||
|
||||
@property
|
||||
@@ -278,6 +427,55 @@ class Task(BaseModel):
|
||||
).start()
|
||||
return future
|
||||
|
||||
def execute_sub_tasks_async(
|
||||
self,
|
||||
agent: Optional[BaseAgent] = None,
|
||||
context: Optional[str] = None,
|
||||
tools: Optional[List[BaseTool]] = None,
|
||||
) -> List[Future[TaskOutput]]:
|
||||
"""
|
||||
Execute all sub-tasks asynchronously.
|
||||
|
||||
This method starts the execution of all sub-tasks in parallel and returns
|
||||
futures that can be awaited. After all futures are complete, you should call
|
||||
combine_sub_task_results() to aggregate the results.
|
||||
|
||||
Example:
|
||||
```python
|
||||
futures = task.execute_sub_tasks_async()
|
||||
|
||||
for future in futures:
|
||||
future.result()
|
||||
|
||||
# Combine the results
|
||||
result = task.combine_sub_task_results()
|
||||
```
|
||||
|
||||
Args:
|
||||
agent: Optional agent to execute the sub-tasks with.
|
||||
context: Optional context to pass to the sub-tasks.
|
||||
tools: Optional tools to pass to the sub-tasks.
|
||||
|
||||
Returns:
|
||||
List of futures for the sub-task executions.
|
||||
|
||||
Raises:
|
||||
ValueError: If the task has no sub-tasks.
|
||||
"""
|
||||
if not self.sub_tasks:
|
||||
return []
|
||||
|
||||
futures = []
|
||||
for sub_task in self.sub_tasks:
|
||||
future = sub_task.execute_async(
|
||||
agent=sub_task.agent or agent,
|
||||
context=context,
|
||||
tools=sub_task.tools or tools or [],
|
||||
)
|
||||
futures.append(future)
|
||||
|
||||
return futures
|
||||
|
||||
def _execute_task_async(
|
||||
self,
|
||||
agent: Optional[BaseAgent],
|
||||
@@ -434,6 +632,8 @@ class Task(BaseModel):
|
||||
"agent",
|
||||
"context",
|
||||
"tools",
|
||||
"parent_task",
|
||||
"sub_tasks",
|
||||
}
|
||||
|
||||
copied_data = self.model_dump(exclude=exclude)
|
||||
@@ -457,6 +657,7 @@ class Task(BaseModel):
|
||||
agent=cloned_agent,
|
||||
tools=cloned_tools,
|
||||
)
|
||||
|
||||
|
||||
return copied_task
|
||||
|
||||
@@ -526,3 +727,6 @@ class Task(BaseModel):
|
||||
|
||||
def __repr__(self):
|
||||
return f"Task(description={self.description}, expected_output={self.expected_output})"
|
||||
|
||||
|
||||
Task.model_rebuild()
|
||||
|
||||
157
tests/test_task_decomposition.py
Normal file
157
tests/test_task_decomposition.py
Normal file
@@ -0,0 +1,157 @@
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from crewai import Agent, Task
|
||||
|
||||
|
||||
def test_task_decomposition_structure():
|
||||
"""Test that task decomposition creates the proper parent-child relationship."""
|
||||
agent = Agent(
|
||||
role="Researcher",
|
||||
goal="Research effectively",
|
||||
backstory="You're an expert researcher",
|
||||
)
|
||||
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI on various industries",
|
||||
expected_output="A comprehensive report",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
sub_task_descriptions = [
|
||||
"Research AI impact on healthcare",
|
||||
"Research AI impact on finance",
|
||||
"Research AI impact on education",
|
||||
]
|
||||
|
||||
sub_tasks = parent_task.decompose(
|
||||
descriptions=sub_task_descriptions,
|
||||
expected_outputs=["Healthcare report", "Finance report", "Education report"],
|
||||
names=["Healthcare", "Finance", "Education"],
|
||||
)
|
||||
|
||||
assert len(sub_tasks) == 3
|
||||
assert len(parent_task.sub_tasks) == 3
|
||||
|
||||
for sub_task in sub_tasks:
|
||||
assert sub_task.parent_task == parent_task
|
||||
assert parent_task in sub_task.context
|
||||
|
||||
|
||||
def test_task_execution_with_sub_tasks():
|
||||
"""Test that executing a task with sub-tasks executes the sub-tasks first."""
|
||||
agent = Agent(
|
||||
role="Researcher",
|
||||
goal="Research effectively",
|
||||
backstory="You're an expert researcher",
|
||||
)
|
||||
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI on various industries",
|
||||
expected_output="A comprehensive report",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
sub_task_descriptions = [
|
||||
"Research AI impact on healthcare",
|
||||
"Research AI impact on finance",
|
||||
"Research AI impact on education",
|
||||
]
|
||||
|
||||
parent_task.decompose(
|
||||
descriptions=sub_task_descriptions,
|
||||
expected_outputs=["Healthcare report", "Finance report", "Education report"],
|
||||
)
|
||||
|
||||
with patch.object(Agent, 'execute_task', return_value="Mock result") as mock_execute_task:
|
||||
result = parent_task.execute_sync()
|
||||
|
||||
assert mock_execute_task.call_count >= 3
|
||||
|
||||
for sub_task in parent_task.sub_tasks:
|
||||
assert sub_task.output is not None
|
||||
|
||||
assert result is not None
|
||||
assert result.raw is not None
|
||||
|
||||
|
||||
def test_combine_sub_task_results():
|
||||
"""Test that combining sub-task results works correctly."""
|
||||
agent = Agent(
|
||||
role="Researcher",
|
||||
goal="Research effectively",
|
||||
backstory="You're an expert researcher",
|
||||
)
|
||||
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI on various industries",
|
||||
expected_output="A comprehensive report",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
sub_tasks = parent_task.decompose([
|
||||
"Research AI impact on healthcare",
|
||||
"Research AI impact on finance",
|
||||
])
|
||||
|
||||
for sub_task in sub_tasks:
|
||||
sub_task.output = Mock()
|
||||
sub_task.output.raw = f"Result for {sub_task.description}"
|
||||
|
||||
with patch.object(Agent, 'execute_task', return_value="Combined result") as mock_execute_task:
|
||||
result = parent_task.combine_sub_task_results()
|
||||
|
||||
assert mock_execute_task.called
|
||||
assert result == "Combined result"
|
||||
|
||||
|
||||
def test_task_decomposition_validation():
|
||||
"""Test that task decomposition validates inputs correctly."""
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI",
|
||||
expected_output="A report",
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="At least one sub-task description is required"):
|
||||
parent_task.decompose([])
|
||||
|
||||
with pytest.raises(ValueError, match="expected_outputs must have the same length"):
|
||||
parent_task.decompose(
|
||||
["Task 1", "Task 2"],
|
||||
expected_outputs=["Output 1"]
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="names must have the same length"):
|
||||
parent_task.decompose(
|
||||
["Task 1", "Task 2"],
|
||||
names=["Name 1"]
|
||||
)
|
||||
|
||||
|
||||
def test_execute_sub_tasks_async():
|
||||
"""Test that executing sub-tasks asynchronously works correctly."""
|
||||
agent = Agent(
|
||||
role="Researcher",
|
||||
goal="Research effectively",
|
||||
backstory="You're an expert researcher",
|
||||
)
|
||||
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI on various industries",
|
||||
expected_output="A comprehensive report",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
sub_tasks = parent_task.decompose([
|
||||
"Research AI impact on healthcare",
|
||||
"Research AI impact on finance",
|
||||
])
|
||||
|
||||
with patch.object(Task, 'execute_async') as mock_execute_async:
|
||||
mock_future = Mock()
|
||||
mock_execute_async.return_value = mock_future
|
||||
|
||||
futures = parent_task.execute_sub_tasks_async()
|
||||
|
||||
assert mock_execute_async.call_count == 2
|
||||
assert len(futures) == 2
|
||||
109
tests/test_task_decomposition_edge_cases.py
Normal file
109
tests/test_task_decomposition_edge_cases.py
Normal file
@@ -0,0 +1,109 @@
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from crewai import Agent, Task, TaskOutput
|
||||
|
||||
|
||||
def test_combine_sub_task_results_no_sub_tasks():
|
||||
"""Test that combining sub-task results raises an error when there are no sub-tasks."""
|
||||
agent = Agent(
|
||||
role="Researcher",
|
||||
goal="Research effectively",
|
||||
backstory="You're an expert researcher",
|
||||
)
|
||||
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI",
|
||||
expected_output="A report",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="Task has no sub-tasks to combine results from"):
|
||||
parent_task.combine_sub_task_results()
|
||||
|
||||
|
||||
def test_combine_sub_task_results_no_agent():
|
||||
"""Test that combining sub-task results raises an error when there is no agent."""
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI",
|
||||
expected_output="A report",
|
||||
)
|
||||
|
||||
sub_task = Task(
|
||||
description="Research AI impact on healthcare",
|
||||
expected_output="Healthcare report",
|
||||
parent_task=parent_task,
|
||||
)
|
||||
parent_task.sub_tasks.append(sub_task)
|
||||
|
||||
with pytest.raises(ValueError, match="Task has no agent to combine sub-task results"):
|
||||
parent_task.combine_sub_task_results()
|
||||
|
||||
|
||||
def test_execute_sync_sets_output_after_combining():
|
||||
"""Test that execute_sync sets the output after combining sub-task results."""
|
||||
agent = Agent(
|
||||
role="Researcher",
|
||||
goal="Research effectively",
|
||||
backstory="You're an expert researcher",
|
||||
)
|
||||
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI",
|
||||
expected_output="A report",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
sub_tasks = parent_task.decompose([
|
||||
"Research AI impact on healthcare",
|
||||
"Research AI impact on finance",
|
||||
])
|
||||
|
||||
with patch.object(Agent, 'execute_task', return_value="Combined result") as mock_execute_task:
|
||||
result = parent_task.execute_sync()
|
||||
|
||||
assert parent_task.output is not None
|
||||
assert parent_task.output.raw == "Combined result"
|
||||
assert result.raw == "Combined result"
|
||||
|
||||
assert mock_execute_task.call_count >= 3
|
||||
|
||||
|
||||
def test_deep_cloning_prevents_shared_state():
|
||||
"""Test that deep cloning prevents shared mutable state between tasks."""
|
||||
agent = Agent(
|
||||
role="Researcher",
|
||||
goal="Research effectively",
|
||||
backstory="You're an expert researcher",
|
||||
)
|
||||
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI",
|
||||
expected_output="A report",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
copied_task = parent_task.copy()
|
||||
|
||||
copied_task.description = "Modified description"
|
||||
|
||||
assert parent_task.description == "Research the impact of AI"
|
||||
assert copied_task.description == "Modified description"
|
||||
|
||||
parent_task.decompose(["Sub-task 1", "Sub-task 2"])
|
||||
|
||||
assert len(parent_task.sub_tasks) == 2
|
||||
assert len(copied_task.sub_tasks) == 0
|
||||
|
||||
|
||||
def test_execute_sub_tasks_async_empty_sub_tasks():
|
||||
"""Test that execute_sub_tasks_async returns an empty list when there are no sub-tasks."""
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI",
|
||||
expected_output="A report",
|
||||
)
|
||||
|
||||
futures = parent_task.execute_sub_tasks_async()
|
||||
|
||||
assert isinstance(futures, list)
|
||||
assert len(futures) == 0
|
||||
Reference in New Issue
Block a user