fix: handle multiple task outputs correctly in conditional tasks

- Fix IndexError in _handle_conditional_task by using first output
- Modify _execute_tasks to accumulate task outputs instead of resetting
- Update _create_crew_output to handle multiple outputs correctly
- Add tests for multiple tasks with conditional and multiple conditional tasks

Co-Authored-By: brandon@crewai.com <brandon@crewai.com>
This commit is contained in:
Devin AI
2025-01-21 17:23:01 +00:00
parent a21e310d78
commit 0082cafe12
2 changed files with 99 additions and 8 deletions

View File

@@ -762,7 +762,7 @@ class Crew(BaseModel):
context=context,
tools=tools_for_task,
)
task_outputs = [task_output]
task_outputs.append(task_output)
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index, was_replayed)
@@ -783,7 +783,7 @@ class Crew(BaseModel):
task_outputs = self._process_async_tasks(futures, was_replayed)
futures.clear()
previous_output = task_outputs[task_index - 1] if task_outputs else None
previous_output = task_outputs[0] if task_outputs else None
if previous_output is not None and not task.should_execute(previous_output):
self._logger.log(
"debug",
@@ -905,11 +905,11 @@ class Crew(BaseModel):
)
def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput:
if len(task_outputs) != 1:
raise ValueError(
"Something went wrong. Kickoff should return only one task output."
)
final_task_output = task_outputs[0]
# Use the last task output as the final output
final_task_output = task_outputs[-1] if task_outputs else None
if not final_task_output:
raise ValueError("No task outputs available to create crew output.")
final_string_output = final_task_output.raw
self._finish_execution(final_string_output)
token_usage = self.calculate_usage_metrics()
@@ -918,7 +918,7 @@ class Crew(BaseModel):
raw=final_task_output.raw,
pydantic=final_task_output.pydantic,
json_dict=final_task_output.json_dict,
tasks_output=[task.output for task in self.tasks if task.output],
tasks_output=task_outputs, # Use all task outputs directly
token_usage=token_usage,
)

View File

@@ -1989,6 +1989,97 @@ def test_tools_with_custom_caching():
assert result.raw == "3"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_multiple_tasks_with_conditional():
"""Test that having multiple tasks before a conditional task works correctly."""
task1 = Task(
description="Research task 1",
expected_output="Research output",
agent=researcher,
)
task2 = Task(
description="Research task 2",
expected_output="Research output",
agent=researcher,
)
def condition_func(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
task3 = ConditionalTask(
description="Conditional task that runs if previous task succeeded",
expected_output="Conditional output",
agent=writer,
condition=condition_func,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2, task3],
)
# Mock task outputs
mock_success = TaskOutput(
description="Mock success",
raw="Success output",
agent=researcher.role,
)
# Set up mocks for task execution
with patch.object(Task, "execute_sync", return_value=mock_success) as mock_execute:
result = crew.kickoff()
# Verify all tasks were executed (no IndexError)
assert mock_execute.call_count == 3
assert len(result.tasks_output) == 3
@pytest.mark.vcr(filter_headers=["authorization"])
def test_multiple_conditional_tasks():
"""Test that having multiple conditional tasks in sequence works correctly."""
task1 = Task(
description="Initial research task",
expected_output="Research output",
agent=researcher,
)
def condition1(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
def condition2(task_output: TaskOutput) -> bool:
return "proceed" in task_output.raw.lower()
task2 = ConditionalTask(
description="First conditional task",
expected_output="Conditional output 1",
agent=writer,
condition=condition1,
)
task3 = ConditionalTask(
description="Second conditional task",
expected_output="Conditional output 2",
agent=writer,
condition=condition2,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2, task3],
)
# Mock different task outputs to test conditional logic
mock_success = TaskOutput(
description="Mock success",
raw="Success and proceed output",
agent=researcher.role,
)
# Set up mocks for task execution
with patch.object(Task, "execute_sync", return_value=mock_success) as mock_execute:
result = crew.kickoff()
# Verify all tasks were executed (no IndexError)
assert mock_execute.call_count == 3
assert len(result.tasks_output) == 3
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory():
from unittest.mock import patch