Compare commits

..

2 Commits

Author SHA1 Message Date
Devin AI
1cf09ac7ce Address PR feedback: Fix ForwardRef issues, improve error messages, enhance docs
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-04-29 13:25:40 +00:00
Devin AI
a36e696a69 Add task decomposition feature (Issue #2717)
This PR implements task decomposition as requested in Issue #2717.
It allows complex tasks to be automatically split into sub-tasks
without manual intervention.

- Added parent_task and sub_tasks fields to Task class
- Implemented decompose() method to create sub-tasks
- Added combine_sub_task_results() method to aggregate results
- Updated execute_sync() to handle sub-task execution
- Added execute_sub_tasks_async() for asynchronous execution
- Created tests for the task decomposition functionality
- Added example script demonstrating usage

Co-Authored-By: Joe Moura <joao@crewai.com>
2025-04-29 13:19:17 +00:00
6 changed files with 519 additions and 402 deletions

View File

@@ -0,0 +1,47 @@
"""
Example of using task decomposition in CrewAI.
This example demonstrates how to use the task decomposition feature
to break down complex tasks into simpler sub-tasks.
Feature introduced in CrewAI v1.x.x
"""
from crewai import Agent, Task, Crew
researcher = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher with skills in breaking down complex topics.",
)
research_task = Task(
description="Research the impact of AI on various industries",
expected_output="A comprehensive report covering multiple industries",
agent=researcher,
)
sub_tasks = research_task.decompose(
descriptions=[
"Research AI impact on healthcare industry",
"Research AI impact on finance industry",
"Research AI impact on education industry",
],
expected_outputs=[
"A report on AI in healthcare",
"A report on AI in finance",
"A report on AI in education",
],
names=["Healthcare", "Finance", "Education"],
)
crew = Crew(
agents=[researcher],
tasks=[research_task],
)
result = crew.kickoff()
print("Final result:", result)
for i, sub_task in enumerate(research_task.sub_tasks):
print(f"Sub-task {i+1} result: {sub_task.output.raw if hasattr(sub_task, 'output') and sub_task.output else 'No output'}")

View File

@@ -771,65 +771,6 @@ class Crew(BaseModel):
return self._create_crew_output(task_outputs)
def _get_context_based_output(
    self,
    task: ConditionalTask,
    task_outputs: List[TaskOutput],
    task_index: int,
) -> Optional[TaskOutput]:
    """Return the output of the last qualifying explicit context task.

    For each task in ``task.context`` that sits earlier in the crew's
    task list than ``task_index``, the first output whose description
    matches is collected; the most recently collected one wins.
    Returns ``None`` when no context task has a matching output.
    """
    matched = []
    for context_task in task.context:
        position = self._find_task_index(context_task)
        # Only context tasks that exist and precede this one count.
        if position == -1 or position >= task_index:
            continue
        candidate = next(
            (
                output
                for output in task_outputs
                if output.description == context_task.description
            ),
            None,
        )
        if candidate is not None:
            matched.append(candidate)
    return matched[-1] if matched else None
def _get_non_conditional_output(
    self,
    task_outputs: List[TaskOutput],
    task_index: int,
) -> Optional[TaskOutput]:
    """Return the output of the most recent non-conditional task.

    Walks the tasks preceding ``task_index`` from newest to oldest,
    skipping ``ConditionalTask`` instances, and returns the first
    matching output found; ``None`` when nothing qualifies.
    """
    upper = min(task_index, len(self.tasks))
    for position in reversed(range(upper)):
        candidate = self.tasks[position]
        if isinstance(candidate, ConditionalTask):
            continue
        for output in task_outputs:
            if output.description == candidate.description:
                # Newest-first scan: the first hit is the most recent.
                return output
    return None
def _get_previous_output(
    self,
    task: ConditionalTask,
    task_outputs: List[TaskOutput],
    task_index: int,
) -> Optional[TaskOutput]:
    """Resolve the output a conditional task is evaluated against.

    The order of precedence is:
    1. Output from explicit context tasks
    2. Output from the most recent non-conditional task
    3. Output from the immediately preceding task

    Returns ``None`` when no candidate output exists.
    """
    if task.context and len(task.context) > 0:
        from_context = self._get_context_based_output(task, task_outputs, task_index)
        if from_context:
            return from_context
    from_non_conditional = self._get_non_conditional_output(task_outputs, task_index)
    if from_non_conditional:
        return from_non_conditional
    # Fall back to the immediately preceding task's output when in range.
    if task_outputs and 0 < task_index <= len(task_outputs):
        return task_outputs[task_index - 1]
    return None
