From ed95f47b80e44e2874f40645715e18861a647625 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 29 Sep 2025 11:08:32 +0000 Subject: [PATCH] fix: Resolve critical bugs identified by Cursor Bugbot MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix task completion tracking to use task.output instead of non-existent task_id - Update callback validation to raise ValueError instead of PydanticCustomError - Refactor _execute_tasks to prevent task skipping and ensure all tasks execute exactly once - Maintain replay functionality compatibility with dynamic ordering - Remove undefined current_index variable reference Addresses all 3 bugs reported by automated analysis: 1. Task Skipping and Replay Breakage 2. Callback Validation Error Handling Mismatch 3. TaskOutput Missing task_id Causes Errors Co-Authored-By: João --- examples/dynamic_task_ordering_example.py | 7 +-- src/crewai/crew.py | 66 ++++++++++------------- tests/test_dynamic_task_ordering.py | 8 +-- 3 files changed, 36 insertions(+), 45 deletions(-) diff --git a/examples/dynamic_task_ordering_example.py b/examples/dynamic_task_ordering_example.py index 8abe4ce62..ebff99885 100644 --- a/examples/dynamic_task_ordering_example.py +++ b/examples/dynamic_task_ordering_example.py @@ -23,11 +23,11 @@ def priority_based_ordering(all_tasks, completed_outputs, current_index): Task: Task object to execute next None: Use default ordering """ - completed_task_ids = {output.task_id for output in completed_outputs} + completed_tasks = {id(task) for task in all_tasks if task.output is not None} remaining_tasks = [ (i, task) for i, task in enumerate(all_tasks) - if task.id not in completed_task_ids + if id(task) not in completed_tasks ] if not remaining_tasks: @@ -51,9 +51,10 @@ def conditional_ordering(all_tasks, completed_outputs, current_index): last_output = completed_outputs[-1] if "urgent" in last_output.raw.lower(): + completed_tasks = {id(task) for task in all_tasks if task.output is not None} for i, task in enumerate(all_tasks): if (hasattr(task, 'priority') and task.priority == 1 and - task.id not in {out.task_id for out in completed_outputs}): + id(task) not in completed_tasks): return i return None diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 53a1c5e69..64998329d 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -550,19 +550,13 @@ class Crew(FlowTrackable, BaseModel): """Validates that the task ordering callback has the correct signature.""" if self.task_ordering_callback is not None: if not callable(self.task_ordering_callback): - raise PydanticCustomError( - "invalid_task_ordering_callback", - "task_ordering_callback must be callable", - {}, - ) + raise ValueError("task_ordering_callback must be callable") try: sig = inspect.signature(self.task_ordering_callback) if len(sig.parameters) != 3: - raise PydanticCustomError( - "invalid_task_ordering_callback_signature", - "task_ordering_callback must accept exactly 3 parameters: (tasks, outputs, current_index)", - {}, + raise ValueError( + "task_ordering_callback must accept exactly 3 parameters: (tasks, outputs, current_index)" ) except (ValueError, TypeError): pass @@ -896,73 +890,72 @@ class Crew(FlowTrackable, BaseModel): futures: list[tuple[Task, Future[TaskOutput], int]] = [] last_sync_output: TaskOutput | None = None executed_task_indices: set[int] = set() - current_index = start_index or 0 - - while current_index < len(tasks): - if current_index in executed_task_indices: - current_index += 1 - continue - - if start_index is not None and current_index < start_index: - task = tasks[current_index] + + for task_index, task in enumerate(tasks): + if start_index is not None and task_index < start_index: if task.output: if task.async_execution: task_outputs.append(task.output) else: task_outputs = [task.output] last_sync_output = task.output - executed_task_indices.add(current_index) - current_index += 1 - continue + executed_task_indices.add(task_index) + while len(executed_task_indices) < len(tasks): + # Find next task to execute if self.task_ordering_callback: try: next_task_result = self.task_ordering_callback( - tasks, task_outputs, current_index + tasks, task_outputs, len(executed_task_indices) ) if next_task_result is None: - task_index = current_index + task_index = next(i for i in range(len(tasks)) if i not in executed_task_indices) elif isinstance(next_task_result, int): - if 0 <= next_task_result < len(tasks): + if 0 <= next_task_result < len(tasks) and next_task_result not in executed_task_indices: task_index = next_task_result else: self._logger.log( "warning", - f"Invalid task index {next_task_result} from ordering callback, using default", + f"Invalid or already executed task index {next_task_result} from ordering callback, using default", color="yellow" ) - task_index = current_index + task_index = next(i for i in range(len(tasks)) if i not in executed_task_indices) elif isinstance(next_task_result, Task): try: - task_index = tasks.index(next_task_result) + candidate_index = tasks.index(next_task_result) + if candidate_index not in executed_task_indices: + task_index = candidate_index + else: + self._logger.log( + "warning", + "Task from ordering callback already executed, using default", + color="yellow" + ) + task_index = next(i for i in range(len(tasks)) if i not in executed_task_indices) except ValueError: self._logger.log( "warning", "Task from ordering callback not found in tasks list, using default", color="yellow" ) - task_index = current_index + task_index = next(i for i in range(len(tasks)) if i not in executed_task_indices) else: self._logger.log( "warning", f"Invalid return type from ordering callback: {type(next_task_result)}, using default", color="yellow" ) - task_index = current_index + task_index = next(i for i in range(len(tasks)) if i not in executed_task_indices) except Exception as e: self._logger.log( "warning", f"Error in task ordering callback: {e}, using default ordering", color="yellow" ) - task_index = current_index + task_index = next(i for i in range(len(tasks)) if i not in executed_task_indices) else: - task_index = current_index - - if task_index in executed_task_indices: - current_index += 1 - continue + task_index = next(i for i in range(len(tasks)) if i not in executed_task_indices) task = tasks[task_index] executed_task_indices.add(task_index) @@ -990,7 +983,6 @@ class Crew(FlowTrackable, BaseModel): ) if skipped_task_output: task_outputs.append(skipped_task_output) - current_index += 1 continue if task.async_execution: @@ -1019,8 +1011,6 @@ class Crew(FlowTrackable, BaseModel): self._store_execution_log(task, task_output, task_index, was_replayed) last_sync_output = task_output - current_index += 1 - if futures: task_outputs = self._process_async_tasks(futures, was_replayed) diff --git a/tests/test_dynamic_task_ordering.py b/tests/test_dynamic_task_ordering.py index 291498e7a..941b840ef 100644 --- a/tests/test_dynamic_task_ordering.py +++ b/tests/test_dynamic_task_ordering.py @@ -29,9 +29,9 @@ def test_sequential_process_with_reverse_ordering(agents, tasks): execution_order = [] def reverse_ordering_callback(all_tasks, completed_outputs, current_index): - completed_task_ids = {output.task_id for output in completed_outputs} + completed_tasks = {id(task) for task in all_tasks if task.output is not None} remaining_indices = [i for i in range(len(all_tasks)) - if all_tasks[i].id not in completed_task_ids] + if id(all_tasks[i]) not in completed_tasks] if remaining_indices: next_index = max(remaining_indices) execution_order.append(next_index) @@ -62,10 +62,10 @@ def test_hierarchical_process_with_priority_ordering(agents, tasks): execution_order = [] def priority_ordering_callback(all_tasks, completed_outputs, current_index): - completed_task_ids = {output.task_id for output in completed_outputs} + completed_tasks = {id(task) for task in all_tasks if task.output is not None} remaining_tasks = [ (i, task) for i, task in enumerate(all_tasks) - if task.id not in completed_task_ids + if id(task) not in completed_tasks ] if remaining_tasks: