mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-03-02 01:38:15 +00:00
Compare commits
3 Commits
main
...
devin/1772
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b585f205be | ||
|
|
9e18f4855a | ||
|
|
62bdc0f3b2 |
@@ -438,6 +438,18 @@ class Crew(FlowTrackable, BaseModel):
|
||||
agent.set_rpm_controller(self._rpm_controller)
|
||||
return self
|
||||
|
||||
@model_validator(mode="after")
|
||||
def stamp_execution_order(self) -> Self:
|
||||
"""Lock task execution order by stamping each task with its insertion index.
|
||||
|
||||
This guarantees deterministic, stable ordering for all tasks regardless
|
||||
of any other attribute. Tasks are always dispatched in the order they
|
||||
appear in ``self.tasks`` (i.e. insertion order).
|
||||
"""
|
||||
for idx, task in enumerate(self.tasks):
|
||||
task._execution_index = idx
|
||||
return self
|
||||
|
||||
@model_validator(mode="after")
|
||||
def validate_tasks(self) -> Self:
|
||||
if self.process == Process.sequential:
|
||||
@@ -984,13 +996,18 @@ class Crew(FlowTrackable, BaseModel):
|
||||
) -> CrewOutput:
|
||||
"""Executes tasks using native async and returns the final output.
|
||||
|
||||
**Ordering contract**: tasks are dispatched in the exact order they
|
||||
appear in *tasks* (i.e. their insertion / list order). Each task
|
||||
carries an ``_execution_index`` stamped at crew-construction time
|
||||
that locks this order deterministically.
|
||||
|
||||
Args:
|
||||
tasks: List of tasks to execute
|
||||
start_index: Index to start execution from (for replay)
|
||||
was_replayed: Whether this is a replayed execution
|
||||
tasks: List of tasks to execute (preserves insertion order).
|
||||
start_index: Index to start execution from (for replay).
|
||||
was_replayed: Whether this is a replayed execution.
|
||||
|
||||
Returns:
|
||||
CrewOutput: Final output of the crew
|
||||
CrewOutput: Final output of the crew.
|
||||
"""
|
||||
task_outputs: list[TaskOutput] = []
|
||||
pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int]] = []
|
||||
@@ -1183,13 +1200,18 @@ class Crew(FlowTrackable, BaseModel):
|
||||
) -> CrewOutput:
|
||||
"""Executes tasks sequentially and returns the final output.
|
||||
|
||||
**Ordering contract**: tasks are dispatched in the exact order they
|
||||
appear in *tasks* (i.e. their insertion / list order). Each task
|
||||
carries an ``_execution_index`` stamped at crew-construction time
|
||||
that locks this order deterministically.
|
||||
|
||||
Args:
|
||||
tasks (List[Task]): List of tasks to execute
|
||||
manager (Optional[BaseAgent], optional): Manager agent to use for
|
||||
delegation. Defaults to None.
|
||||
tasks: List of tasks to execute (preserves insertion order).
|
||||
start_index: Index to resume execution from (for replay).
|
||||
was_replayed: Whether this is a replayed execution.
|
||||
|
||||
Returns:
|
||||
CrewOutput: Final output of the crew
|
||||
CrewOutput: Final output of the crew.
|
||||
"""
|
||||
custom_start = self._get_execution_start_index(tasks)
|
||||
if custom_start is not None:
|
||||
|
||||
@@ -224,6 +224,7 @@ class Task(BaseModel):
|
||||
_guardrail_retry_counts: dict[int, int] = PrivateAttr(
|
||||
default_factory=dict,
|
||||
)
|
||||
_execution_index: int | None = PrivateAttr(default=None)
|
||||
_original_description: str | None = PrivateAttr(default=None)
|
||||
_original_expected_output: str | None = PrivateAttr(default=None)
|
||||
_original_output_file: str | None = PrivateAttr(default=None)
|
||||
|
||||
252
lib/crewai/tests/crew/test_deterministic_task_ordering.py
Normal file
252
lib/crewai/tests/crew/test_deterministic_task_ordering.py
Normal file
@@ -0,0 +1,252 @@
|
||||
"""Regression tests for deterministic task execution ordering.
|
||||
|
||||
These tests ensure that tasks with the same implicit priority (all tasks)
|
||||
are always dispatched in their insertion order — the exact sequence the
|
||||
user passed to ``Crew(tasks=[...])``.
|
||||
|
||||
See: https://github.com/crewAIInc/crewAI/issues/4664
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.agent import Agent
|
||||
from crewai.crew import Crew
|
||||
from crewai.process import Process
|
||||
from crewai.task import Task
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
N_TASKS = 6
|
||||
ITERATIONS = 5 # run multiple times to surface any non-determinism
|
||||
|
||||
|
||||
def _make_agent(role: str = "Worker") -> Agent:
|
||||
return Agent(
|
||||
role=role,
|
||||
goal="Complete tasks.",
|
||||
backstory="You are a diligent worker.",
|
||||
allow_delegation=False,
|
||||
)
|
||||
|
||||
|
||||
def _make_tasks(agent: Agent, n: int = N_TASKS) -> list[Task]:
|
||||
return [
|
||||
Task(
|
||||
description=f"Task {i}",
|
||||
expected_output=f"Output {i}",
|
||||
agent=agent,
|
||||
)
|
||||
for i in range(n)
|
||||
]
|
||||
|
||||
|
||||
def _mock_task_output(desc: str = "mock") -> TaskOutput:
|
||||
return TaskOutput(
|
||||
description=desc,
|
||||
raw="mocked output",
|
||||
agent="mocked agent",
|
||||
messages=[],
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: _execution_index stamping
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestExecutionIndexStamping:
|
||||
"""Verify that Crew stamps each task with a stable ``_execution_index``."""
|
||||
|
||||
def test_tasks_receive_execution_index_on_construction(self):
|
||||
agent = _make_agent()
|
||||
tasks = _make_tasks(agent)
|
||||
crew = Crew(agents=[agent], tasks=tasks, process=Process.sequential)
|
||||
|
||||
for idx, task in enumerate(crew.tasks):
|
||||
assert task._execution_index == idx, (
|
||||
f"Task '{task.description}' should have _execution_index={idx}, "
|
||||
f"got {task._execution_index}"
|
||||
)
|
||||
|
||||
def test_execution_index_matches_insertion_order(self):
|
||||
"""Build the crew multiple times and verify indices are stable."""
|
||||
agent = _make_agent()
|
||||
|
||||
for _ in range(ITERATIONS):
|
||||
tasks = _make_tasks(agent)
|
||||
crew = Crew(agents=[agent], tasks=tasks, process=Process.sequential)
|
||||
|
||||
indices = [t._execution_index for t in crew.tasks]
|
||||
assert indices == list(range(N_TASKS))
|
||||
|
||||
def test_single_task_gets_index_zero(self):
|
||||
agent = _make_agent()
|
||||
task = Task(
|
||||
description="Only task",
|
||||
expected_output="Only output",
|
||||
agent=agent,
|
||||
)
|
||||
crew = Crew(agents=[agent], tasks=[task], process=Process.sequential)
|
||||
assert crew.tasks[0]._execution_index == 0
|
||||
|
||||
def test_execution_index_preserved_after_copy(self):
|
||||
agent = _make_agent()
|
||||
tasks = _make_tasks(agent)
|
||||
crew = Crew(agents=[agent], tasks=tasks, process=Process.sequential)
|
||||
|
||||
copied = crew.copy()
|
||||
for idx, task in enumerate(copied.tasks):
|
||||
assert task._execution_index == idx
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: deterministic dispatch order (sync)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDeterministicSyncOrder:
|
||||
"""Verify that ``_execute_tasks`` dispatches tasks in insertion order."""
|
||||
|
||||
def test_sequential_dispatch_order_is_stable(self):
|
||||
"""Run the crew multiple times and record the order tasks are dispatched."""
|
||||
agent = _make_agent()
|
||||
tasks = _make_tasks(agent)
|
||||
|
||||
mock_output = _mock_task_output()
|
||||
for t in tasks:
|
||||
t.output = mock_output
|
||||
|
||||
crew = Crew(agents=[agent], tasks=tasks, process=Process.sequential)
|
||||
|
||||
for _ in range(ITERATIONS):
|
||||
dispatch_order: list[str] = []
|
||||
|
||||
def tracking_execute_sync(self_task, *args, **kwargs):
|
||||
dispatch_order.append(self_task.description)
|
||||
return mock_output
|
||||
|
||||
with patch.object(Task, "execute_sync", tracking_execute_sync):
|
||||
crew.kickoff()
|
||||
|
||||
expected = [f"Task {i}" for i in range(N_TASKS)]
|
||||
assert dispatch_order == expected, (
|
||||
f"Expected dispatch order {expected}, got {dispatch_order}"
|
||||
)
|
||||
|
||||
def test_many_same_description_tasks_preserve_order(self):
|
||||
"""Tasks with identical descriptions must still keep insertion order."""
|
||||
agent = _make_agent()
|
||||
tasks = [
|
||||
Task(
|
||||
description="Identical task",
|
||||
expected_output=f"Output {i}",
|
||||
agent=agent,
|
||||
)
|
||||
for i in range(N_TASKS)
|
||||
]
|
||||
|
||||
mock_output = _mock_task_output()
|
||||
for t in tasks:
|
||||
t.output = mock_output
|
||||
|
||||
crew = Crew(agents=[agent], tasks=tasks, process=Process.sequential)
|
||||
|
||||
dispatch_indices: list[int | None] = []
|
||||
|
||||
def tracking_execute_sync(self_task, *args, **kwargs):
|
||||
dispatch_indices.append(self_task._execution_index)
|
||||
return mock_output
|
||||
|
||||
with patch.object(Task, "execute_sync", tracking_execute_sync):
|
||||
crew.kickoff()
|
||||
|
||||
assert dispatch_indices == list(range(N_TASKS))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: deterministic dispatch order (async)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDeterministicAsyncOrder:
|
||||
"""Verify that ``_aexecute_tasks`` dispatches tasks in insertion order."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_dispatch_order_is_stable(self):
|
||||
"""Run the async crew multiple times and verify dispatch order."""
|
||||
agent = _make_agent()
|
||||
tasks = _make_tasks(agent)
|
||||
|
||||
mock_output = _mock_task_output()
|
||||
for t in tasks:
|
||||
t.output = mock_output
|
||||
|
||||
crew = Crew(agents=[agent], tasks=tasks, process=Process.sequential)
|
||||
|
||||
for _ in range(ITERATIONS):
|
||||
dispatch_order: list[str] = []
|
||||
|
||||
async def tracking_aexecute_sync(self_task, *args, **kwargs):
|
||||
dispatch_order.append(self_task.description)
|
||||
return mock_output
|
||||
|
||||
with patch.object(Task, "aexecute_sync", tracking_aexecute_sync):
|
||||
await crew.akickoff()
|
||||
|
||||
expected = [f"Task {i}" for i in range(N_TASKS)]
|
||||
assert dispatch_order == expected, (
|
||||
f"Expected dispatch order {expected}, got {dispatch_order}"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: task_outputs ordering matches task insertion order
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestOutputOrdering:
|
||||
"""Verify that ``CrewOutput.tasks_output`` preserves insertion order."""
|
||||
|
||||
def test_tasks_output_order_matches_insertion_order(self):
|
||||
agent = _make_agent()
|
||||
tasks = _make_tasks(agent)
|
||||
|
||||
outputs = [
|
||||
TaskOutput(
|
||||
description=f"Task {i}",
|
||||
raw=f"result {i}",
|
||||
agent="Worker",
|
||||
messages=[],
|
||||
)
|
||||
for i in range(N_TASKS)
|
||||
]
|
||||
|
||||
call_index = {"idx": 0}
|
||||
|
||||
def ordered_execute_sync(self_task, *args, **kwargs):
|
||||
idx = call_index["idx"]
|
||||
call_index["idx"] += 1
|
||||
self_task.output = outputs[idx]
|
||||
return outputs[idx]
|
||||
|
||||
for t in tasks:
|
||||
t.output = outputs[0] # ensure output is set for validation
|
||||
|
||||
crew = Crew(agents=[agent], tasks=tasks, process=Process.sequential)
|
||||
|
||||
with patch.object(Task, "execute_sync", ordered_execute_sync):
|
||||
result = crew.kickoff()
|
||||
|
||||
for i, task_output in enumerate(result.tasks_output):
|
||||
assert task_output.raw == f"result {i}", (
|
||||
f"tasks_output[{i}] should have raw='result {i}', "
|
||||
f"got '{task_output.raw}'"
|
||||
)
|
||||
Reference in New Issue
Block a user