fix: Resolve critical bugs identified by Cursor Bugbot

- Fix task completion tracking to use task.output instead of non-existent task_id
- Update callback validation to raise ValueError instead of PydanticCustomError
- Refactor _execute_tasks to prevent task skipping and ensure all tasks execute exactly once
- Maintain replay functionality compatibility with dynamic ordering
- Remove undefined current_index variable reference

Addresses all 3 bugs reported by automated analysis:
1. Task Skipping and Replay Breakage
2. Callback Validation Error Handling Mismatch
3. TaskOutput Missing task_id Causes Errors

Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
Devin AI
2025-09-29 11:08:32 +00:00
parent c467c96e9f
commit ed95f47b80
3 changed files with 36 additions and 45 deletions

View File

@@ -23,11 +23,11 @@ def priority_based_ordering(all_tasks, completed_outputs, current_index):
Task: Task object to execute next Task: Task object to execute next
None: Use default ordering None: Use default ordering
""" """
completed_task_ids = {output.task_id for output in completed_outputs} completed_tasks = {id(task) for task in all_tasks if task.output is not None}
remaining_tasks = [ remaining_tasks = [
(i, task) for i, task in enumerate(all_tasks) (i, task) for i, task in enumerate(all_tasks)
if task.id not in completed_task_ids if id(task) not in completed_tasks
] ]
if not remaining_tasks: if not remaining_tasks:
@@ -51,9 +51,10 @@ def conditional_ordering(all_tasks, completed_outputs, current_index):
last_output = completed_outputs[-1] last_output = completed_outputs[-1]
if "urgent" in last_output.raw.lower(): if "urgent" in last_output.raw.lower():
completed_tasks = {id(task) for task in all_tasks if task.output is not None}
for i, task in enumerate(all_tasks): for i, task in enumerate(all_tasks):
if (hasattr(task, 'priority') and task.priority == 1 and if (hasattr(task, 'priority') and task.priority == 1 and
task.id not in {out.task_id for out in completed_outputs}): id(task) not in completed_tasks):
return i return i
return None return None

View File