def _handle_conditional_task(
self,
task: ConditionalTask,
@@ -838,17 +779,11 @@ class Crew(BaseModel):
task_index: int,
was_replayed: bool,
) -> Optional[TaskOutput]:
"""Handle a conditional task.
Determines whether a conditional task should be executed based on the output
of previous tasks. If the task should not be executed, returns a skipped task output.
"""
if futures:
task_outputs = self._process_async_tasks(futures, was_replayed)
futures.clear()
previous_output = self._get_previous_output(task, task_outputs, task_index)
previous_output = task_outputs[task_index - 1] if task_outputs else None
if previous_output is not None and not task.should_execute(previous_output):
self._logger.log(
"debug",

View File

@@ -19,6 +19,7 @@ from typing import (
Tuple,
Type,
Union,
ForwardRef,
)
from opentelemetry.trace import Span
@@ -137,6 +138,16 @@ class Task(BaseModel):
default=0,
description="Current number of retries"
)
parent_task: Optional['Task'] = Field(
default=None,
description="Parent task that this task was decomposed from.",
exclude=True,
)
sub_tasks: List['Task'] = Field(
default_factory=list,
description="Sub-tasks that this task was decomposed into.",
exclude=True,
)
@field_validator("guardrail")
@classmethod
@@ -246,13 +257,151 @@ class Task(BaseModel):
)
return self
def decompose(
    self,
    descriptions: List[str],
    expected_outputs: Optional[List[str]] = None,
    names: Optional[List[str]] = None
) -> List['Task']:
    """
    Decompose a complex task into simpler sub-tasks.

    Each sub-task inherits this task's agent and tools, receives this
    task as context, and records this task as its ``parent_task``.

    Args:
        descriptions: List of descriptions for each sub-task.
        expected_outputs: Optional list of expected outputs for each
            sub-task. When omitted (or empty) every sub-task inherits
            this task's ``expected_output``.
        names: Optional list of names for each sub-task.

    Returns:
        The sub-tasks created by this call only — not sub-tasks added
        by earlier ``decompose`` calls.

    Raises:
        ValueError: If descriptions is empty, or if expected_outputs or
            names have different lengths than descriptions.

    Side Effects:
        Appends the newly created sub-tasks to ``self.sub_tasks``.
    """
    if not descriptions:
        raise ValueError("At least one sub-task description is required.")
    if expected_outputs and len(expected_outputs) != len(descriptions):
        raise ValueError(
            f"If provided, expected_outputs must have the same length as descriptions. "
            f"Got {len(expected_outputs)} expected outputs and {len(descriptions)} descriptions."
        )
    if names and len(names) != len(descriptions):
        raise ValueError(
            f"If provided, names must have the same length as descriptions. "
            f"Got {len(names)} names and {len(descriptions)} descriptions."
        )
    # Collect the new sub-tasks separately so repeated decompose() calls
    # return only what this call created (previously the cumulative
    # self.sub_tasks list was returned, contradicting the docstring).
    created: List['Task'] = []
    for i, description in enumerate(descriptions):
        sub_task = Task(
            description=description,
            expected_output=expected_outputs[i] if expected_outputs else self.expected_output,
            name=names[i] if names else None,
            agent=self.agent,  # Inherit the agent from the parent task
            tools=self.tools,  # Inherit the tools from the parent task
            context=[self],  # Set the parent task as context for the sub-task
            parent_task=self,  # Reference back to the parent task
        )
        created.append(sub_task)
    self.sub_tasks.extend(created)
    return created
def combine_sub_task_results(self) -> str:
    """
    Combine the results from all sub-tasks into a single result.

    Builds a prompt listing every sub-task with its raw output and asks
    this task's agent to produce one consolidated answer for the parent
    task. Requires an agent capable of coherent text summarization; the
    prompt is executed statelessly.

    Returns:
        The combined result as a string.

    Raises:
        ValueError: If the task has no sub-tasks or no agent assigned.

    Side Effects:
        None. This method does not modify the task's state.
    """
    if not self.sub_tasks:
        raise ValueError("Task has no sub-tasks to combine results from.")
    if not self.agent:
        raise ValueError("Task has no agent to combine sub-task results.")
    sections = []
    for sub_task in self.sub_tasks:
        raw = sub_task.output.raw if sub_task.output else 'No result'
        sections.append(f"Sub-task: {sub_task.description}\nResult: {raw}")
    sub_task_results = "\n\n".join(sections)
    combine_prompt = f"""
You have completed the following sub-tasks for the main task: "{self.description}"
{sub_task_results}
Based on all these sub-tasks, please provide a consolidated final answer for the main task.
Expected output format: {self.expected_output if self.expected_output else 'Not specified'}
"""
    return self.agent.execute_task(
        task=self,
        context=combine_prompt,
        tools=self.tools or []
    )
def execute_sync(
    self,
    agent: Optional[BaseAgent] = None,
    context: Optional[str] = None,
    tools: Optional[List[BaseTool]] = None,
) -> TaskOutput:
    """
    Execute the task synchronously.

    If the task has sub-tasks and no output yet, this method will:
    1. Execute all sub-tasks first
    2. Combine their results using the agent
    3. Set the combined result as this task's output

    Args:
        agent: Optional agent to execute the task with.
        context: Optional context to pass to the task.
        tools: Optional tools to pass to the task.

    Returns:
        TaskOutput: The result of the task execution.

    Side Effects:
        Sets self.output with the execution result.
    """
    # NOTE: the original had a second, dead triple-quoted string after the
    # docstring; it has been merged into the single docstring above.
    if self.sub_tasks and not self.output:
        # Run every sub-task to completion first; each sub-task falls
        # back to this call's agent/tools when it has none of its own.
        for sub_task in self.sub_tasks:
            sub_task.execute_sync(
                agent=sub_task.agent or agent,
                context=context,
                tools=sub_task.tools or tools or [],
            )
        # Aggregate the sub-task outputs into this task's single output.
        result = self.combine_sub_task_results()
        self.output = TaskOutput(
            description=self.description,
            name=self.name,
            expected_output=self.expected_output,
            raw=result,
            agent=self.agent.role if self.agent else None,
            output_format=self.output_format,
        )
        return self.output
    return self._execute_core(agent, context, tools)
@property
@@ -278,6 +427,55 @@ class Task(BaseModel):
).start()
return future
def execute_sub_tasks_async(
    self,
    agent: Optional[BaseAgent] = None,
    context: Optional[str] = None,
    tools: Optional[List[BaseTool]] = None,
) -> List[Future[TaskOutput]]:
    """
    Execute all sub-tasks asynchronously.

    Starts the execution of all sub-tasks in parallel and returns
    futures that can be awaited. After all futures complete, call
    combine_sub_task_results() to aggregate the results.

    Example:
        ```python
        futures = task.execute_sub_tasks_async()
        for future in futures:
            future.result()
        # Combine the results
        result = task.combine_sub_task_results()
        ```

    Args:
        agent: Optional agent to execute the sub-tasks with.
        context: Optional context to pass to the sub-tasks.
        tools: Optional tools to pass to the sub-tasks.

    Returns:
        List of futures for the sub-task executions. Empty when the
        task has no sub-tasks (no exception is raised in that case —
        the previous docstring incorrectly claimed a ValueError).
    """
    if not self.sub_tasks:
        return []
    # Each sub-task prefers its own agent/tools, falling back to the
    # ones supplied by the caller.
    return [
        sub_task.execute_async(
            agent=sub_task.agent or agent,
            context=context,
            tools=sub_task.tools or tools or [],
        )
        for sub_task in self.sub_tasks
    ]
def _execute_task_async(
self,
agent: Optional[BaseAgent],
@@ -434,6 +632,8 @@ class Task(BaseModel):
"agent",
"context",
"tools",
"parent_task",
"sub_tasks",
}
copied_data = self.model_dump(exclude=exclude)
@@ -457,6 +657,7 @@ class Task(BaseModel):
agent=cloned_agent,
tools=cloned_tools,
)
return copied_task
@@ -526,3 +727,6 @@ class Task(BaseModel):
def __repr__(self):
    """Return a concise representation for logging and debugging."""
    # Only description and expected_output are shown; agent, context
    # and sub-task state are deliberately omitted to keep it short.
    return f"Task(description={self.description}, expected_output={self.expected_output})"
# Resolve the 'Task' forward references used by the parent_task and
# sub_tasks fields now that the class body is fully defined (Pydantic
# requires an explicit rebuild for self-referential models).
Task.model_rebuild()

View File

@@ -1,335 +0,0 @@
"""Test for multiple conditional tasks."""
from unittest.mock import MagicMock, patch
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
class TestMultipleConditionalTasks:
    """Test class for multiple conditional tasks scenarios."""

    @pytest.fixture
    def setup_agents(self):
        """Set up agents for the tests."""
        agent1 = Agent(
            role="Research Analyst",
            goal="Find information",
            backstory="You're a researcher",
            verbose=True,
        )
        agent2 = Agent(
            role="Data Analyst",
            goal="Process information",
            backstory="You process data",
            verbose=True,
        )
        agent3 = Agent(
            role="Report Writer",
            goal="Write reports",
            backstory="You write reports",
            verbose=True,
        )
        return agent1, agent2, agent3

    @pytest.fixture
    def setup_tasks(self, setup_agents):
        """Set up tasks for the tests."""
        agent1, agent2, agent3 = setup_agents
        task1 = Task(
            description="Task 1",
            expected_output="Output 1",
            agent=agent1,
        )
        # First conditional task should check task1's output.
        condition1_mock = MagicMock()
        task2 = ConditionalTask(
            description="Conditional Task 2",
            expected_output="Output 2",
            agent=agent2,
            condition=condition1_mock,
        )
        # Second conditional task should check task1's output, not task2's.
        condition2_mock = MagicMock()
        task3 = ConditionalTask(
            description="Conditional Task 3",
            expected_output="Output 3",
            agent=agent3,
            condition=condition2_mock,
        )
        return task1, task2, task3, condition1_mock, condition2_mock

    @pytest.fixture
    def setup_crew(self, setup_agents, setup_tasks):
        """Set up crew for the tests."""
        agent1, agent2, agent3 = setup_agents
        task1, task2, task3, _, _ = setup_tasks
        return Crew(
            agents=[agent1, agent2, agent3],
            tasks=[task1, task2, task3],
            verbose=True,
        )

    @pytest.fixture
    def setup_task_outputs(self, setup_agents):
        """Set up task outputs for the tests."""
        agent1, agent2, _ = setup_agents
        task1_output = TaskOutput(
            description="Task 1",
            raw="Task 1 output",
            agent=agent1.role,
            output_format=OutputFormat.RAW,
        )
        task2_output = TaskOutput(
            description="Conditional Task 2",
            raw="Task 2 output",
            agent=agent2.role,
            output_format=OutputFormat.RAW,
        )
        return task1_output, task2_output

    def test_first_conditional_task_execution(self, setup_crew, setup_tasks, setup_task_outputs):
        """Test that the first conditional task is evaluated correctly."""
        crew = setup_crew
        _, task2, _, condition1_mock, _ = setup_tasks
        task1_output, _ = setup_task_outputs
        condition1_mock.return_value = True  # Task should execute
        result = crew._handle_conditional_task(
            task=task2,
            task_outputs=[task1_output],
            futures=[],
            task_index=1,
            was_replayed=False,
        )
        # Verify the condition was called with task1's output.
        condition1_mock.assert_called_once()
        args = condition1_mock.call_args[0][0]
        assert args.raw == "Task 1 output"
        assert result is None  # Task should execute, so no skipped output

    def test_second_conditional_task_execution(self, setup_crew, setup_tasks, setup_task_outputs):
        """Test that the second conditional task is evaluated correctly."""
        crew = setup_crew
        _, _, task3, _, condition2_mock = setup_tasks
        task1_output, task2_output = setup_task_outputs
        condition2_mock.return_value = True  # Task should execute
        result = crew._handle_conditional_task(
            task=task3,
            task_outputs=[task1_output, task2_output],
            futures=[],
            task_index=2,
            was_replayed=False,
        )
        # Verify the condition was called with task1's output, not task2's.
        condition2_mock.assert_called_once()
        args = condition2_mock.call_args[0][0]
        assert args.raw == "Task 1 output"  # Should be task1's output
        assert args.raw != "Task 2 output"  # Should not be task2's output
        assert result is None  # Task should execute, so no skipped output

    def test_conditional_task_skipping(self, setup_crew, setup_tasks, setup_task_outputs):
        """Test that conditional tasks are skipped when the condition returns False."""
        crew = setup_crew
        _, task2, _, condition1_mock, _ = setup_tasks
        task1_output, _ = setup_task_outputs
        condition1_mock.return_value = False  # Task should be skipped
        result = crew._handle_conditional_task(
            task=task2,
            task_outputs=[task1_output],
            futures=[],
            task_index=1,
            was_replayed=False,
        )
        # Verify the condition was called with task1's output.
        condition1_mock.assert_called_once()
        args = condition1_mock.call_args[0][0]
        assert args.raw == "Task 1 output"
        # A skipped task produces a placeholder output for itself.
        assert result is not None  # Task should be skipped, so there should be a skipped output
        assert result.description == task2.description

    def test_conditional_task_with_explicit_context(self, setup_crew, setup_agents, setup_task_outputs):
        """Test conditional task with explicit context tasks."""
        crew = setup_crew
        agent1, agent2, _ = setup_agents
        task1_output, _ = setup_task_outputs
        with patch.object(crew, '_find_task_index', return_value=0):
            context_task = Task(
                description="Task 1",
                expected_output="Output 1",
                agent=agent1,
            )
            condition_mock = MagicMock(return_value=True)
            task_with_context = ConditionalTask(
                description="Task with Context",
                expected_output="Output with Context",
                agent=agent2,
                condition=condition_mock,
                context=[context_task],
            )
            crew.tasks.append(task_with_context)
            result = crew._handle_conditional_task(
                task=task_with_context,
                task_outputs=[task1_output],
                futures=[],
                task_index=3,  # This would be the 4th task
                was_replayed=False,
            )
            # Verify the condition was called with task1's output.
            condition_mock.assert_called_once()
            args = condition_mock.call_args[0][0]
            assert args.raw == "Task 1 output"
            assert result is None  # Task should execute, so no skipped output

    def test_conditional_task_with_empty_task_outputs(self, setup_crew, setup_tasks):
        """Test conditional task with empty task outputs."""
        crew = setup_crew
        _, task2, _, condition1_mock, _ = setup_tasks
        result = crew._handle_conditional_task(
            task=task2,
            task_outputs=[],
            futures=[],
            task_index=1,
            was_replayed=False,
        )
        # With no previous output there is nothing to evaluate the
        # condition against, so the task runs unconditionally.
        condition1_mock.assert_not_called()
        assert result is None  # Task should execute, so no skipped output
