From 56ec9bc2241336156d223063c5250662de19ee84 Mon Sep 17 00:00:00 2001
From: "devin-ai-integration[bot]" <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Date: Sun, 9 Feb 2025 16:20:16 -0300
Subject: [PATCH] fix: handle multiple task outputs correctly in conditional
 tasks (#1937)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* fix: handle multiple task outputs correctly in conditional tasks

- Fix IndexError in _handle_conditional_task by using first output
- Modify _execute_tasks to accumulate task outputs instead of resetting
- Update _create_crew_output to handle multiple outputs correctly
- Add tests for multiple tasks with conditional and multiple conditional tasks

Co-Authored-By: brandon@crewai.com

* feat: validate at least one non-conditional task and refine task outputs

Co-Authored-By: brandon@crewai.com

* Revert to single output in _create_crew_output; remove redundant empty task check

Co-Authored-By: brandon@crewai.com

* Address PR feedback: use last output in conditional tasks, add validation test

Co-Authored-By: brandon@crewai.com

* Address PR feedback: updated conditional tasks tests and indexing

Co-Authored-By: brandon@crewai.com

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: brandon@crewai.com
Co-authored-by: Brandon Hancock
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
Co-authored-by: João Moura
---
 src/crewai/crew.py |  39 ++++++--
 tests/crew_test.py | 222 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 253 insertions(+), 8 deletions(-)

diff --git a/src/crewai/crew.py b/src/crewai/crew.py
index f36df2a03..9ae9ce2c0 100644
--- a/src/crewai/crew.py
+++ b/src/crewai/crew.py
@@ -380,6 +380,22 @@ class Crew(BaseModel):
 
         return self
 
+    @model_validator(mode="after")
+    def validate_must_have_non_conditional_task(self) -> "Crew":
+        """Ensure that a crew has at least one non-conditional task."""
+        if not self.tasks:
+            return self
+        non_conditional_count = sum(
+            1 for task in self.tasks if not isinstance(task, ConditionalTask)
+        )
+        if non_conditional_count == 0:
+            raise PydanticCustomError(
+                "only_conditional_tasks",
+                "Crew must include at least one non-conditional task",
+                {},
+            )
+        return self
+
     @model_validator(mode="after")
     def validate_first_task(self) -> "Crew":
         """Ensure the first task is not a ConditionalTask."""
@@ -439,6 +455,8 @@ class Crew(BaseModel):
             )
         return self
 
+
+
     @property
     def key(self) -> str:
         source = [agent.key for agent in self.agents] + [
@@ -741,6 +759,7 @@ class Crew(BaseModel):
                 task, task_outputs, futures, task_index, was_replayed
             )
             if skipped_task_output:
+                task_outputs.append(skipped_task_output)
                 continue
 
             if task.async_execution:
@@ -764,7 +783,7 @@
                 context=context,
                 tools=tools_for_task,
             )
-            task_outputs = [task_output]
+            task_outputs.append(task_output)
             self._process_task_result(task, task_output)
             self._store_execution_log(task, task_output, task_index, was_replayed)
 
@@ -785,7 +804,7 @@
             task_outputs = self._process_async_tasks(futures, was_replayed)
             futures.clear()
 
-        previous_output = task_outputs[-1] if task_outputs else None
+        previous_output = task_outputs[-1] if task_outputs else None
         if previous_output is not None and not task.should_execute(previous_output):
             self._logger.log(
                 "debug",
@@ -907,11 +926,15 @@
         )
 
     def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput:
-        if len(task_outputs) != 1:
-            raise ValueError(
-                "Something went wrong. Kickoff should return only one task output."
-            )
-        final_task_output = task_outputs[0]
+        if not task_outputs:
+            raise ValueError("No task outputs available to create crew output.")
+
+        # Filter out empty outputs and get the last valid one as the main output
+        valid_outputs = [t for t in task_outputs if t.raw]
+        if not valid_outputs:
+            raise ValueError("No valid task outputs available to create crew output.")
+        final_task_output = valid_outputs[-1]
+
         final_string_output = final_task_output.raw
         self._finish_execution(final_string_output)
         token_usage = self.calculate_usage_metrics()
@@ -920,7 +943,7 @@
             raw=final_task_output.raw,
             pydantic=final_task_output.pydantic,
             json_dict=final_task_output.json_dict,
-            tasks_output=[task.output for task in self.tasks if task.output],
+            tasks_output=task_outputs,
             token_usage=token_usage,
         )
 
diff --git a/tests/crew_test.py b/tests/crew_test.py
index 4812ab93f..e69c71315 100644
--- a/tests/crew_test.py
+++ b/tests/crew_test.py
@@ -49,6 +49,39 @@ writer = Agent(
 )
 
 
+def test_crew_with_only_conditional_tasks_raises_error():
+    """Test that creating a crew with only conditional tasks raises an error."""
+
+    def condition_func(task_output: TaskOutput) -> bool:
+        return True
+
+    conditional1 = ConditionalTask(
+        description="Conditional task 1",
+        expected_output="Output 1",
+        agent=researcher,
+        condition=condition_func,
+    )
+    conditional2 = ConditionalTask(
+        description="Conditional task 2",
+        expected_output="Output 2",
+        agent=researcher,
+        condition=condition_func,
+    )
+    conditional3 = ConditionalTask(
+        description="Conditional task 3",
+        expected_output="Output 3",
+        agent=researcher,
+        condition=condition_func,
+    )
+
+    with pytest.raises(
+        pydantic_core._pydantic_core.ValidationError,
+        match="Crew must include at least one non-conditional task",
+    ):
+        Crew(
+            agents=[researcher],
+            tasks=[conditional1, conditional2, conditional3],
+        )
+
+
 def test_crew_config_conditional_requirement():
     with pytest.raises(ValueError):
         Crew(process=Process.sequential)
@@ -2060,6 +2093,195 @@ def test_tools_with_custom_caching():
         assert result.raw == "3"
 
 
+@pytest.mark.vcr(filter_headers=["authorization"])
+def test_conditional_task_uses_last_output():
+    """Test that conditional tasks use the last task output for condition evaluation."""
+    task1 = Task(
+        description="First task",
+        expected_output="First output",
+        agent=researcher,
+    )
+
+    def condition_fails(task_output: TaskOutput) -> bool:
+        # This condition will never be met
+        return "never matches" in task_output.raw.lower()
+
+    def condition_succeeds(task_output: TaskOutput) -> bool:
+        # This condition will match first task's output
+        return "first success" in task_output.raw.lower()
+
+    conditional_task1 = ConditionalTask(
+        description="Second task - conditional that fails condition",
+        expected_output="Second output",
+        agent=researcher,
+        condition=condition_fails,
+    )
+
+    conditional_task2 = ConditionalTask(
+        description="Third task - conditional that succeeds using first task output",
+        expected_output="Third output",
+        agent=writer,
+        condition=condition_succeeds,
+    )
+
+    crew = Crew(
+        agents=[researcher, writer],
+        tasks=[task1, conditional_task1, conditional_task2],
+    )
+
+    # Mock outputs for tasks
+    mock_first = TaskOutput(
+        description="First task output",
+        raw="First success output",  # Will be used by third task's condition
+        agent=researcher.role,
+    )
+    mock_skipped = TaskOutput(
+        description="Second task output",
output", + raw="", # Empty output since condition fails + agent=researcher.role, + ) + mock_third = TaskOutput( + description="Third task output", + raw="Third task executed", # Output when condition succeeds using first task output + agent=writer.role, + ) + + # Set up mocks for task execution and conditional logic + with patch.object(ConditionalTask, "should_execute") as mock_should_execute: + # First conditional fails, second succeeds + mock_should_execute.side_effect = [False, True] + + with patch.object(Task, "execute_sync") as mock_execute: + mock_execute.side_effect = [mock_first, mock_third] + result = crew.kickoff() + + # Verify execution behavior + assert mock_execute.call_count == 2 # Only first and third tasks execute + assert mock_should_execute.call_count == 2 # Both conditionals checked + + # Verify outputs collection + assert len(result.tasks_output) == 3 + assert result.tasks_output[0].raw == "First success output" # First task succeeded + assert result.tasks_output[1].raw == "" # Second task skipped (condition failed) + assert result.tasks_output[2].raw == "Third task executed" # Third task used first task's output + +@pytest.mark.vcr(filter_headers=["authorization"]) +def test_conditional_tasks_result_collection(): + """Test that task outputs are properly collected based on execution status.""" + task1 = Task( + description="Normal task that always executes", + expected_output="First output", + agent=researcher, + ) + + def condition_never_met(task_output: TaskOutput) -> bool: + return "never matches" in task_output.raw.lower() + + def condition_always_met(task_output: TaskOutput) -> bool: + return "success" in task_output.raw.lower() + + task2 = ConditionalTask( + description="Conditional task that never executes", + expected_output="Second output", + agent=researcher, + condition=condition_never_met, + ) + + task3 = ConditionalTask( + description="Conditional task that always executes", + expected_output="Third output", + agent=writer, + condition=condition_always_met, + ) + + crew = Crew( + agents=[researcher, writer], + tasks=[task1, task2, task3], + ) + + # Mock outputs for different execution paths + mock_success = TaskOutput( + description="Success output", + raw="Success output", # Triggers third task's condition + agent=researcher.role, + ) + mock_skipped = TaskOutput( + description="Skipped output", + raw="", # Empty output for skipped task + agent=researcher.role, + ) + mock_conditional = TaskOutput( + description="Conditional output", + raw="Conditional task executed", + agent=writer.role, + ) + + # Set up mocks for task execution and conditional logic + with patch.object(ConditionalTask, "should_execute") as mock_should_execute: + # First conditional fails, second succeeds + mock_should_execute.side_effect = [False, True] + + with patch.object(Task, "execute_sync") as mock_execute: + mock_execute.side_effect = [mock_success, mock_conditional] + result = crew.kickoff() + + # Verify execution behavior + assert mock_execute.call_count == 2 # Only first and third tasks execute + assert mock_should_execute.call_count == 2 # Both conditionals checked + + # Verify task output collection + assert len(result.tasks_output) == 3 + assert result.tasks_output[0].raw == "Success output" # Normal task executed + assert result.tasks_output[1].raw == "" # Second task skipped + assert result.tasks_output[2].raw == "Conditional task executed" # Third task executed + +@pytest.mark.vcr(filter_headers=["authorization"]) +def test_multiple_conditional_tasks(): + """Test that 
+    task1 = Task(
+        description="Initial research task",
+        expected_output="Research output",
+        agent=researcher,
+    )
+
+    def condition1(task_output: TaskOutput) -> bool:
+        return "success" in task_output.raw.lower()
+
+    def condition2(task_output: TaskOutput) -> bool:
+        return "proceed" in task_output.raw.lower()
+
+    task2 = ConditionalTask(
+        description="First conditional task",
+        expected_output="Conditional output 1",
+        agent=writer,
+        condition=condition1,
+    )
+
+    task3 = ConditionalTask(
+        description="Second conditional task",
+        expected_output="Conditional output 2",
+        agent=writer,
+        condition=condition2,
+    )
+
+    crew = Crew(
+        agents=[researcher, writer],
+        tasks=[task1, task2, task3],
+    )
+
+    # Mock different task outputs to test conditional logic
+    mock_success = TaskOutput(
+        description="Mock success",
+        raw="Success and proceed output",
+        agent=researcher.role,
+    )
+
+    # Set up mocks for task execution
+    with patch.object(Task, "execute_sync", return_value=mock_success) as mock_execute:
+        result = crew.kickoff()
+
+    # Verify all tasks were executed (no IndexError)
+    assert mock_execute.call_count == 3
+    assert len(result.tasks_output) == 3
+
+
 @pytest.mark.vcr(filter_headers=["authorization"])
 def test_using_contextual_memory():
     from unittest.mock import patch
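
Reviewer note (not part of the patch): the minimal sketch below illustrates the semantics this change introduces, assuming the patch is applied. It uses only the public crewai API (Agent, Crew, Task, ConditionalTask, TaskOutput); the agent definitions and the `needs_expansion` condition are hypothetical stand-ins, and actually running kickoff() would require a configured LLM.

    # Sketch of post-fix conditional-task behavior (assumes this patch).
    from crewai import Agent, Crew, Task
    from crewai.tasks.conditional_task import ConditionalTask
    from crewai.tasks.task_output import TaskOutput

    researcher = Agent(role="Researcher", goal="Gather notes", backstory="...")
    writer = Agent(role="Writer", goal="Write prose", backstory="...")

    def needs_expansion(previous: TaskOutput) -> bool:
        # With this patch the condition receives task_outputs[-1] -- the most
        # recent output, including the empty placeholder appended when an
        # earlier conditional task was skipped -- instead of indexing with
        # task_outputs[task_index - 1], which could raise IndexError.
        return "expand" in previous.raw.lower()

    research = Task(
        description="Research topic X",
        expected_output="Bullet-point notes",
        agent=researcher,
    )
    expand = ConditionalTask(
        description="Expand the notes only if the research asks for it",
        expected_output="Expanded notes",
        agent=writer,
        condition=needs_expansion,
    )

    # The new validator requires at least one non-conditional task, so
    # tasks=[expand] alone would now fail validation at construction time.
    crew = Crew(agents=[researcher, writer], tasks=[research, expand])
    result = crew.kickoff()

    # tasks_output now holds one entry per task, skipped or executed, and the
    # crew-level output is taken from the last output with non-empty raw text.
    assert len(result.tasks_output) == len(crew.tasks)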