Address PR feedback: updated conditional tasks tests and indexing

Co-Authored-By: brandon@crewai.com <brandon@crewai.com>
This commit is contained in:
Devin AI
2025-01-22 20:59:35 +00:00
parent f641668130
commit b21e49269e
2 changed files with 112 additions and 60 deletions

View File

@@ -378,6 +378,22 @@ class Crew(BaseModel):
return self
@model_validator(mode="after")
def validate_must_have_non_conditional_task(self) -> "Crew":
"""Ensure that a crew has at least one non-conditional task."""
if not self.tasks:
return self
non_conditional_count = sum(
1 for task in self.tasks if not isinstance(task, ConditionalTask)
)
if non_conditional_count == 0:
raise PydanticCustomError(
"only_conditional_tasks",
"Crew must include at least one non-conditional task",
{},
)
return self
@model_validator(mode="after")
def validate_first_task(self) -> "Crew":
"""Ensure the first task is not a ConditionalTask."""
@@ -437,19 +453,7 @@ class Crew(BaseModel):
)
return self
@model_validator(mode="after")
def validate_must_have_non_conditional_task(self) -> "Crew":
"""Ensure that a crew has at least one non-conditional task."""
non_conditional_count = sum(
1 for task in self.tasks if not isinstance(task, ConditionalTask)
)
if non_conditional_count == 0:
raise PydanticCustomError(
"only_conditional_tasks",
"Crew must include at least one non-conditional task.",
{},
)
return self
@property
def key(self) -> str:
@@ -753,6 +757,7 @@ class Crew(BaseModel):
task, task_outputs, futures, task_index, was_replayed
)
if skipped_task_output:
task_outputs.append(skipped_task_output)
continue
if task.async_execution:

View File

@@ -74,7 +74,7 @@ def test_crew_with_only_conditional_tasks_raises_error():
with pytest.raises(
pydantic_core._pydantic_core.ValidationError,
match="The first task cannot be a ConditionalTask",
match="Crew must include at least one non-conditional task",
):
Crew(
agents=[researcher],
@@ -2030,71 +2030,96 @@ def test_conditional_task_uses_last_output():
expected_output="First output",
agent=researcher,
)
task2 = Task(
description="Second task",
def condition_fails(task_output: TaskOutput) -> bool:
# This condition will never be met
return "never matches" in task_output.raw.lower()
def condition_succeeds(task_output: TaskOutput) -> bool:
# This condition will match first task's output
return "first success" in task_output.raw.lower()
conditional_task1 = ConditionalTask(
description="Second task - conditional that fails condition",
expected_output="Second output",
agent=researcher,
condition=condition_fails,
)
def condition_func(task_output: TaskOutput) -> bool:
# Should only be true if evaluating task2's output
return "second success" in task_output.raw.lower()
conditional_task = ConditionalTask(
description="Conditional task that should only run if second task succeeded",
expected_output="Conditional output",
conditional_task2 = ConditionalTask(
description="Third task - conditional that succeeds using first task output",
expected_output="Third output",
agent=writer,
condition=condition_func,
condition=condition_succeeds,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2, conditional_task],
tasks=[task1, conditional_task1, conditional_task2],
)
# Mock different outputs for task1 and task2
# Mock outputs for tasks
mock_first = TaskOutput(
description="Mock first",
raw="First success output", # Would trigger condition if using first output
description="First task output",
raw="First success output", # Will be used by third task's condition
agent=researcher.role,
)
mock_second = TaskOutput(
description="Mock second",
raw="Second success output", # Should trigger condition
mock_skipped = TaskOutput(
description="Second task output",
raw="", # Empty output since condition fails
agent=researcher.role,
)
mock_third = TaskOutput(
description="Third task output",
raw="Third task executed", # Output when condition succeeds using first task output
agent=writer.role,
)
# Set up mocks to return different outputs for each task
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_first, mock_second, mock_first] # Third value is for conditional task if it runs
result = crew.kickoff()
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
# Verify conditional task executed (used task2's output)
assert mock_execute.call_count == 3
assert len(result.tasks_output) == 3
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_first, mock_third]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify outputs collection
assert len(result.tasks_output) == 3
assert result.tasks_output[0].raw == "First success output" # First task succeeded
assert result.tasks_output[1].raw == "" # Second task skipped (condition failed)
assert result.tasks_output[2].raw == "Third task executed" # Third task used first task's output
@pytest.mark.vcr(filter_headers=["authorization"])
def test_multiple_tasks_with_conditional():
"""Test that having multiple tasks before a conditional task works correctly."""
def test_conditional_tasks_result_collection():
"""Test that task outputs are properly collected based on execution status."""
task1 = Task(
description="Research task 1",
expected_output="Research output",
agent=researcher,
)
task2 = Task(
description="Research task 2",
expected_output="Research output",
description="Normal task that always executes",
expected_output="First output",
agent=researcher,
)
def condition_func(task_output: TaskOutput) -> bool:
def condition_never_met(task_output: TaskOutput) -> bool:
return "never matches" in task_output.raw.lower()
def condition_always_met(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
task2 = ConditionalTask(
description="Conditional task that never executes",
expected_output="Second output",
agent=researcher,
condition=condition_never_met,
)
task3 = ConditionalTask(
description="Conditional task that runs if previous task succeeded",
expected_output="Conditional output",
description="Conditional task that always executes",
expected_output="Third output",
agent=writer,
condition=condition_func,
condition=condition_always_met,
)
crew = Crew(
@@ -2102,19 +2127,41 @@ def test_multiple_tasks_with_conditional():
tasks=[task1, task2, task3],
)
# Mock task outputs
# Mock outputs for different execution paths
mock_success = TaskOutput(
description="Mock success",
raw="Success output",
description="Success output",
raw="Success output", # Triggers third task's condition
agent=researcher.role,
)
mock_skipped = TaskOutput(
description="Skipped output",
raw="", # Empty output for skipped task
agent=researcher.role,
)
mock_conditional = TaskOutput(
description="Conditional output",
raw="Conditional task executed",
agent=writer.role,
)
# Set up mocks for task execution
with patch.object(Task, "execute_sync", return_value=mock_success) as mock_execute:
result = crew.kickoff()
# Verify all tasks were executed (no IndexError)
assert mock_execute.call_count == 3
assert len(result.tasks_output) == 3
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_success, mock_conditional]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify task output collection
assert len(result.tasks_output) == 3
assert result.tasks_output[0].raw == "Success output" # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert result.tasks_output[2].raw == "Conditional task executed" # Third task executed
@pytest.mark.vcr(filter_headers=["authorization"])
def test_multiple_conditional_tasks():