Compare commits

..

3 Commits

Author SHA1 Message Date
Devin AI
b585f205be fix: remove sorted() calls to avoid shared-task mutation issues
Reverts the defensive sorted() by _execution_index added in the previous
commit. Bugbot correctly identified two issues:
1. None fallback to 0 would move unstamped tasks to the front
2. Shared task objects across crews would get corrupted indices

The _execution_index field remains as an observable contract for tests
and documentation. List iteration order (Python list semantics) is the
actual runtime enforcement — which was already deterministic.

Co-Authored-By: João <joao@crewai.com>
2026-03-01 16:43:01 +00:00
Devin AI
9e18f4855a fix: address review comments - add runtime enforcement and remove unused code
- Remove unused 'import asyncio' from test file
- Remove unused 'original_execute_sync' variable from test
- Add defensive sorted() by _execution_index in _execute_tasks and _aexecute_tasks
  so the field is actively used at runtime, not just metadata

Co-Authored-By: João <joao@crewai.com>
2026-03-01 16:34:08 +00:00
Devin AI
62bdc0f3b2 fix: enforce deterministic ordering for equal-priority crew tasks
Stamp each task with a stable _execution_index at crew-construction time
to lock insertion order as the deterministic tie-break contract.

- Add _execution_index PrivateAttr to Task
- Add stamp_execution_order model_validator to Crew
- Update _execute_tasks and _aexecute_tasks docstrings with ordering contract
- Add comprehensive regression tests

Closes #4664

Co-Authored-By: João <joao@crewai.com>
2026-03-01 16:27:15 +00:00
3 changed files with 283 additions and 8 deletions

View File

@@ -438,6 +438,18 @@ class Crew(FlowTrackable, BaseModel):
agent.set_rpm_controller(self._rpm_controller)
return self
@model_validator(mode="after")
def stamp_execution_order(self) -> Self:
"""Lock task execution order by stamping each task with its insertion index.
This guarantees deterministic, stable ordering for all tasks regardless
of any other attribute. Tasks are always dispatched in the order they
appear in ``self.tasks`` (i.e. insertion order).
"""
for idx, task in enumerate(self.tasks):
task._execution_index = idx
return self
@model_validator(mode="after")
def validate_tasks(self) -> Self:
if self.process == Process.sequential:
@@ -984,13 +996,18 @@ class Crew(FlowTrackable, BaseModel):
) -> CrewOutput:
"""Executes tasks using native async and returns the final output.
**Ordering contract**: tasks are dispatched in the exact order they
appear in *tasks* (i.e. their insertion / list order). Each task
carries an ``_execution_index`` stamped at crew-construction time
that locks this order deterministically.
Args:
tasks: List of tasks to execute
start_index: Index to start execution from (for replay)
was_replayed: Whether this is a replayed execution
tasks: List of tasks to execute (preserves insertion order).
start_index: Index to start execution from (for replay).
was_replayed: Whether this is a replayed execution.
Returns:
CrewOutput: Final output of the crew
CrewOutput: Final output of the crew.
"""
task_outputs: list[TaskOutput] = []
pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int]] = []
@@ -1183,13 +1200,18 @@ class Crew(FlowTrackable, BaseModel):
) -> CrewOutput:
"""Executes tasks sequentially and returns the final output.
**Ordering contract**: tasks are dispatched in the exact order they
appear in *tasks* (i.e. their insertion / list order). Each task
carries an ``_execution_index`` stamped at crew-construction time
that locks this order deterministically.
Args:
tasks (List[Task]): List of tasks to execute
manager (Optional[BaseAgent], optional): Manager agent to use for
delegation. Defaults to None.
tasks: List of tasks to execute (preserves insertion order).
start_index: Index to resume execution from (for replay).
was_replayed: Whether this is a replayed execution.
Returns:
CrewOutput: Final output of the crew
CrewOutput: Final output of the crew.
"""
custom_start = self._get_execution_start_index(tasks)
if custom_start is not None:

View File

@@ -224,6 +224,7 @@ class Task(BaseModel):
_guardrail_retry_counts: dict[int, int] = PrivateAttr(
default_factory=dict,
)
_execution_index: int | None = PrivateAttr(default=None)
_original_description: str | None = PrivateAttr(default=None)
_original_expected_output: str | None = PrivateAttr(default=None)
_original_output_file: str | None = PrivateAttr(default=None)

View File