def test_multiple_conditional_tasks():
    """Test that multiple conditional tasks are evaluated correctly.

    This is a legacy test that's kept for backward compatibility.
    The actual tests are now in the TestMultipleConditionalTasks class.
    """
    agent1 = Agent(
        role="Research Analyst",
        goal="Find information",
        backstory="You're a researcher",
        verbose=True,
    )
    agent2 = Agent(
        role="Data Analyst",
        goal="Process information",
        backstory="You process data",
        verbose=True,
    )
    agent3 = Agent(
        role="Report Writer",
        goal="Write reports",
        backstory="You write reports",
        verbose=True,
    )
    task1 = Task(
        description="Task 1",
        expected_output="Output 1",
        agent=agent1,
    )
    # First conditional task should check task1's output.
    condition1_mock = MagicMock()
    task2 = ConditionalTask(
        description="Conditional Task 2",
        expected_output="Output 2",
        agent=agent2,
        condition=condition1_mock,
    )
    # Second conditional task should check task1's output, not task2's.
    condition2_mock = MagicMock()
    task3 = ConditionalTask(
        description="Conditional Task 3",
        expected_output="Output 3",
        agent=agent3,
        condition=condition2_mock,
    )
    crew = Crew(
        agents=[agent1, agent2, agent3],
        tasks=[task1, task2, task3],
        verbose=True,
    )
    with patch.object(crew, '_find_task_index', return_value=0):
        task1_output = TaskOutput(
            description="Task 1",
            raw="Task 1 output",
            agent=agent1.role,
            output_format=OutputFormat.RAW,
        )
        condition1_mock.return_value = True  # Task should execute
        result1 = crew._handle_conditional_task(
            task=task2,
            task_outputs=[task1_output],
            futures=[],
            task_index=1,
            was_replayed=False,
        )
        # Verify the condition was called with task1's output.
        condition1_mock.assert_called_once()
        args1 = condition1_mock.call_args[0][0]
        assert args1.raw == "Task 1 output"
        assert result1 is None  # Task should execute, so no skipped output
        condition1_mock.reset_mock()
        task2_output = TaskOutput(
            description="Conditional Task 2",
            raw="Task 2 output",
            agent=agent2.role,
            output_format=OutputFormat.RAW,
        )
        condition2_mock.return_value = True  # Task should execute
        result2 = crew._handle_conditional_task(
            task=task3,
            task_outputs=[task1_output, task2_output],
            futures=[],
            task_index=2,
            was_replayed=False,
        )
        # Verify the condition was called with task1's output, not task2's.
        condition2_mock.assert_called_once()
        args2 = condition2_mock.call_args[0][0]
        assert args2.raw == "Task 1 output"  # Should be task1's output
        assert args2.raw != "Task 2 output"  # Should not be task2's output
        assert result2 is None  # Task should execute, so no skipped output

