From a36e696a69c9a9f329690e7b6cc7a0a3dea4c023 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 29 Apr 2025 13:19:17 +0000 Subject: [PATCH] Add task decomposition feature (Issue #2717) This PR implements task decomposition as requested in Issue #2717. It allows complex tasks to be automatically split into sub-tasks without manual intervention. - Added parent_task and sub_tasks fields to Task class - Implemented decompose() method to create sub-tasks - Added combine_sub_task_results() method to aggregate results - Updated execute_sync() to handle sub-task execution - Added execute_sub_tasks_async() for asynchronous execution - Created tests for the task decomposition functionality - Added example script demonstrating usage Co-Authored-By: Joe Moura --- examples/task_decomposition_example.py | 48 ++++++++ src/crewai/task.py | 130 ++++++++++++++++++++ tests/test_task_decomposition.py | 157 +++++++++++++++++++++++++ 3 files changed, 335 insertions(+) create mode 100644 examples/task_decomposition_example.py create mode 100644 tests/test_task_decomposition.py diff --git a/examples/task_decomposition_example.py b/examples/task_decomposition_example.py new file mode 100644 index 000000000..aa0466fee --- /dev/null +++ b/examples/task_decomposition_example.py @@ -0,0 +1,48 @@ +""" +Example of using task decomposition in CrewAI. + +This example demonstrates how to use the task decomposition feature +to break down complex tasks into simpler sub-tasks. +""" + +from crewai import Agent, Task, Crew + +researcher = Agent( + role="Researcher", + goal="Research effectively", + backstory="You're an expert researcher with skills in breaking down complex topics.", +) + +research_task = Task( + description="Research the impact of AI on various industries", + expected_output="A comprehensive report covering multiple industries", + agent=researcher, +) + +sub_tasks = research_task.decompose( + descriptions=[ + "Research AI impact on healthcare industry", + "Research AI impact on finance industry", + "Research AI impact on education industry", + ], + expected_outputs=[ + "A report on AI in healthcare", + "A report on AI in finance", + "A report on AI in education", + ], + names=["Healthcare", "Finance", "Education"], +) + +crew = Crew( + agents=[researcher], + tasks=[research_task], +) + +result = crew.kickoff() +print(result) + + +# +# +# +# diff --git a/src/crewai/task.py b/src/crewai/task.py index 30ab79c00..f1fa8103b 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -19,6 +19,7 @@ from typing import ( Tuple, Type, Union, + ForwardRef, ) from opentelemetry.trace import Span @@ -137,6 +138,16 @@ class Task(BaseModel): default=0, description="Current number of retries" ) + parent_task: Optional['Task'] = Field( + default=None, + description="Parent task that this task was decomposed from.", + exclude=True, + ) + sub_tasks: List['Task'] = Field( + default_factory=list, + description="Sub-tasks that this task was decomposed into.", + exclude=True, + ) @field_validator("guardrail") @classmethod @@ -246,6 +257,81 @@ class Task(BaseModel): ) return self + def decompose( + self, + descriptions: List[str], + expected_outputs: Optional[List[str]] = None, + names: Optional[List[str]] = None + ) -> List['Task']: + """ + Decompose a complex task into simpler sub-tasks. + + Args: + descriptions: List of descriptions for each sub-task. + expected_outputs: Optional list of expected outputs for each sub-task. + names: Optional list of names for each sub-task. + + Returns: + List of created sub-tasks. + """ + if not descriptions: + raise ValueError("At least one sub-task description is required.") + + if expected_outputs and len(expected_outputs) != len(descriptions): + raise ValueError("If provided, expected_outputs must have the same length as descriptions.") + + if names and len(names) != len(descriptions): + raise ValueError("If provided, names must have the same length as descriptions.") + + for i, description in enumerate(descriptions): + sub_task = Task( + description=description, + expected_output=expected_outputs[i] if expected_outputs else self.expected_output, + name=names[i] if names else None, + agent=self.agent, # Inherit the agent from the parent task + tools=self.tools, # Inherit the tools from the parent task + context=[self], # Set the parent task as context for the sub-task + parent_task=self, # Reference back to the parent task + ) + self.sub_tasks.append(sub_task) + + return self.sub_tasks + + def combine_sub_task_results(self) -> str: + """ + Combine the results from all sub-tasks into a single result for this task. + + Returns: + The combined result as a string. + """ + if not self.sub_tasks: + raise ValueError("Task has no sub-tasks to combine results from.") + + if not self.agent: + raise ValueError("Task has no agent to combine sub-task results.") + + sub_task_results = "\n\n".join([ + f"Sub-task: {sub_task.description}\nResult: {sub_task.output.raw if sub_task.output else 'No result'}" + for sub_task in self.sub_tasks + ]) + + combine_prompt = f""" + You have completed the following sub-tasks for the main task: "{self.description}" + + {sub_task_results} + + Based on all these sub-tasks, please provide a consolidated final answer for the main task. + Expected output format: {self.expected_output if self.expected_output else 'Not specified'} + """ + + result = self.agent.execute_task( + task=self, + context=combine_prompt, + tools=self.tools or [] + ) + + return result + def execute_sync( self, agent: Optional[BaseAgent] = None, @@ -253,6 +339,17 @@ class Task(BaseModel): tools: Optional[List[BaseTool]] = None, ) -> TaskOutput: """Execute the task synchronously.""" + if self.sub_tasks and not self.output: + for sub_task in self.sub_tasks: + sub_task.execute_sync( + agent=sub_task.agent or agent, + context=context, + tools=sub_task.tools or tools or [], + ) + + result = self.combine_sub_task_results() + return self._execute_core(agent, context, tools) + return self._execute_core(agent, context, tools) @property @@ -278,6 +375,36 @@ class Task(BaseModel): ).start() return future + def execute_sub_tasks_async( + self, + agent: Optional[BaseAgent] = None, + context: Optional[str] = None, + tools: Optional[List[BaseTool]] = None, + ) -> List[Future[TaskOutput]]: + """Execute all sub-tasks asynchronously. + + Args: + agent: Optional agent to execute the sub-tasks with. + context: Optional context to pass to the sub-tasks. + tools: Optional tools to pass to the sub-tasks. + + Returns: + List of futures for the sub-task executions. + """ + if not self.sub_tasks: + return [] + + futures = [] + for sub_task in self.sub_tasks: + future = sub_task.execute_async( + agent=sub_task.agent or agent, + context=context, + tools=sub_task.tools or tools or [], + ) + futures.append(future) + + return futures + def _execute_task_async( self, agent: Optional[BaseAgent], @@ -434,6 +561,8 @@ class Task(BaseModel): "agent", "context", "tools", + "parent_task", + "sub_tasks", } copied_data = self.model_dump(exclude=exclude) @@ -457,6 +586,7 @@ class Task(BaseModel): agent=cloned_agent, tools=cloned_tools, ) + return copied_task diff --git a/tests/test_task_decomposition.py b/tests/test_task_decomposition.py new file mode 100644 index 000000000..d4d539c38 --- /dev/null +++ b/tests/test_task_decomposition.py @@ -0,0 +1,157 @@ +import pytest +from unittest.mock import Mock, patch + +from crewai import Agent, Task + + +def test_task_decomposition_structure(): + """Test that task decomposition creates the proper parent-child relationship.""" + agent = Agent( + role="Researcher", + goal="Research effectively", + backstory="You're an expert researcher", + ) + + parent_task = Task( + description="Research the impact of AI on various industries", + expected_output="A comprehensive report", + agent=agent, + ) + + sub_task_descriptions = [ + "Research AI impact on healthcare", + "Research AI impact on finance", + "Research AI impact on education", + ] + + sub_tasks = parent_task.decompose( + descriptions=sub_task_descriptions, + expected_outputs=["Healthcare report", "Finance report", "Education report"], + names=["Healthcare", "Finance", "Education"], + ) + + assert len(sub_tasks) == 3 + assert len(parent_task.sub_tasks) == 3 + + for sub_task in sub_tasks: + assert sub_task.parent_task == parent_task + assert parent_task in sub_task.context + + +def test_task_execution_with_sub_tasks(): + """Test that executing a task with sub-tasks executes the sub-tasks first.""" + agent = Agent( + role="Researcher", + goal="Research effectively", + backstory="You're an expert researcher", + ) + + parent_task = Task( + description="Research the impact of AI on various industries", + expected_output="A comprehensive report", + agent=agent, + ) + + sub_task_descriptions = [ + "Research AI impact on healthcare", + "Research AI impact on finance", + "Research AI impact on education", + ] + + parent_task.decompose( + descriptions=sub_task_descriptions, + expected_outputs=["Healthcare report", "Finance report", "Education report"], + ) + + with patch.object(Agent, 'execute_task', return_value="Mock result") as mock_execute_task: + result = parent_task.execute_sync() + + assert mock_execute_task.call_count >= 3 + + for sub_task in parent_task.sub_tasks: + assert sub_task.output is not None + + assert result is not None + assert result.raw is not None + + +def test_combine_sub_task_results(): + """Test that combining sub-task results works correctly.""" + agent = Agent( + role="Researcher", + goal="Research effectively", + backstory="You're an expert researcher", + ) + + parent_task = Task( + description="Research the impact of AI on various industries", + expected_output="A comprehensive report", + agent=agent, + ) + + sub_tasks = parent_task.decompose([ + "Research AI impact on healthcare", + "Research AI impact on finance", + ]) + + for sub_task in sub_tasks: + sub_task.output = Mock() + sub_task.output.raw = f"Result for {sub_task.description}" + + with patch.object(Agent, 'execute_task', return_value="Combined result") as mock_execute_task: + result = parent_task.combine_sub_task_results() + + assert mock_execute_task.called + assert result == "Combined result" + + +def test_task_decomposition_validation(): + """Test that task decomposition validates inputs correctly.""" + parent_task = Task( + description="Research the impact of AI", + expected_output="A report", + ) + + with pytest.raises(ValueError, match="At least one sub-task description is required"): + parent_task.decompose([]) + + with pytest.raises(ValueError, match="expected_outputs must have the same length"): + parent_task.decompose( + ["Task 1", "Task 2"], + expected_outputs=["Output 1"] + ) + + with pytest.raises(ValueError, match="names must have the same length"): + parent_task.decompose( + ["Task 1", "Task 2"], + names=["Name 1"] + ) + + +def test_execute_sub_tasks_async(): + """Test that executing sub-tasks asynchronously works correctly.""" + agent = Agent( + role="Researcher", + goal="Research effectively", + backstory="You're an expert researcher", + ) + + parent_task = Task( + description="Research the impact of AI on various industries", + expected_output="A comprehensive report", + agent=agent, + ) + + sub_tasks = parent_task.decompose([ + "Research AI impact on healthcare", + "Research AI impact on finance", + ]) + + with patch.object(Task, 'execute_async') as mock_execute_async: + mock_future = Mock() + mock_execute_async.return_value = mock_future + + futures = parent_task.execute_sub_tasks_async() + + assert mock_execute_async.call_count == 2 + assert len(futures) == 2