@@ -0,0 +1,252 @@
"""Regression tests for deterministic task execution ordering.
These tests ensure that tasks with the same implicit priority (all tasks)
are always dispatched in their insertion order — the exact sequence the
user passed to ``Crew(tasks=[...])``.
See: https://github.com/crewAIInc/crewAI/issues/4664
"""
from __future__ import annotations
from unittest.mock import patch
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.process import Process
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
N_TASKS = 6
ITERATIONS = 5 # run multiple times to surface any non-determinism
def _make_agent(role: str = "Worker") -> Agent:
return Agent(
role=role,
goal="Complete tasks.",
backstory="You are a diligent worker.",
allow_delegation=False,
)
def _make_tasks(agent: Agent, n: int = N_TASKS) -> list[Task]:
return [
Task(
description=f"Task {i}",
expected_output=f"Output {i}",
agent=agent,
)
for i in range(n)
]
def _mock_task_output(desc: str = "mock") -> TaskOutput:
return TaskOutput(
description=desc,
raw="mocked output",
agent="mocked agent",
messages=[],
)
# ---------------------------------------------------------------------------
# Tests: _execution_index stamping
# ---------------------------------------------------------------------------
class TestExecutionIndexStamping:
"""Verify that Crew stamps each task with a stable ``_execution_index``."""
def test_tasks_receive_execution_index_on_construction(self):
agent = _make_agent()
tasks = _make_tasks(agent)
crew = Crew(agents=[agent], tasks=tasks, process=Process.sequential)
for idx, task in enumerate(crew.tasks):
assert task._execution_index == idx, (
f"Task '{task.description}' should have _execution_index={idx}, "
f"got {task._execution_index}"
)
def test_execution_index_matches_insertion_order(self):
"""Build the crew multiple times and verify indices are stable."""
agent = _make_agent()
for _ in range(ITERATIONS):
tasks = _make_tasks(agent)
crew = Crew(agents=[agent], tasks=tasks, process=Process.sequential)
indices = [t._execution_index for t in crew.tasks]
assert indices == list(range(N_TASKS))
def test_single_task_gets_index_zero(self):
agent = _make_agent()
task = Task(
description="Only task",
expected_output="Only output",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], process=Process.sequential)
assert crew.tasks[0]._execution_index == 0
def test_execution_index_preserved_after_copy(self):
agent = _make_agent()
tasks = _make_tasks(agent)
crew = Crew(agents=[agent], tasks=tasks, process=Process.sequential)
copied = crew.copy()
for idx, task in enumerate(copied.tasks):
assert task._execution_index == idx
# ---------------------------------------------------------------------------
# Tests: deterministic dispatch order (sync)
# ---------------------------------------------------------------------------
class TestDeterministicSyncOrder:
"""Verify that ``_execute_tasks`` dispatches tasks in insertion order."""
def test_sequential_dispatch_order_is_stable(self):
"""Run the crew multiple times and record the order tasks are dispatched."""
agent = _make_agent()
tasks = _make_tasks(agent)
mock_output = _mock_task_output()
for t in tasks:
t.output = mock_output
crew = Crew(agents=[agent], tasks=tasks, process=Process.sequential)
for _ in range(ITERATIONS):
dispatch_order: list[str] = []
def tracking_execute_sync(self_task, *args, **kwargs):
dispatch_order.append(self_task.description)
return mock_output
with patch.object(Task, "execute_sync", tracking_execute_sync):
crew.kickoff()
expected = [f"Task {i}" for i in range(N_TASKS)]
assert dispatch_order == expected, (
f"Expected dispatch order {expected}, got {dispatch_order}"
)
def test_many_same_description_tasks_preserve_order(self):
"""Tasks with identical descriptions must still keep insertion order."""
agent = _make_agent()
tasks = [
Task(
description="Identical task",
expected_output=f"Output {i}",
agent=agent,
)
for i in range(N_TASKS)
]
mock_output = _mock_task_output()
for t in tasks:
t.output = mock_output
crew = Crew(agents=[agent], tasks=tasks, process=Process.sequential)
dispatch_indices: list[int | None] = []
def tracking_execute_sync(self_task, *args, **kwargs):
dispatch_indices.append(self_task._execution_index)
return mock_output
with patch.object(Task, "execute_sync", tracking_execute_sync):
crew.kickoff()
assert dispatch_indices == list(range(N_TASKS))
# ---------------------------------------------------------------------------
# Tests: deterministic dispatch order (async)
# ---------------------------------------------------------------------------
class TestDeterministicAsyncOrder:
"""Verify that ``_aexecute_tasks`` dispatches tasks in insertion order."""
@pytest.mark.asyncio
async def test_async_dispatch_order_is_stable(self):
"""Run the async crew multiple times and verify dispatch order."""
agent = _make_agent()
tasks = _make_tasks(agent)
mock_output = _mock_task_output()
for t in tasks:
t.output = mock_output
crew = Crew(agents=[agent], tasks=tasks, process=Process.sequential)
for _ in range(ITERATIONS):
dispatch_order: list[str] = []
async def tracking_aexecute_sync(self_task, *args, **kwargs):
dispatch_order.append(self_task.description)
return mock_output
with patch.object(Task, "aexecute_sync", tracking_aexecute_sync):
await crew.akickoff()
expected = [f"Task {i}" for i in range(N_TASKS)]
assert dispatch_order == expected, (
f"Expected dispatch order {expected}, got {dispatch_order}"
)
# ---------------------------------------------------------------------------
# Tests: task_outputs ordering matches task insertion order
# ---------------------------------------------------------------------------
class TestOutputOrdering:
"""Verify that ``CrewOutput.tasks_output`` preserves insertion order."""
def test_tasks_output_order_matches_insertion_order(self):
agent = _make_agent()
tasks = _make_tasks(agent)
outputs = [
TaskOutput(
description=f"Task {i}",
raw=f"result {i}",
agent="Worker",
messages=[],
)
for i in range(N_TASKS)
]
call_index = {"idx": 0}
def ordered_execute_sync(self_task, *args, **kwargs):
idx = call_index["idx"]
call_index["idx"] += 1
self_task.output = outputs[idx]
return outputs[idx]
for t in tasks:
t.output = outputs[0] # ensure output is set for validation
crew = Crew(agents=[agent], tasks=tasks, process=Process.sequential)
with patch.object(Task, "execute_sync", ordered_execute_sync):
result = crew.kickoff()
for i, task_output in enumerate(result.tasks_output):
assert task_output.raw == f"result {i}", (
f"tasks_output[{i}] should have raw='result {i}', "
f"got '{task_output.raw}'"
)