mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-08 23:58:34 +00:00
- Fix task completion tracking to use task.output instead of non-existent task_id - Update callback validation to raise ValueError instead of PydanticCustomError - Refactor _execute_tasks to prevent task skipping and ensure all tasks execute exactly once - Maintain replay functionality compatibility with dynamic ordering - Remove undefined current_index variable reference Addresses all 3 bugs reported by automated analysis: 1. Task Skipping and Replay Breakage 2. Callback Validation Error Handling Mismatch 3. TaskOutput Missing task_id Causes Errors Co-Authored-By: João <joao@crewai.com>
303 lines
9.2 KiB
Python
303 lines
9.2 KiB
Python
import pytest
|
|
from unittest.mock import Mock
|
|
|
|
from crewai import Agent, Crew, Task
|
|
from crewai.process import Process
|
|
from crewai.task import TaskOutput
|
|
|
|
|
|
@pytest.fixture
|
|
def agents():
|
|
return [
|
|
Agent(role="Agent 1", goal="Goal 1", backstory="Backstory 1"),
|
|
Agent(role="Agent 2", goal="Goal 2", backstory="Backstory 2"),
|
|
Agent(role="Agent 3", goal="Goal 3", backstory="Backstory 3"),
|
|
]
|
|
|
|
|
|
@pytest.fixture
|
|
def tasks(agents):
|
|
return [
|
|
Task(description="Task 1", expected_output="Output 1", agent=agents[0]),
|
|
Task(description="Task 2", expected_output="Output 2", agent=agents[1]),
|
|
Task(description="Task 3", expected_output="Output 3", agent=agents[2]),
|
|
]
|
|
|
|
|
|
def test_sequential_process_with_reverse_ordering(agents, tasks):
|
|
"""Test sequential process with reverse task ordering."""
|
|
execution_order = []
|
|
|
|
def reverse_ordering_callback(all_tasks, completed_outputs, current_index):
|
|
completed_tasks = {id(task) for task in all_tasks if task.output is not None}
|
|
remaining_indices = [i for i in range(len(all_tasks))
|
|
if id(all_tasks[i]) not in completed_tasks]
|
|
if remaining_indices:
|
|
next_index = max(remaining_indices)
|
|
execution_order.append(next_index)
|
|
return next_index
|
|
return None
|
|
|
|
crew = Crew(
|
|
agents=agents,
|
|
tasks=tasks,
|
|
process=Process.sequential,
|
|
task_ordering_callback=reverse_ordering_callback,
|
|
verbose=False
|
|
)
|
|
|
|
result = crew.kickoff()
|
|
|
|
assert len(result.tasks_output) == 3
|
|
assert execution_order == [2, 1, 0]
|
|
|
|
|
|
def test_hierarchical_process_with_priority_ordering(agents, tasks):
|
|
"""Test hierarchical process with priority-based task ordering."""
|
|
|
|
tasks[0].priority = 3
|
|
tasks[1].priority = 1
|
|
tasks[2].priority = 2
|
|
|
|
execution_order = []
|
|
|
|
def priority_ordering_callback(all_tasks, completed_outputs, current_index):
|
|
completed_tasks = {id(task) for task in all_tasks if task.output is not None}
|
|
remaining_tasks = [
|
|
(i, task) for i, task in enumerate(all_tasks)
|
|
if id(task) not in completed_tasks
|
|
]
|
|
|
|
if remaining_tasks:
|
|
remaining_tasks.sort(key=lambda x: getattr(x[1], 'priority', 999))
|
|
next_index = remaining_tasks[0][0]
|
|
execution_order.append(next_index)
|
|
return next_index
|
|
|
|
return None
|
|
|
|
crew = Crew(
|
|
agents=agents,
|
|
tasks=tasks,
|
|
process=Process.hierarchical,
|
|
manager_llm="gpt-4o",
|
|
task_ordering_callback=priority_ordering_callback,
|
|
verbose=False
|
|
)
|
|
|
|
result = crew.kickoff()
|
|
assert len(result.tasks_output) == 3
|
|
assert execution_order == [1, 2, 0]
|
|
|
|
|
|
def test_task_ordering_callback_with_task_object_return():
|
|
"""Test callback returning Task object instead of index."""
|
|
|
|
agents = [Agent(role="Agent", goal="Goal", backstory="Backstory")]
|
|
tasks = [
|
|
Task(description="Task A", expected_output="Output A", agent=agents[0]),
|
|
Task(description="Task B", expected_output="Output B", agent=agents[0]),
|
|
]
|
|
|
|
execution_order = []
|
|
|
|
def task_object_callback(all_tasks, completed_outputs, current_index):
|
|
if len(completed_outputs) == 0:
|
|
execution_order.append(1)
|
|
return all_tasks[1]
|
|
elif len(completed_outputs) == 1:
|
|
execution_order.append(0)
|
|
return all_tasks[0]
|
|
return None
|
|
|
|
crew = Crew(
|
|
agents=agents,
|
|
tasks=tasks,
|
|
process=Process.sequential,
|
|
task_ordering_callback=task_object_callback,
|
|
verbose=False
|
|
)
|
|
|
|
result = crew.kickoff()
|
|
assert len(result.tasks_output) == 2
|
|
assert execution_order == [1, 0]
|
|
|
|
|
|
def test_invalid_task_ordering_callback_index():
|
|
"""Test handling of invalid task index from callback."""
|
|
|
|
agents = [Agent(role="Agent", goal="Goal", backstory="Backstory")]
|
|
tasks = [Task(description="Task", expected_output="Output", agent=agents[0])]
|
|
|
|
def invalid_callback(all_tasks, completed_outputs, current_index):
|
|
return 999
|
|
|
|
crew = Crew(
|
|
agents=agents,
|
|
tasks=tasks,
|
|
process=Process.sequential,
|
|
task_ordering_callback=invalid_callback,
|
|
verbose=False
|
|
)
|
|
|
|
result = crew.kickoff()
|
|
assert len(result.tasks_output) == 1
|
|
|
|
|
|
def test_task_ordering_callback_exception_handling():
|
|
"""Test handling of exceptions in task ordering callback."""
|
|
|
|
agents = [Agent(role="Agent", goal="Goal", backstory="Backstory")]
|
|
tasks = [Task(description="Task", expected_output="Output", agent=agents[0])]
|
|
|
|
def failing_callback(all_tasks, completed_outputs, current_index):
|
|
raise ValueError("Callback error")
|
|
|
|
crew = Crew(
|
|
agents=agents,
|
|
tasks=tasks,
|
|
process=Process.sequential,
|
|
task_ordering_callback=failing_callback,
|
|
verbose=False
|
|
)
|
|
|
|
result = crew.kickoff()
|
|
assert len(result.tasks_output) == 1
|
|
|
|
|
|
def test_task_ordering_callback_validation():
|
|
"""Test validation of task ordering callback signature."""
|
|
|
|
agents = [Agent(role="Agent", goal="Goal", backstory="Backstory")]
|
|
tasks = [Task(description="Task", expected_output="Output", agent=agents[0])]
|
|
|
|
def invalid_signature_callback(only_one_param):
|
|
return 0
|
|
|
|
with pytest.raises(ValueError, match="task_ordering_callback must accept exactly 3 parameters"):
|
|
Crew(
|
|
agents=agents,
|
|
tasks=tasks,
|
|
process=Process.sequential,
|
|
task_ordering_callback=invalid_signature_callback
|
|
)
|
|
|
|
|
|
def test_no_task_ordering_callback_default_behavior():
|
|
"""Test that default behavior is unchanged when no callback is provided."""
|
|
|
|
agents = [Agent(role="Agent", goal="Goal", backstory="Backstory")]
|
|
tasks = [
|
|
Task(description="Task 1", expected_output="Output 1", agent=agents[0]),
|
|
Task(description="Task 2", expected_output="Output 2", agent=agents[0]),
|
|
]
|
|
|
|
crew = Crew(
|
|
agents=agents,
|
|
tasks=tasks,
|
|
process=Process.sequential,
|
|
verbose=False
|
|
)
|
|
|
|
result = crew.kickoff()
|
|
assert len(result.tasks_output) == 2
|
|
|
|
|
|
def test_task_ordering_callback_with_none_return():
|
|
"""Test callback returning None for default ordering."""
|
|
|
|
agents = [Agent(role="Agent", goal="Goal", backstory="Backstory")]
|
|
tasks = [
|
|
Task(description="Task 1", expected_output="Output 1", agent=agents[0]),
|
|
Task(description="Task 2", expected_output="Output 2", agent=agents[0]),
|
|
]
|
|
|
|
def none_callback(all_tasks, completed_outputs, current_index):
|
|
return None
|
|
|
|
crew = Crew(
|
|
agents=agents,
|
|
tasks=tasks,
|
|
process=Process.sequential,
|
|
task_ordering_callback=none_callback,
|
|
verbose=False
|
|
)
|
|
|
|
result = crew.kickoff()
|
|
assert len(result.tasks_output) == 2
|
|
|
|
|
|
def test_task_ordering_callback_invalid_task_object():
|
|
"""Test handling of invalid Task object from callback."""
|
|
|
|
agents = [Agent(role="Agent", goal="Goal", backstory="Backstory")]
|
|
tasks = [Task(description="Task", expected_output="Output", agent=agents[0])]
|
|
|
|
invalid_task = Task(description="Invalid", expected_output="Invalid", agent=agents[0])
|
|
|
|
def invalid_task_callback(all_tasks, completed_outputs, current_index):
|
|
return invalid_task
|
|
|
|
crew = Crew(
|
|
agents=agents,
|
|
tasks=tasks,
|
|
process=Process.sequential,
|
|
task_ordering_callback=invalid_task_callback,
|
|
verbose=False
|
|
)
|
|
|
|
result = crew.kickoff()
|
|
assert len(result.tasks_output) == 1
|
|
|
|
|
|
def test_task_ordering_callback_invalid_return_type():
|
|
"""Test handling of invalid return type from callback."""
|
|
|
|
agents = [Agent(role="Agent", goal="Goal", backstory="Backstory")]
|
|
tasks = [Task(description="Task", expected_output="Output", agent=agents[0])]
|
|
|
|
def invalid_type_callback(all_tasks, completed_outputs, current_index):
|
|
return "invalid"
|
|
|
|
crew = Crew(
|
|
agents=agents,
|
|
tasks=tasks,
|
|
process=Process.sequential,
|
|
task_ordering_callback=invalid_type_callback,
|
|
verbose=False
|
|
)
|
|
|
|
result = crew.kickoff()
|
|
assert len(result.tasks_output) == 1
|
|
|
|
|
|
def test_task_ordering_prevents_infinite_loops():
|
|
"""Test that task ordering prevents infinite loops by tracking executed tasks."""
|
|
|
|
agents = [Agent(role="Agent", goal="Goal", backstory="Backstory")]
|
|
tasks = [
|
|
Task(description="Task 1", expected_output="Output 1", agent=agents[0]),
|
|
Task(description="Task 2", expected_output="Output 2", agent=agents[0]),
|
|
]
|
|
|
|
call_count = 0
|
|
|
|
def loop_callback(all_tasks, completed_outputs, current_index):
|
|
nonlocal call_count
|
|
call_count += 1
|
|
if call_count > 10:
|
|
pytest.fail("Callback called too many times, possible infinite loop")
|
|
return 0
|
|
|
|
crew = Crew(
|
|
agents=agents,
|
|
tasks=tasks,
|
|
process=Process.sequential,
|
|
task_ordering_callback=loop_callback,
|
|
verbose=False
|
|
)
|
|
|
|
result = crew.kickoff()
|
|
assert len(result.tasks_output) == 2
|
|
assert call_count <= 4
|