From 1cf09ac7ce6a4b34aaf0f76c7465b9506f043369 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 29 Apr 2025 13:25:40 +0000 Subject: [PATCH] Address PR feedback: Fix ForwardRef issues, improve error messages, enhance docs Co-Authored-By: Joe Moura --- examples/task_decomposition_example.py | 11 +- src/crewai/task.py | 84 ++++++++++++++- tests/test_task_decomposition_edge_cases.py | 109 ++++++++++++++++++++ 3 files changed, 193 insertions(+), 11 deletions(-) create mode 100644 tests/test_task_decomposition_edge_cases.py diff --git a/examples/task_decomposition_example.py b/examples/task_decomposition_example.py index aa0466fee..a1e07ff9c 100644 --- a/examples/task_decomposition_example.py +++ b/examples/task_decomposition_example.py @@ -3,6 +3,8 @@ Example of using task decomposition in CrewAI. This example demonstrates how to use the task decomposition feature to break down complex tasks into simpler sub-tasks. + +Feature introduced in CrewAI v1.x.x """ from crewai import Agent, Task, Crew @@ -39,10 +41,7 @@ crew = Crew( ) result = crew.kickoff() -print(result) +print("Final result:", result) - -# -# -# -# +for i, sub_task in enumerate(research_task.sub_tasks): + print(f"Sub-task {i+1} result: {sub_task.output.raw if hasattr(sub_task, 'output') and sub_task.output else 'No output'}") diff --git a/src/crewai/task.py b/src/crewai/task.py index f1fa8103b..290c00380 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -273,15 +273,28 @@ class Task(BaseModel): Returns: List of created sub-tasks. + + Raises: + ValueError: If descriptions is empty, or if expected_outputs or names + have different lengths than descriptions. + + Side Effects: + Modifies self.sub_tasks by adding newly created sub-tasks. """ if not descriptions: raise ValueError("At least one sub-task description is required.") if expected_outputs and len(expected_outputs) != len(descriptions): - raise ValueError("If provided, expected_outputs must have the same length as descriptions.") + raise ValueError( + f"If provided, expected_outputs must have the same length as descriptions. " + f"Got {len(expected_outputs)} expected outputs and {len(descriptions)} descriptions." + ) if names and len(names) != len(descriptions): - raise ValueError("If provided, names must have the same length as descriptions.") + raise ValueError( + f"If provided, names must have the same length as descriptions. " + f"Got {len(names)} names and {len(descriptions)} descriptions." + ) for i, description in enumerate(descriptions): sub_task = Task( @@ -301,8 +314,18 @@ class Task(BaseModel): """ Combine the results from all sub-tasks into a single result for this task. + This method uses the task's agent to intelligently combine the results from + all sub-tasks. It requires an agent capable of coherent text summarization + and is designed for stateless prompt execution. + Returns: The combined result as a string. + + Raises: + ValueError: If the task has no sub-tasks or no agent assigned. + + Side Effects: + None. This method does not modify the task's state. """ if not self.sub_tasks: raise ValueError("Task has no sub-tasks to combine results from.") @@ -338,7 +361,25 @@ class Task(BaseModel): context: Optional[str] = None, tools: Optional[List[BaseTool]] = None, ) -> TaskOutput: - """Execute the task synchronously.""" + """ + Execute the task synchronously. + + If the task has sub-tasks and no output yet, this method will: + 1. Execute all sub-tasks first + 2. Combine their results using the agent + 3. Set the combined result as this task's output + + Args: + agent: Optional agent to execute the task with. + context: Optional context to pass to the task. + tools: Optional tools to pass to the task. + + Returns: + TaskOutput: The result of the task execution. + + Side Effects: + Sets self.output with the execution result. + """ if self.sub_tasks and not self.output: for sub_task in self.sub_tasks: sub_task.execute_sync( @@ -347,8 +388,19 @@ class Task(BaseModel): tools=sub_task.tools or tools or [], ) + # Combine the results from sub-tasks result = self.combine_sub_task_results() - return self._execute_core(agent, context, tools) + + self.output = TaskOutput( + description=self.description, + name=self.name, + expected_output=self.expected_output, + raw=result, + agent=self.agent.role if self.agent else None, + output_format=self.output_format, + ) + + return self.output return self._execute_core(agent, context, tools) @@ -381,7 +433,23 @@ class Task(BaseModel): context: Optional[str] = None, tools: Optional[List[BaseTool]] = None, ) -> List[Future[TaskOutput]]: - """Execute all sub-tasks asynchronously. + """ + Execute all sub-tasks asynchronously. + + This method starts the execution of all sub-tasks in parallel and returns + futures that can be awaited. After all futures are complete, you should call + combine_sub_task_results() to aggregate the results. + + Example: + ```python + futures = task.execute_sub_tasks_async() + + for future in futures: + future.result() + + # Combine the results + result = task.combine_sub_task_results() + ``` Args: agent: Optional agent to execute the sub-tasks with. @@ -390,6 +458,9 @@ class Task(BaseModel): Returns: List of futures for the sub-task executions. + + Raises: + ValueError: If the task has no sub-tasks. """ if not self.sub_tasks: return [] @@ -656,3 +727,6 @@ class Task(BaseModel): def __repr__(self): return f"Task(description={self.description}, expected_output={self.expected_output})" + + +Task.model_rebuild() diff --git a/tests/test_task_decomposition_edge_cases.py b/tests/test_task_decomposition_edge_cases.py new file mode 100644 index 000000000..fab7d222c --- /dev/null +++ b/tests/test_task_decomposition_edge_cases.py @@ -0,0 +1,109 @@ +import pytest +from unittest.mock import Mock, patch + +from crewai import Agent, Task, TaskOutput + + +def test_combine_sub_task_results_no_sub_tasks(): + """Test that combining sub-task results raises an error when there are no sub-tasks.""" + agent = Agent( + role="Researcher", + goal="Research effectively", + backstory="You're an expert researcher", + ) + + parent_task = Task( + description="Research the impact of AI", + expected_output="A report", + agent=agent, + ) + + with pytest.raises(ValueError, match="Task has no sub-tasks to combine results from"): + parent_task.combine_sub_task_results() + + +def test_combine_sub_task_results_no_agent(): + """Test that combining sub-task results raises an error when there is no agent.""" + parent_task = Task( + description="Research the impact of AI", + expected_output="A report", + ) + + sub_task = Task( + description="Research AI impact on healthcare", + expected_output="Healthcare report", + parent_task=parent_task, + ) + parent_task.sub_tasks.append(sub_task) + + with pytest.raises(ValueError, match="Task has no agent to combine sub-task results"): + parent_task.combine_sub_task_results() + + +def test_execute_sync_sets_output_after_combining(): + """Test that execute_sync sets the output after combining sub-task results.""" + agent = Agent( + role="Researcher", + goal="Research effectively", + backstory="You're an expert researcher", + ) + + parent_task = Task( + description="Research the impact of AI", + expected_output="A report", + agent=agent, + ) + + sub_tasks = parent_task.decompose([ + "Research AI impact on healthcare", + "Research AI impact on finance", + ]) + + with patch.object(Agent, 'execute_task', return_value="Combined result") as mock_execute_task: + result = parent_task.execute_sync() + + assert parent_task.output is not None + assert parent_task.output.raw == "Combined result" + assert result.raw == "Combined result" + + assert mock_execute_task.call_count >= 3 + + +def test_deep_cloning_prevents_shared_state(): + """Test that deep cloning prevents shared mutable state between tasks.""" + agent = Agent( + role="Researcher", + goal="Research effectively", + backstory="You're an expert researcher", + ) + + parent_task = Task( + description="Research the impact of AI", + expected_output="A report", + agent=agent, + ) + + copied_task = parent_task.copy() + + copied_task.description = "Modified description" + + assert parent_task.description == "Research the impact of AI" + assert copied_task.description == "Modified description" + + parent_task.decompose(["Sub-task 1", "Sub-task 2"]) + + assert len(parent_task.sub_tasks) == 2 + assert len(copied_task.sub_tasks) == 0 + + +def test_execute_sub_tasks_async_empty_sub_tasks(): + """Test that execute_sub_tasks_async returns an empty list when there are no sub-tasks.""" + parent_task = Task( + description="Research the impact of AI", + expected_output="A report", + ) + + futures = parent_task.execute_sub_tasks_async() + + assert isinstance(futures, list) + assert len(futures) == 0