View File

@@ -0,0 +1,157 @@
import pytest
from unittest.mock import Mock, patch
from crewai import Agent, Task
def test_task_decomposition_structure():
    """Test that task decomposition creates the proper parent-child relationship."""
    agent = Agent(
        role="Researcher",
        goal="Research effectively",
        backstory="You're an expert researcher",
    )
    parent_task = Task(
        description="Research the impact of AI on various industries",
        expected_output="A comprehensive report",
        agent=agent,
    )
    sub_task_descriptions = [
        "Research AI impact on healthcare",
        "Research AI impact on finance",
        "Research AI impact on education",
    ]
    sub_tasks = parent_task.decompose(
        descriptions=sub_task_descriptions,
        expected_outputs=["Healthcare report", "Finance report", "Education report"],
        names=["Healthcare", "Finance", "Education"],
    )
    # One sub-task per description, tracked on the parent as well.
    assert len(sub_tasks) == 3
    assert len(parent_task.sub_tasks) == 3
    # Every sub-task points back at its parent and receives it as context.
    for sub_task in sub_tasks:
        assert sub_task.parent_task == parent_task
        assert parent_task in sub_task.context
def test_task_execution_with_sub_tasks():
    """Test that executing a task with sub-tasks executes the sub-tasks first."""
    agent = Agent(
        role="Researcher",
        goal="Research effectively",
        backstory="You're an expert researcher",
    )
    parent_task = Task(
        description="Research the impact of AI on various industries",
        expected_output="A comprehensive report",
        agent=agent,
    )
    sub_task_descriptions = [
        "Research AI impact on healthcare",
        "Research AI impact on finance",
        "Research AI impact on education",
    ]
    parent_task.decompose(
        descriptions=sub_task_descriptions,
        expected_outputs=["Healthcare report", "Finance report", "Education report"],
    )
    with patch.object(Agent, 'execute_task', return_value="Mock result") as mock_execute_task:
        result = parent_task.execute_sync()
        # Three sub-task executions plus the combine step.
        assert mock_execute_task.call_count >= 3
        for sub_task in parent_task.sub_tasks:
            assert sub_task.output is not None
        assert result is not None
        assert result.raw is not None
def test_combine_sub_task_results():
    """Test that combining sub-task results works correctly."""
    agent = Agent(
        role="Researcher",
        goal="Research effectively",
        backstory="You're an expert researcher",
    )
    parent_task = Task(
        description="Research the impact of AI on various industries",
        expected_output="A comprehensive report",
        agent=agent,
    )
    sub_tasks = parent_task.decompose([
        "Research AI impact on healthcare",
        "Research AI impact on finance",
    ])
    # Fake completed sub-task outputs so only the combine step needs the agent.
    for sub_task in sub_tasks:
        sub_task.output = Mock()
        sub_task.output.raw = f"Result for {sub_task.description}"
    with patch.object(Agent, 'execute_task', return_value="Combined result") as mock_execute_task:
        result = parent_task.combine_sub_task_results()
        assert mock_execute_task.called
        assert result == "Combined result"
def test_task_decomposition_validation():
    """Test that task decomposition validates inputs correctly."""
    parent_task = Task(
        description="Research the impact of AI",
        expected_output="A report",
    )
    # Empty description list is rejected outright.
    with pytest.raises(ValueError, match="At least one sub-task description is required"):
        parent_task.decompose([])
    # Optional argument lists must match descriptions in length.
    with pytest.raises(ValueError, match="expected_outputs must have the same length"):
        parent_task.decompose(
            ["Task 1", "Task 2"],
            expected_outputs=["Output 1"],
        )
    with pytest.raises(ValueError, match="names must have the same length"):
        parent_task.decompose(
            ["Task 1", "Task 2"],
            names=["Name 1"],
        )
def test_execute_sub_tasks_async():
    """Test that executing sub-tasks asynchronously works correctly."""
    agent = Agent(
        role="Researcher",
        goal="Research effectively",
        backstory="You're an expert researcher",
    )
    parent_task = Task(
        description="Research the impact of AI on various industries",
        expected_output="A comprehensive report",
        agent=agent,
    )
    sub_tasks = parent_task.decompose([
        "Research AI impact on healthcare",
        "Research AI impact on finance",
    ])
    with patch.object(Task, 'execute_async') as mock_execute_async:
        mock_execute_async.return_value = Mock()
        futures = parent_task.execute_sub_tasks_async()
        # One future per decomposed sub-task.
        assert mock_execute_async.call_count == 2
        assert len(futures) == 2

View File

@@ -0,0 +1,109 @@
import pytest
from unittest.mock import Mock, patch
from crewai import Agent, Task, TaskOutput
def test_combine_sub_task_results_no_sub_tasks():
    """Test that combining sub-task results raises an error when there are no sub-tasks."""
    agent = Agent(
        role="Researcher",
        goal="Research effectively",
        backstory="You're an expert researcher",
    )
    parent_task = Task(
        description="Research the impact of AI",
        expected_output="A report",
        agent=agent,
    )
    # The task was never decomposed, so there is nothing to combine.
    with pytest.raises(ValueError, match="Task has no sub-tasks to combine results from"):
        parent_task.combine_sub_task_results()
def test_combine_sub_task_results_no_agent():
    """Test that combining sub-task results raises an error when there is no agent."""
    parent_task = Task(
        description="Research the impact of AI",
        expected_output="A report",
    )
    sub_task = Task(
        description="Research AI impact on healthcare",
        expected_output="Healthcare report",
        parent_task=parent_task,
    )
    # Attach a sub-task manually so only the missing agent triggers the error.
    parent_task.sub_tasks.append(sub_task)
    with pytest.raises(ValueError, match="Task has no agent to combine sub-task results"):
        parent_task.combine_sub_task_results()
def test_execute_sync_sets_output_after_combining():
    """Test that execute_sync sets the output after combining sub-task results."""
    agent = Agent(
        role="Researcher",
        goal="Research effectively",
        backstory="You're an expert researcher",
    )
    parent_task = Task(
        description="Research the impact of AI",
        expected_output="A report",
        agent=agent,
    )
    parent_task.decompose([
        "Research AI impact on healthcare",
        "Research AI impact on finance",
    ])
    with patch.object(Agent, 'execute_task', return_value="Combined result") as mock_execute_task:
        result = parent_task.execute_sync()
        assert parent_task.output is not None
        assert parent_task.output.raw == "Combined result"
        assert result.raw == "Combined result"
        # Two sub-task executions plus the combine call.
        assert mock_execute_task.call_count >= 3
def test_deep_cloning_prevents_shared_state():
    """Test that deep cloning prevents shared mutable state between tasks."""
    agent = Agent(
        role="Researcher",
        goal="Research effectively",
        backstory="You're an expert researcher",
    )
    parent_task = Task(
        description="Research the impact of AI",
        expected_output="A report",
        agent=agent,
    )
    copied_task = parent_task.copy()
    copied_task.description = "Modified description"
    # Mutating the copy must not leak back into the original.
    assert parent_task.description == "Research the impact of AI"
    assert copied_task.description == "Modified description"
    # Decomposing the original must not populate the copy's sub_tasks.
    parent_task.decompose(["Sub-task 1", "Sub-task 2"])
    assert len(parent_task.sub_tasks) == 2
    assert len(copied_task.sub_tasks) == 0
def test_execute_sub_tasks_async_empty_sub_tasks():
    """Test that execute_sub_tasks_async returns an empty list when there are no sub-tasks."""
    parent_task = Task(
        description="Research the impact of AI",
        expected_output="A report",
    )
    futures = parent_task.execute_sub_tasks_async()
    # No decomposition happened, so no futures are produced.
    assert isinstance(futures, list)
    assert len(futures) == 0