@@ -550,19 +550,13 @@ class Crew(FlowTrackable, BaseModel):
"""Validates that the task ordering callback has the correct signature.""" """Validates that the task ordering callback has the correct signature."""
if self.task_ordering_callback is not None: if self.task_ordering_callback is not None:
if not callable(self.task_ordering_callback): if not callable(self.task_ordering_callback):
raise PydanticCustomError( raise ValueError("task_ordering_callback must be callable")
"invalid_task_ordering_callback",
"task_ordering_callback must be callable",
{},
)
try: try:
sig = inspect.signature(self.task_ordering_callback) sig = inspect.signature(self.task_ordering_callback)
if len(sig.parameters) != 3: if len(sig.parameters) != 3:
raise PydanticCustomError( raise ValueError(
"invalid_task_ordering_callback_signature", "task_ordering_callback must accept exactly 3 parameters: (tasks, outputs, current_index)"
"task_ordering_callback must accept exactly 3 parameters: (tasks, outputs, current_index)",
{},
) )
except (ValueError, TypeError): except (ValueError, TypeError):
pass pass
@@ -896,73 +890,72 @@ class Crew(FlowTrackable, BaseModel):
futures: list[tuple[Task, Future[TaskOutput], int]] = [] futures: list[tuple[Task, Future[TaskOutput], int]] = []
last_sync_output: TaskOutput | None = None last_sync_output: TaskOutput | None = None
executed_task_indices: set[int] = set() executed_task_indices: set[int] = set()
current_index = start_index or 0
for task_index, task in enumerate(tasks):
while current_index < len(tasks): if start_index is not None and task_index < start_index:
if current_index in executed_task_indices:
current_index += 1
continue
if start_index is not None and current_index < start_index:
task = tasks[current_index]
if task.output: if task.output:
if task.async_execution: if task.async_execution:
task_outputs.append(task.output) task_outputs.append(task.output)
else: else:
task_outputs = [task.output] task_outputs = [task.output]
last_sync_output = task.output last_sync_output = task.output
executed_task_indices.add(current_index) executed_task_indices.add(task_index)
current_index += 1
continue
while len(executed_task_indices) < len(tasks):
# Find next task to execute
if self.task_ordering_callback: if self.task_ordering_callback:
try: try:
next_task_result = self.task_ordering_callback( next_task_result = self.task_ordering_callback(
tasks, task_outputs, current_index tasks, task_outputs, len(executed_task_indices)
) )
if next_task_result is None: if next_task_result is None:
task_index = current_index task_index = next(i for i in range(len(tasks)) if i not in executed_task_indices)
elif isinstance(next_task_result, int): elif isinstance(next_task_result, int):
if 0 <= next_task_result < len(tasks): if 0 <= next_task_result < len(tasks) and next_task_result not in executed_task_indices:
task_index = next_task_result task_index = next_task_result
else: else:
self._logger.log( self._logger.log(
"warning", "warning",
f"Invalid task index {next_task_result} from ordering callback, using default", f"Invalid or already executed task index {next_task_result} from ordering callback, using default",
color="yellow" color="yellow"
) )
task_index = current_index task_index = next(i for i in range(len(tasks)) if i not in executed_task_indices)
elif isinstance(next_task_result, Task): elif isinstance(next_task_result, Task):
try: try:
task_index = tasks.index(next_task_result) candidate_index = tasks.index(next_task_result)
if candidate_index not in executed_task_indices:
task_index = candidate_index
else:
self._logger.log(
"warning",
"Task from ordering callback already executed, using default",
color="yellow"
)
task_index = next(i for i in range(len(tasks)) if i not in executed_task_indices)
except ValueError: except ValueError:
self._logger.log( self._logger.log(
"warning", "warning",
"Task from ordering callback not found in tasks list, using default", "Task from ordering callback not found in tasks list, using default",
color="yellow" color="yellow"
) )
task_index = current_index task_index = next(i for i in range(len(tasks)) if i not in executed_task_indices)
else: else:
self._logger.log( self._logger.log(
"warning", "warning",
f"Invalid return type from ordering callback: {type(next_task_result)}, using default", f"Invalid return type from ordering callback: {type(next_task_result)}, using default",
color="yellow" color="yellow"
) )
task_index = current_index task_index = next(i for i in range(len(tasks)) if i not in executed_task_indices)
except Exception as e: except Exception as e:
self._logger.log( self._logger.log(
"warning", "warning",
f"Error in task ordering callback: {e}, using default ordering", f"Error in task ordering callback: {e}, using default ordering",
color="yellow" color="yellow"
) )
task_index = current_index task_index = next(i for i in range(len(tasks)) if i not in executed_task_indices)
else: else:
task_index = current_index task_index = next(i for i in range(len(tasks)) if i not in executed_task_indices)
if task_index in executed_task_indices:
current_index += 1
continue
task = tasks[task_index] task = tasks[task_index]
executed_task_indices.add(task_index) executed_task_indices.add(task_index)
@@ -990,7 +983,6 @@ class Crew(FlowTrackable, BaseModel):
) )
if skipped_task_output: if skipped_task_output:
task_outputs.append(skipped_task_output) task_outputs.append(skipped_task_output)
current_index += 1
continue continue
if task.async_execution: if task.async_execution:
@@ -1019,8 +1011,6 @@ class Crew(FlowTrackable, BaseModel):
self._store_execution_log(task, task_output, task_index, was_replayed) self._store_execution_log(task, task_output, task_index, was_replayed)
last_sync_output = task_output last_sync_output = task_output
current_index += 1
if futures: if futures:
task_outputs = self._process_async_tasks(futures, was_replayed) task_outputs = self._process_async_tasks(futures, was_replayed)

View File

@@ -29,9 +29,9 @@ def test_sequential_process_with_reverse_ordering(agents, tasks):
execution_order = [] execution_order = []
def reverse_ordering_callback(all_tasks, completed_outputs, current_index): def reverse_ordering_callback(all_tasks, completed_outputs, current_index):
completed_task_ids = {output.task_id for output in completed_outputs} completed_tasks = {id(task) for task in all_tasks if task.output is not None}
remaining_indices = [i for i in range(len(all_tasks)) remaining_indices = [i for i in range(len(all_tasks))
if all_tasks[i].id not in completed_task_ids] if id(all_tasks[i]) not in completed_tasks]
if remaining_indices: if remaining_indices:
next_index = max(remaining_indices) next_index = max(remaining_indices)
execution_order.append(next_index) execution_order.append(next_index)
@@ -62,10 +62,10 @@ def test_hierarchical_process_with_priority_ordering(agents, tasks):
execution_order = [] execution_order = []
def priority_ordering_callback(all_tasks, completed_outputs, current_index): def priority_ordering_callback(all_tasks, completed_outputs, current_index):
completed_task_ids = {output.task_id for output in completed_outputs} completed_tasks = {id(task) for task in all_tasks if task.output is not None}
remaining_tasks = [ remaining_tasks = [
(i, task) for i, task in enumerate(all_tasks) (i, task) for i, task in enumerate(all_tasks)
if task.id not in completed_task_ids if id(task) not in completed_tasks
] ]
if remaining_tasks: if remaining_tasks: