Files
crewAI/tests/test_dynamic_task_ordering.py
Devin AI ed95f47b80 fix: Resolve critical bugs identified by Cursor Bugbot
- Fix task completion tracking to use task.output instead of non-existent task_id
- Update callback validation to raise ValueError instead of PydanticCustomError
- Refactor _execute_tasks to prevent task skipping and ensure all tasks execute exactly once
- Maintain replay functionality compatibility with dynamic ordering
- Remove undefined current_index variable reference

Addresses all 3 bugs reported by automated analysis:
1. Task Skipping and Replay Breakage
2. Callback Validation Error Handling Mismatch
3. TaskOutput Missing task_id Causes Errors

Co-Authored-By: João <joao@crewai.com>
2025-09-29 11:08:32 +00:00

303 lines
9.2 KiB
Python

import pytest
from unittest.mock import Mock
from crewai import Agent, Crew, Task
from crewai.process import Process
from crewai.task import TaskOutput
@pytest.fixture
def agents():
return [
Agent(role="Agent 1", goal="Goal 1", backstory="Backstory 1"),
Agent(role="Agent 2", goal="Goal 2", backstory="Backstory 2"),
Agent(role="Agent 3", goal="Goal 3", backstory="Backstory 3"),
]
@pytest.fixture
def tasks(agents):
return [
Task(description="Task 1", expected_output="Output 1", agent=agents[0]),
Task(description="Task 2", expected_output="Output 2", agent=agents[1]),
Task(description="Task 3", expected_output="Output 3", agent=agents[2]),
]
def test_sequential_process_with_reverse_ordering(agents, tasks):
"""Test sequential process with reverse task ordering."""
execution_order = []
def reverse_ordering_callback(all_tasks, completed_outputs, current_index):
completed_tasks = {id(task) for task in all_tasks if task.output is not None}
remaining_indices = [i for i in range(len(all_tasks))
if id(all_tasks[i]) not in completed_tasks]
if remaining_indices:
next_index = max(remaining_indices)
execution_order.append(next_index)
return next_index
return None
crew = Crew(
agents=agents,
tasks=tasks,
process=Process.sequential,
task_ordering_callback=reverse_ordering_callback,
verbose=False
)
result = crew.kickoff()
assert len(result.tasks_output) == 3
assert execution_order == [2, 1, 0]
def test_hierarchical_process_with_priority_ordering(agents, tasks):
"""Test hierarchical process with priority-based task ordering."""
tasks[0].priority = 3
tasks[1].priority = 1
tasks[2].priority = 2
execution_order = []
def priority_ordering_callback(all_tasks, completed_outputs, current_index):
completed_tasks = {id(task) for task in all_tasks if task.output is not None}
remaining_tasks = [
(i, task) for i, task in enumerate(all_tasks)
if id(task) not in completed_tasks
]
if remaining_tasks:
remaining_tasks.sort(key=lambda x: getattr(x[1], 'priority', 999))
next_index = remaining_tasks[0][0]
execution_order.append(next_index)
return next_index
return None
crew = Crew(
agents=agents,
tasks=tasks,
process=Process.hierarchical,
manager_llm="gpt-4o",
task_ordering_callback=priority_ordering_callback,
verbose=False
)
result = crew.kickoff()
assert len(result.tasks_output) == 3
assert execution_order == [1, 2, 0]
def test_task_ordering_callback_with_task_object_return():
"""Test callback returning Task object instead of index."""
agents = [Agent(role="Agent", goal="Goal", backstory="Backstory")]
tasks = [
Task(description="Task A", expected_output="Output A", agent=agents[0]),
Task(description="Task B", expected_output="Output B", agent=agents[0]),
]
execution_order = []
def task_object_callback(all_tasks, completed_outputs, current_index):
if len(completed_outputs) == 0:
execution_order.append(1)
return all_tasks[1]
elif len(completed_outputs) == 1:
execution_order.append(0)
return all_tasks[0]
return None
crew = Crew(
agents=agents,
tasks=tasks,
process=Process.sequential,
task_ordering_callback=task_object_callback,
verbose=False
)
result = crew.kickoff()
assert len(result.tasks_output) == 2
assert execution_order == [1, 0]
def test_invalid_task_ordering_callback_index():
"""Test handling of invalid task index from callback."""
agents = [Agent(role="Agent", goal="Goal", backstory="Backstory")]
tasks = [Task(description="Task", expected_output="Output", agent=agents[0])]
def invalid_callback(all_tasks, completed_outputs, current_index):
return 999
crew = Crew(
agents=agents,
tasks=tasks,
process=Process.sequential,
task_ordering_callback=invalid_callback,
verbose=False
)
result = crew.kickoff()
assert len(result.tasks_output) == 1
def test_task_ordering_callback_exception_handling():
"""Test handling of exceptions in task ordering callback."""
agents = [Agent(role="Agent", goal="Goal", backstory="Backstory")]
tasks = [Task(description="Task", expected_output="Output", agent=agents[0])]
def failing_callback(all_tasks, completed_outputs, current_index):
raise ValueError("Callback error")
crew = Crew(
agents=agents,
tasks=tasks,
process=Process.sequential,
task_ordering_callback=failing_callback,
verbose=False
)
result = crew.kickoff()
assert len(result.tasks_output) == 1
def test_task_ordering_callback_validation():
"""Test validation of task ordering callback signature."""
agents = [Agent(role="Agent", goal="Goal", backstory="Backstory")]
tasks = [Task(description="Task", expected_output="Output", agent=agents[0])]
def invalid_signature_callback(only_one_param):
return 0
with pytest.raises(ValueError, match="task_ordering_callback must accept exactly 3 parameters"):
Crew(
agents=agents,
tasks=tasks,
process=Process.sequential,
task_ordering_callback=invalid_signature_callback
)
def test_no_task_ordering_callback_default_behavior():
"""Test that default behavior is unchanged when no callback is provided."""
agents = [Agent(role="Agent", goal="Goal", backstory="Backstory")]
tasks = [
Task(description="Task 1", expected_output="Output 1", agent=agents[0]),
Task(description="Task 2", expected_output="Output 2", agent=agents[0]),
]
crew = Crew(
agents=agents,
tasks=tasks,
process=Process.sequential,
verbose=False
)
result = crew.kickoff()
assert len(result.tasks_output) == 2
def test_task_ordering_callback_with_none_return():
"""Test callback returning None for default ordering."""
agents = [Agent(role="Agent", goal="Goal", backstory="Backstory")]
tasks = [
Task(description="Task 1", expected_output="Output 1", agent=agents[0]),
Task(description="Task 2", expected_output="Output 2", agent=agents[0]),
]
def none_callback(all_tasks, completed_outputs, current_index):
return None
crew = Crew(
agents=agents,
tasks=tasks,
process=Process.sequential,
task_ordering_callback=none_callback,
verbose=False
)
result = crew.kickoff()
assert len(result.tasks_output) == 2
def test_task_ordering_callback_invalid_task_object():
"""Test handling of invalid Task object from callback."""
agents = [Agent(role="Agent", goal="Goal", backstory="Backstory")]
tasks = [Task(description="Task", expected_output="Output", agent=agents[0])]
invalid_task = Task(description="Invalid", expected_output="Invalid", agent=agents[0])
def invalid_task_callback(all_tasks, completed_outputs, current_index):
return invalid_task
crew = Crew(
agents=agents,
tasks=tasks,
process=Process.sequential,
task_ordering_callback=invalid_task_callback,
verbose=False
)
result = crew.kickoff()
assert len(result.tasks_output) == 1
def test_task_ordering_callback_invalid_return_type():
"""Test handling of invalid return type from callback."""
agents = [Agent(role="Agent", goal="Goal", backstory="Backstory")]
tasks = [Task(description="Task", expected_output="Output", agent=agents[0])]
def invalid_type_callback(all_tasks, completed_outputs, current_index):
return "invalid"
crew = Crew(
agents=agents,
tasks=tasks,
process=Process.sequential,
task_ordering_callback=invalid_type_callback,
verbose=False
)
result = crew.kickoff()
assert len(result.tasks_output) == 1
def test_task_ordering_prevents_infinite_loops():
"""Test that task ordering prevents infinite loops by tracking executed tasks."""
agents = [Agent(role="Agent", goal="Goal", backstory="Backstory")]
tasks = [
Task(description="Task 1", expected_output="Output 1", agent=agents[0]),
Task(description="Task 2", expected_output="Output 2", agent=agents[0]),
]
call_count = 0
def loop_callback(all_tasks, completed_outputs, current_index):
nonlocal call_count
call_count += 1
if call_count > 10:
pytest.fail("Callback called too many times, possible infinite loop")
return 0
crew = Crew(
agents=agents,
tasks=tasks,
process=Process.sequential,
task_ordering_callback=loop_callback,
verbose=False
)
result = crew.kickoff()
assert len(result.tasks_output) == 2
assert call_count <= 4