Compare commits

...

2 Commits

Author SHA1 Message Date
Devin AI
da1fd5b953 fix: address ruff lint and formatting issues
- Add explicit strict=False to zip() call in _create_completed_tasks_summary
- Apply ruff formatting to all changed files

Co-Authored-By: João <joao@crewai.com>
2026-03-20 12:07:55 +00:00
Devin AI
3a2e5a3abb feat: add adaptive re-planning for crew task execution (#4983)
Add optional replan_on_failure and max_replans flags to the Crew class
that enable the system to re-evaluate and update the execution plan
when task results deviate from the original plan's assumptions.

New components:
- ReplanningEvaluator: lightweight LLM-based evaluator that checks
  whether a task result deviates significantly from the plan
- CrewPlanner._handle_crew_replanning(): generates revised plans for
  remaining tasks based on actual results so far
- Crew._maybe_replan(): hook called after each sync task completion
  in both _execute_tasks() and _aexecute_tasks()

Backwards compatible: replan_on_failure defaults to False, so existing
crews are completely unaffected.

Co-Authored-By: João <joao@crewai.com>
2026-03-20 12:03:57 +00:00
4 changed files with 960 additions and 0 deletions

View File

@@ -112,6 +112,7 @@ from crewai.utilities.llm_utils import create_llm
from crewai.utilities.logger import Logger
from crewai.utilities.planning_handler import CrewPlanner
from crewai.utilities.printer import PrinterColor
from crewai.utilities.replanning_evaluator import ReplanningEvaluator
from crewai.utilities.rpm_controller import RPMController
from crewai.utilities.streaming import (
create_async_chunk_generator,
@@ -182,6 +183,8 @@ class Crew(FlowTrackable, BaseModel):
default_factory=TaskOutputStorageHandler
)
_kickoff_event_id: str | None = PrivateAttr(default=None)
_replan_count: int = PrivateAttr(default=0)
_original_task_descriptions: list[str] = PrivateAttr(default_factory=list)
name: str | None = Field(default="crew")
cache: bool = Field(default=True)
@@ -269,6 +272,21 @@ class Crew(FlowTrackable, BaseModel):
"Language model that will run the AgentPlanner if planning is True."
),
)
replan_on_failure: bool = Field(
default=False,
description=(
"When True and planning is enabled, evaluate each task result against "
"the plan and trigger replanning if results deviate significantly."
),
)
max_replans: int = Field(
default=3,
description=(
"Maximum number of replans allowed during a single crew execution. "
"Prevents infinite replanning loops."
),
ge=0,
)
task_execution_output_json_files: list[str] | None = Field(
default=None,
description="list of file paths for task execution JSON files.",
@@ -1041,6 +1059,7 @@ class Crew(FlowTrackable, BaseModel):
task_outputs.append(task_output)
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index, was_replayed)
self._maybe_replan(task, task_output, task_index, tasks, task_outputs)
if pending_tasks:
task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed)
@@ -1087,6 +1106,11 @@ class Crew(FlowTrackable, BaseModel):
tasks=self.tasks, planning_agent_llm=self.planning_llm
)._handle_crew_planning()
# Store original descriptions before appending plans so replanning
# can strip the old plan and apply a fresh one.
self._original_task_descriptions = [task.description for task in self.tasks]
self._replan_count = 0
plan_map: dict[int, str] = {}
for step_plan in result.list_of_plans_per_task:
if step_plan.task_number in plan_map:
@@ -1108,6 +1132,95 @@ class Crew(FlowTrackable, BaseModel):
f"No plan found for Task Number {task_number}",
)
def _maybe_replan(
    self,
    task: Task,
    task_output: TaskOutput,
    task_index: int,
    tasks: list[Task],
    all_task_outputs: list[TaskOutput],
) -> None:
    """Re-evaluate the remaining plan after ``task`` finishes and revise it if needed.

    Runs only when both ``planning`` and ``replan_on_failure`` are enabled and
    the replan budget (``max_replans``) is not exhausted. A lightweight LLM
    evaluation decides whether the completed task's result deviates from the
    plan; on deviation, fresh plans are generated for the not-yet-executed
    tasks and their descriptions are rewritten in place.

    Args:
        task: The task that just completed.
        task_output: The output produced by the completed task.
        task_index: Index of the completed task in ``tasks``.
        tasks: The full ordered list of tasks being executed.
        all_task_outputs: Every task output collected so far.
    """
    replanning_enabled = self.planning and self.replan_on_failure
    if not replanning_enabled or self._replan_count >= self.max_replans:
        return

    pending = tasks[task_index + 1 :]
    if not pending:
        # Last task — nothing left to replan for.
        return

    # Recover the plan text that planning appended to this task's description.
    # NOTE(review): assumes the description still starts with the stored
    # original text — confirm nothing else rewrites descriptions mid-run.
    if task_index < len(self._original_task_descriptions):
        base_description = self._original_task_descriptions[task_index]
    else:
        base_description = task.description
    appended_plan = task.description[len(base_description) :]
    if not appended_plan.strip():
        # No plan was ever appended; nothing to evaluate against.
        return

    verdict = ReplanningEvaluator(llm=self.planning_llm).evaluate(
        completed_task=task,
        task_output=task_output,
        original_plan=appended_plan,
        remaining_tasks=pending,
    )
    if not verdict.should_replan:
        return

    self._replan_count += 1
    self._logger.log(
        "info",
        f"Replanning triggered (replan {self._replan_count}/{self.max_replans}): "
        f"{verdict.reason}",
    )

    revised = CrewPlanner(
        tasks=self.tasks, planning_agent_llm=self.planning_llm
    )._handle_crew_replanning(
        completed_tasks=tasks[: task_index + 1],
        completed_outputs=all_task_outputs,
        remaining_tasks=pending,
        deviation_reason=verdict.reason,
    )

    # First plan per task_number wins, mirroring the initial planning pass.
    plan_by_number: dict[int, str] = {}
    for step in revised.list_of_plans_per_task:
        plan_by_number.setdefault(step.task_number, step.plan)

    # Restore each remaining task's original description, then append its
    # revised plan (plan numbers are 1-indexed over the remaining tasks).
    for offset, pending_task in enumerate(pending):
        global_index = task_index + 1 + offset
        if global_index < len(self._original_task_descriptions):
            pending_task.description = self._original_task_descriptions[global_index]
        revised_plan = plan_by_number.get(offset + 1)
        if revised_plan is not None:
            pending_task.description += revised_plan
def _store_execution_log(
self,
task: Task,
@@ -1240,6 +1353,7 @@ class Crew(FlowTrackable, BaseModel):
task_outputs.append(task_output)
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index, was_replayed)
self._maybe_replan(task, task_output, task_index, tasks, task_outputs)
if futures:
task_outputs = self._process_async_tasks(futures, was_replayed)

View File

@@ -1,5 +1,7 @@
"""Handles planning and coordination of crew tasks."""
from __future__ import annotations
import logging
from pydantic import BaseModel, Field
@@ -7,6 +9,7 @@ from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.llms.base_llm import BaseLLM
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
logger = logging.getLogger(__name__)
@@ -134,6 +137,115 @@ class CrewPlanner:
logger.warning("Error accessing agent knowledge sources")
return []
def _handle_crew_replanning(
    self,
    completed_tasks: list[Task],
    completed_outputs: list[TaskOutput],
    remaining_tasks: list[Task],
    deviation_reason: str,
) -> PlannerTaskPydanticOutput:
    """Produce revised plans for tasks that have not yet executed.

    Invoked after a ``ReplanningEvaluator`` flags a significant deviation;
    the new plan is grounded in the actual results produced so far.

    Args:
        completed_tasks: Tasks that have already been executed.
        completed_outputs: Outputs produced by the completed tasks.
        remaining_tasks: Tasks that still need to be executed.
        deviation_reason: Explanation of why replanning was triggered.

    Returns:
        A PlannerTaskPydanticOutput with revised plans for the remaining tasks.

    Raises:
        ValueError: If structured replanning output cannot be obtained.
    """
    replanner = self._create_planning_agent()
    done_summary = self._create_completed_tasks_summary(
        completed_tasks, completed_outputs
    )
    pending_summary = self._create_tasks_summary_for(remaining_tasks)

    # The prompt contrasts actual results with the now-stale assumptions so
    # the planner only rewrites what is still ahead.
    prompt = (
        "The crew's execution plan needs to be revised because a task result "
        "deviated from the original plan's assumptions.\n\n"
        f"## Reason for Replanning\n{deviation_reason}\n\n"
        f"## Completed Tasks and Their Results\n{done_summary}\n\n"
        f"## Remaining Tasks That Need New Plans\n{pending_summary}\n\n"
        "Create revised step-by-step plans for the remaining tasks ONLY, "
        "taking into account what has actually been accomplished so far "
        "and the deviation from the original plan. The plans should adapt "
        "to the real situation rather than following the now-outdated assumptions."
    )
    replan_task = Task(
        description=prompt,
        expected_output=(
            "Step by step revised plan for each remaining task, "
            "adapted to the actual results so far."
        ),
        agent=replanner,
        output_pydantic=PlannerTaskPydanticOutput,
    )

    outcome = replan_task.execute_sync()
    if not isinstance(outcome.pydantic, PlannerTaskPydanticOutput):
        raise ValueError("Failed to get the Replanning output")
    return outcome.pydantic
@staticmethod
def _create_completed_tasks_summary(
tasks: list[Task], outputs: list[TaskOutput]
) -> str:
"""Create a summary of completed tasks and their actual outputs.
Args:
tasks: The completed tasks.
outputs: The outputs from those tasks.
Returns:
A formatted string summarizing completed tasks and results.
"""
summaries = []
for idx, (task, output) in enumerate(
zip(tasks, outputs, strict=False), start=1
):
agent_role = task.agent.role if task.agent else "None"
summaries.append(
f"Task {idx} (Agent: {agent_role}):\n"
f" Description: {task.description}\n"
f" Expected Output: {task.expected_output}\n"
f" Actual Result: {output.raw}"
)
return "\n\n".join(summaries) if summaries else "No completed tasks."
@staticmethod
def _create_tasks_summary_for(tasks: list[Task]) -> str:
"""Create a summary of a subset of tasks (used for remaining tasks).
Args:
tasks: The tasks to summarize.
Returns:
A formatted string summarizing the tasks.
"""
summaries = []
for idx, task in enumerate(tasks, start=1):
agent_role = task.agent.role if task.agent else "None"
agent_goal = task.agent.goal if task.agent else "None"
summaries.append(
f"Task Number {idx}:\n"
f' "task_description": {task.description}\n'
f' "task_expected_output": {task.expected_output}\n'
f' "agent": {agent_role}\n'
f' "agent_goal": {agent_goal}\n'
f' "task_tools": {task.tools}\n'
f' "agent_tools": {task.agent.tools if task.agent else "None"}'
)
return "\n\n".join(summaries) if summaries else "No remaining tasks."
def _create_tasks_summary(self) -> str:
"""Creates a summary of all tasks.

View File

@@ -0,0 +1,162 @@
"""Evaluates whether task results deviate from the original plan and triggers replanning."""
from __future__ import annotations
import logging
from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.llms.base_llm import BaseLLM
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
logger = logging.getLogger(__name__)
class ReplanDecision(BaseModel):
    """Structured decision on whether replanning is needed.

    Attributes:
        should_replan: Whether the crew should generate a new plan for remaining tasks.
        reason: Explanation of why replanning is or is not needed.
        affected_task_numbers: 1-indexed task numbers of remaining tasks most affected
            by the deviation. Empty if should_replan is False.
    """

    # True when the evaluator judged the deviation significant enough to replan.
    should_replan: bool = Field(
        description="Whether the task result deviates significantly from the plan, requiring replanning.",
    )
    # Human-readable justification; surfaced in the crew's replanning log line.
    reason: str = Field(
        description="A concise explanation of why replanning is or is not needed.",
    )
    # default_factory avoids sharing one mutable list across instances.
    affected_task_numbers: list[int] = Field(
        default_factory=list,
        description="1-indexed task numbers of remaining tasks most affected by the deviation.",
    )
class ReplanningEvaluator:
    """Decides whether a completed task's result invalidates the current plan.

    After each task completes, a lightweight LLM call compares the actual
    result against what the plan assumed. A significant deviation signals
    that the remaining tasks should be replanned.

    Example usage::

        evaluator = ReplanningEvaluator(llm="gpt-4o-mini")
        decision = evaluator.evaluate(
            completed_task=task,
            task_output=output,
            original_plan="Step 1: ...",
            remaining_tasks=remaining,
        )
        if decision.should_replan:
            # trigger replanning for remaining tasks
            ...

    Args:
        llm: Language model used for evaluation — a model-name string or a
            BaseLLM instance. Defaults to ``"gpt-4o-mini"``.
    """

    def __init__(self, llm: str | BaseLLM | None = None) -> None:
        # Fall back to a cheap default model; evaluation calls are lightweight.
        self.llm = llm or "gpt-4o-mini"

    def evaluate(
        self,
        completed_task: Task,
        task_output: TaskOutput,
        original_plan: str,
        remaining_tasks: list[Task],
    ) -> ReplanDecision:
        """Judge whether a task result deviates from its plan.

        Args:
            completed_task: The task that just finished executing.
            task_output: The output produced by the completed task.
            original_plan: The plan text that was appended to the task description.
            remaining_tasks: Tasks that have not yet been executed.

        Returns:
            A ReplanDecision indicating whether replanning is needed.
        """
        judge = Agent(
            role="Replanning Evaluator",
            goal=(
                "Evaluate whether the result of a completed task deviates "
                "significantly from what the plan assumed, and determine if "
                "the remaining tasks need a revised plan."
            ),
            backstory=(
                "You are an expert at evaluating execution plans. You compare "
                "actual task results against planned expectations and identify "
                "deviations that would make the remaining plan ineffective."
            ),
            llm=self.llm,
        )

        pending_overview = self._summarize_remaining_tasks(remaining_tasks)
        # The prompt explicitly tells the model to ignore cosmetic differences
        # so only genuine deviations trigger a (costly) replan.
        prompt = (
            "Evaluate whether the following task result deviates significantly "
            "from what the original plan assumed.\n\n"
            f"## Completed Task\n"
            f"Description: {completed_task.description}\n"
            f"Expected Output: {completed_task.expected_output}\n\n"
            f"## Plan for this Task\n{original_plan}\n\n"
            f"## Actual Result\n{task_output.raw}\n\n"
            f"## Remaining Tasks\n{pending_overview}\n\n"
            "Based on the above, decide if the actual result deviates enough "
            "from the plan's assumptions that the remaining tasks need replanning. "
            "Minor differences or format changes do NOT require replanning. "
            "Only significant deviations (missing data, errors, completely "
            "different approach needed, infeasible assumptions) should trigger replanning."
        )
        probe = Task(
            description=prompt,
            expected_output=(
                "A structured decision indicating whether replanning is needed, "
                "with a reason and the affected task numbers."
            ),
            agent=judge,
            output_pydantic=ReplanDecision,
        )

        outcome = probe.execute_sync()
        if isinstance(outcome.pydantic, ReplanDecision):
            return outcome.pydantic
        # Unstructured output: fail safe toward "keep the current plan".
        logger.warning(
            "Failed to get structured ReplanDecision, defaulting to no replan"
        )
        return ReplanDecision(
            should_replan=False,
            reason="Failed to evaluate task output against plan.",
            affected_task_numbers=[],
        )

    @staticmethod
    def _summarize_remaining_tasks(remaining_tasks: list[Task]) -> str:
        """Render the not-yet-executed tasks as evaluation context.

        Args:
            remaining_tasks: Tasks that have not yet been executed.

        Returns:
            A formatted string summarizing the remaining tasks.
        """
        if not remaining_tasks:
            return "No remaining tasks."
        lines = [
            f"Task {number}: {pending.description}\n"
            f" Expected Output: {pending.expected_output}\n"
            f" Agent: {pending.agent.role if pending.agent else 'Unassigned'}"
            for number, pending in enumerate(remaining_tasks, start=1)
        ]
        return "\n".join(lines)

View File

@@ -0,0 +1,572 @@
"""Tests for the adaptive replanning feature (issue #4983).
Covers:
- ReplanningEvaluator: evaluation of task results against plans
- CrewPlanner._handle_crew_replanning: generating revised plans
- Crew integration: replan_on_failure / max_replans fields and the
_maybe_replan hook in both sync and async execution paths
- Backwards compatibility: existing crews are unaffected by default
"""
from unittest.mock import MagicMock, patch, call
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.planning_handler import (
CrewPlanner,
PlannerTaskPydanticOutput,
PlanPerTask,
)
from crewai.utilities.replanning_evaluator import (
ReplanDecision,
ReplanningEvaluator,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_agents(n: int = 3) -> list[Agent]:
    """Build *n* generic agents named ``Agent 1`` .. ``Agent n``."""
    agents: list[Agent] = []
    for i in range(1, n + 1):
        agents.append(
            Agent(role=f"Agent {i}", goal=f"Goal {i}", backstory=f"Backstory {i}")
        )
    return agents
def _make_tasks(agents: list[Agent]) -> list[Task]:
    """Build one numbered task per agent, paired in order."""
    tasks: list[Task] = []
    for i, owner in enumerate(agents, start=1):
        tasks.append(
            Task(
                description=f"Task {i} description",
                expected_output=f"Output {i}",
                agent=owner,
            )
        )
    return tasks
def _task_output(raw: str = "result", agent: str = "agent") -> TaskOutput:
    """Construct a minimal TaskOutput stub for feeding the replan hooks."""
    return TaskOutput(description="desc", raw=raw, agent=agent)
# ---------------------------------------------------------------------------
# ReplanDecision model tests
# ---------------------------------------------------------------------------
class TestReplanDecision:
    """Unit tests for the ReplanDecision model."""

    def test_replan_decision_defaults(self):
        # affected_task_numbers must default to a fresh empty list.
        verdict = ReplanDecision(
            should_replan=False,
            reason="All good",
        )
        assert verdict.should_replan is False
        assert verdict.reason == "All good"
        assert verdict.affected_task_numbers == []

    def test_replan_decision_with_affected_tasks(self):
        verdict = ReplanDecision(
            should_replan=True,
            reason="Data missing",
            affected_task_numbers=[2, 3],
        )
        assert verdict.should_replan is True
        assert verdict.affected_task_numbers == [2, 3]
# ---------------------------------------------------------------------------
# ReplanningEvaluator tests
# ---------------------------------------------------------------------------
class TestReplanningEvaluator:
    """Unit tests for ReplanningEvaluator with the LLM call patched out."""

    def test_default_llm(self):
        # The evaluator defaults to the lightweight gpt-4o-mini model.
        evaluator = ReplanningEvaluator()
        assert evaluator.llm == "gpt-4o-mini"

    def test_custom_llm(self):
        evaluator = ReplanningEvaluator(llm="gpt-4o")
        assert evaluator.llm == "gpt-4o"

    def test_evaluate_returns_replan_decision_when_deviation_detected(self):
        """When the LLM says replanning is needed, evaluate() returns that."""
        evaluator = ReplanningEvaluator()
        agents = _make_agents(3)
        tasks = _make_tasks(agents)
        output = _task_output("completely unexpected result")
        expected_decision = ReplanDecision(
            should_replan=True,
            reason="Result deviates from plan assumptions",
            affected_task_numbers=[2, 3],
        )
        # Patch Task.execute_sync so no real LLM call is made.
        with patch.object(Task, "execute_sync") as mock_exec:
            mock_exec.return_value = TaskOutput(
                description="eval",
                agent="evaluator",
                pydantic=expected_decision,
            )
            decision = evaluator.evaluate(
                completed_task=tasks[0],
                task_output=output,
                original_plan="Step 1: do X\nStep 2: do Y",
                remaining_tasks=tasks[1:],
            )
        assert decision.should_replan is True
        assert decision.reason == "Result deviates from plan assumptions"
        mock_exec.assert_called_once()

    def test_evaluate_returns_no_replan_when_result_matches(self):
        """When the result matches the plan, no replanning is needed."""
        evaluator = ReplanningEvaluator()
        agents = _make_agents(2)
        tasks = _make_tasks(agents)
        output = _task_output("expected result matching plan")
        expected_decision = ReplanDecision(
            should_replan=False,
            reason="Result aligns with plan",
            affected_task_numbers=[],
        )
        with patch.object(Task, "execute_sync") as mock_exec:
            mock_exec.return_value = TaskOutput(
                description="eval",
                agent="evaluator",
                pydantic=expected_decision,
            )
            decision = evaluator.evaluate(
                completed_task=tasks[0],
                task_output=output,
                original_plan="Step 1: gather data",
                remaining_tasks=tasks[1:],
            )
        assert decision.should_replan is False

    def test_evaluate_fallback_on_bad_output(self):
        """When the LLM returns non-structured output, fallback to no-replan."""
        evaluator = ReplanningEvaluator()
        agents = _make_agents(2)
        tasks = _make_tasks(agents)
        output = _task_output("some result")
        with patch.object(Task, "execute_sync") as mock_exec:
            mock_exec.return_value = TaskOutput(
                description="eval",
                agent="evaluator",
                pydantic=None,  # no structured output
            )
            decision = evaluator.evaluate(
                completed_task=tasks[0],
                task_output=output,
                original_plan="Step 1: do stuff",
                remaining_tasks=tasks[1:],
            )
        # The fail-safe path never requests a replan.
        assert decision.should_replan is False
        assert "Failed to evaluate" in decision.reason

    def test_summarize_remaining_tasks_empty(self):
        result = ReplanningEvaluator._summarize_remaining_tasks([])
        assert result == "No remaining tasks."

    def test_summarize_remaining_tasks_with_tasks(self):
        agents = _make_agents(2)
        tasks = _make_tasks(agents)
        result = ReplanningEvaluator._summarize_remaining_tasks(tasks)
        assert "Task 1" in result
        assert "Task 2" in result
        assert "Agent 1" in result
# ---------------------------------------------------------------------------
# CrewPlanner._handle_crew_replanning tests
# ---------------------------------------------------------------------------
class TestCrewPlannerReplanning:
    """Tests for CrewPlanner._handle_crew_replanning and its summary helpers."""

    @pytest.fixture
    def planner(self):
        # A planner over three generic tasks; planning_agent_llm=None exercises
        # whatever default the planner applies.
        agents = _make_agents(3)
        tasks = _make_tasks(agents)
        return CrewPlanner(tasks=tasks, planning_agent_llm=None)

    def test_handle_crew_replanning_returns_revised_plans(self, planner):
        agents = _make_agents(3)
        tasks = _make_tasks(agents)
        outputs = [_task_output("result 1")]
        revised_plans = PlannerTaskPydanticOutput(
            list_of_plans_per_task=[
                PlanPerTask(task_number=1, task="Task 2", plan="Revised plan for task 2"),
                PlanPerTask(task_number=2, task="Task 3", plan="Revised plan for task 3"),
            ]
        )
        # Patch out the LLM-backed task execution.
        with patch.object(Task, "execute_sync") as mock_exec:
            mock_exec.return_value = TaskOutput(
                description="replan",
                agent="planner",
                pydantic=revised_plans,
            )
            result = planner._handle_crew_replanning(
                completed_tasks=[tasks[0]],
                completed_outputs=outputs,
                remaining_tasks=tasks[1:],
                deviation_reason="Task 1 returned unexpected data",
            )
        assert isinstance(result, PlannerTaskPydanticOutput)
        assert len(result.list_of_plans_per_task) == 2
        mock_exec.assert_called_once()

    def test_handle_crew_replanning_raises_on_bad_output(self, planner):
        agents = _make_agents(3)
        tasks = _make_tasks(agents)
        outputs = [_task_output("result 1")]
        with patch.object(Task, "execute_sync") as mock_exec:
            # No structured pydantic payload -> the planner must raise.
            mock_exec.return_value = TaskOutput(
                description="replan",
                agent="planner",
                pydantic=None,
            )
            with pytest.raises(ValueError, match="Failed to get the Replanning output"):
                planner._handle_crew_replanning(
                    completed_tasks=[tasks[0]],
                    completed_outputs=outputs,
                    remaining_tasks=tasks[1:],
                    deviation_reason="deviation",
                )

    def test_completed_tasks_summary(self):
        agents = _make_agents(2)
        tasks = _make_tasks(agents)
        outputs = [_task_output("result A"), _task_output("result B")]
        summary = CrewPlanner._create_completed_tasks_summary(tasks, outputs)
        assert "result A" in summary
        assert "result B" in summary
        assert "Agent 1" in summary

    def test_completed_tasks_summary_empty(self):
        summary = CrewPlanner._create_completed_tasks_summary([], [])
        assert summary == "No completed tasks."

    def test_tasks_summary_for_remaining(self):
        agents = _make_agents(2)
        tasks = _make_tasks(agents)
        summary = CrewPlanner._create_tasks_summary_for(tasks)
        assert "Task Number 1" in summary
        assert "Task Number 2" in summary

    def test_tasks_summary_for_empty(self):
        summary = CrewPlanner._create_tasks_summary_for([])
        assert summary == "No remaining tasks."
# ---------------------------------------------------------------------------
# Crew field tests (backwards compatibility)
# ---------------------------------------------------------------------------
class TestCrewReplanningFields:
    """Backwards-compatibility tests for the new Crew replanning fields."""

    def test_replan_on_failure_defaults_to_false(self):
        # Default False keeps existing crews completely unaffected.
        agents = _make_agents(1)
        tasks = _make_tasks(agents)
        crew = Crew(agents=agents, tasks=tasks)
        assert crew.replan_on_failure is False

    def test_max_replans_defaults_to_3(self):
        agents = _make_agents(1)
        tasks = _make_tasks(agents)
        crew = Crew(agents=agents, tasks=tasks)
        assert crew.max_replans == 3

    def test_replan_on_failure_can_be_set(self):
        agents = _make_agents(1)
        tasks = _make_tasks(agents)
        crew = Crew(agents=agents, tasks=tasks, replan_on_failure=True)
        assert crew.replan_on_failure is True

    def test_max_replans_can_be_set(self):
        agents = _make_agents(1)
        tasks = _make_tasks(agents)
        crew = Crew(agents=agents, tasks=tasks, max_replans=5)
        assert crew.max_replans == 5

    def test_max_replans_cannot_be_negative(self):
        # The field declares ge=0, so pydantic validation rejects -1.
        agents = _make_agents(1)
        tasks = _make_tasks(agents)
        with pytest.raises(ValueError):
            Crew(agents=agents, tasks=tasks, max_replans=-1)
# ---------------------------------------------------------------------------
# Crew._maybe_replan integration tests
# ---------------------------------------------------------------------------
class TestCrewMaybeReplan:
    """Integration tests for Crew._maybe_replan with evaluator/planner mocked."""

    def _setup_crew_with_planning(
        self, n_agents: int = 3
    ) -> tuple[Crew, list[Agent], list[Task]]:
        # Build a crew as if planning had already run: originals stored,
        # replan counter reset, and a fake plan appended to each description.
        agents = _make_agents(n_agents)
        tasks = _make_tasks(agents)
        crew = Crew(
            agents=agents,
            tasks=tasks,
            planning=True,
            replan_on_failure=True,
            max_replans=3,
        )
        # Simulate planning having been called
        crew._original_task_descriptions = [t.description for t in tasks]
        crew._replan_count = 0
        # Append a fake plan to each task
        for task in tasks:
            task.description += " [PLAN]"
        return crew, agents, tasks

    def test_maybe_replan_skips_when_planning_disabled(self):
        agents = _make_agents(2)
        tasks = _make_tasks(agents)
        crew = Crew(agents=agents, tasks=tasks, planning=False, replan_on_failure=True)
        crew._original_task_descriptions = [t.description for t in tasks]
        # Should not call evaluator at all
        with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
            crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
        mock_eval.assert_not_called()

    def test_maybe_replan_skips_when_replan_on_failure_disabled(self):
        agents = _make_agents(2)
        tasks = _make_tasks(agents)
        crew = Crew(agents=agents, tasks=tasks, planning=True, replan_on_failure=False)
        crew._original_task_descriptions = [t.description for t in tasks]
        with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
            crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
        mock_eval.assert_not_called()

    def test_maybe_replan_skips_on_last_task(self):
        # No remaining tasks after the last one, so evaluation is pointless.
        crew, agents, tasks = self._setup_crew_with_planning(2)
        with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
            crew._maybe_replan(tasks[1], _task_output(), 1, tasks, [_task_output()])
        mock_eval.assert_not_called()

    def test_maybe_replan_skips_when_max_replans_reached(self):
        crew, agents, tasks = self._setup_crew_with_planning(3)
        crew._replan_count = 3  # already at max
        with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
            crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
        mock_eval.assert_not_called()

    def test_maybe_replan_skips_when_no_plan_text(self):
        agents = _make_agents(3)
        tasks = _make_tasks(agents)
        crew = Crew(
            agents=agents, tasks=tasks,
            planning=True, replan_on_failure=True,
        )
        crew._original_task_descriptions = [t.description for t in tasks]
        crew._replan_count = 0
        # No plan appended — descriptions are unchanged
        with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
            crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
        mock_eval.assert_not_called()

    def test_maybe_replan_no_replan_when_evaluator_says_no(self):
        crew, agents, tasks = self._setup_crew_with_planning(3)
        original_desc_1 = tasks[1].description
        original_desc_2 = tasks[2].description
        no_replan = ReplanDecision(
            should_replan=False,
            reason="Result is fine",
            affected_task_numbers=[],
        )
        with patch.object(ReplanningEvaluator, "evaluate", return_value=no_replan):
            with patch.object(CrewPlanner, "_handle_crew_replanning") as mock_handler:
                crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
        mock_handler.assert_not_called()
        assert crew._replan_count == 0
        # Task descriptions should be unchanged
        assert tasks[1].description == original_desc_1
        assert tasks[2].description == original_desc_2

    def test_maybe_replan_triggers_replanning_and_updates_tasks(self):
        crew, agents, tasks = self._setup_crew_with_planning(3)
        deviation_decision = ReplanDecision(
            should_replan=True,
            reason="Task 1 returned error data",
            affected_task_numbers=[2, 3],
        )
        # Plans are keyed by 1-based position within the *remaining* tasks.
        revised_plans = PlannerTaskPydanticOutput(
            list_of_plans_per_task=[
                PlanPerTask(task_number=1, task="Task 2", plan=" [REVISED PLAN 2]"),
                PlanPerTask(task_number=2, task="Task 3", plan=" [REVISED PLAN 3]"),
            ]
        )
        with patch.object(ReplanningEvaluator, "evaluate", return_value=deviation_decision):
            with patch.object(
                CrewPlanner, "_handle_crew_replanning", return_value=revised_plans
            ):
                crew._maybe_replan(
                    tasks[0], _task_output("bad result"), 0, tasks, [_task_output("bad result")]
                )
        assert crew._replan_count == 1
        # Remaining tasks should have the revised plans
        assert "[REVISED PLAN 2]" in tasks[1].description
        assert "[REVISED PLAN 3]" in tasks[2].description
        # Old plan should be gone (original desc restored + new plan)
        assert tasks[1].description.count("[PLAN]") == 0
        assert tasks[2].description.count("[PLAN]") == 0

    def test_maybe_replan_increments_replan_count_each_time(self):
        crew, agents, tasks = self._setup_crew_with_planning(3)
        deviation = ReplanDecision(
            should_replan=True,
            reason="deviation",
            affected_task_numbers=[2],
        )
        revised = PlannerTaskPydanticOutput(
            list_of_plans_per_task=[
                PlanPerTask(task_number=1, task="T2", plan=" [NEW PLAN]"),
            ]
        )
        with patch.object(ReplanningEvaluator, "evaluate", return_value=deviation):
            with patch.object(CrewPlanner, "_handle_crew_replanning", return_value=revised):
                crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
                assert crew._replan_count == 1
                # Simulate second task completing with deviation
                crew._maybe_replan(tasks[1], _task_output(), 1, tasks, [_task_output()] * 2)
                assert crew._replan_count == 2

    def test_maybe_replan_stops_at_max_replans(self):
        crew, agents, tasks = self._setup_crew_with_planning(3)
        crew.max_replans = 1
        deviation = ReplanDecision(
            should_replan=True,
            reason="deviation",
            affected_task_numbers=[2],
        )
        revised = PlannerTaskPydanticOutput(
            list_of_plans_per_task=[
                PlanPerTask(task_number=1, task="T2", plan=" [NEW]"),
            ]
        )
        with patch.object(ReplanningEvaluator, "evaluate", return_value=deviation) as mock_eval:
            with patch.object(CrewPlanner, "_handle_crew_replanning", return_value=revised):
                crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
                assert crew._replan_count == 1
                # Second call should be skipped because max_replans=1
                crew._maybe_replan(tasks[1], _task_output(), 1, tasks, [_task_output()] * 2)
                assert crew._replan_count == 1  # unchanged
                # evaluate was only called once (the second time was short-circuited)
                assert mock_eval.call_count == 1
# ---------------------------------------------------------------------------
# Crew._handle_crew_planning stores original descriptions
# ---------------------------------------------------------------------------
class TestCrewPlanningStoresOriginals:
    """Crew._handle_crew_planning must snapshot descriptions and reset state."""

    def test_handle_crew_planning_stores_original_descriptions(self):
        agents = _make_agents(2)
        tasks = _make_tasks(agents)
        crew = Crew(agents=agents, tasks=tasks, planning=True)
        original_descs = [t.description for t in tasks]
        plans = [
            PlanPerTask(task_number=1, task="T1", plan=" [PLAN 1]"),
            PlanPerTask(task_number=2, task="T2", plan=" [PLAN 2]"),
        ]
        mock_result = PlannerTaskPydanticOutput(list_of_plans_per_task=plans)
        with patch.object(CrewPlanner, "_handle_crew_planning", return_value=mock_result):
            crew._handle_crew_planning()
        # Snapshot taken before plans were appended.
        assert crew._original_task_descriptions == original_descs
        assert crew._replan_count == 0

    def test_handle_crew_planning_resets_replan_count(self):
        agents = _make_agents(1)
        tasks = _make_tasks(agents)
        crew = Crew(agents=agents, tasks=tasks, planning=True)
        crew._replan_count = 5  # leftover from previous execution
        plans = [PlanPerTask(task_number=1, task="T1", plan=" [PLAN]")]
        mock_result = PlannerTaskPydanticOutput(list_of_plans_per_task=plans)
        with patch.object(CrewPlanner, "_handle_crew_planning", return_value=mock_result):
            crew._handle_crew_planning()
        assert crew._replan_count == 0
# ---------------------------------------------------------------------------
# Sync execution integration test
# ---------------------------------------------------------------------------
class TestExecuteTasksWithReplanning:
    """End-to-end checks that _execute_tasks invokes the _maybe_replan hook."""

    def test_execute_tasks_calls_maybe_replan_for_sync_tasks(self):
        """Verify that _maybe_replan is called after each sync task execution."""
        agents = _make_agents(2)
        tasks = _make_tasks(agents)
        crew = Crew(
            agents=agents,
            tasks=tasks,
            planning=True,
            replan_on_failure=True,
        )
        crew._original_task_descriptions = [t.description for t in tasks]
        crew._replan_count = 0
        output = _task_output("result")
        with patch.object(Task, "execute_sync", return_value=output):
            with patch.object(Crew, "_maybe_replan") as mock_replan:
                crew._execute_tasks(tasks)
                # Should be called once for each sync task
                assert mock_replan.call_count == 2

    def test_execute_tasks_without_replanning_is_unaffected(self):
        """Verify existing behaviour when replan_on_failure is False."""
        agents = _make_agents(2)
        tasks = _make_tasks(agents)
        crew = Crew(
            agents=agents,
            tasks=tasks,
            planning=False,
            replan_on_failure=False,
        )
        output = _task_output("result")
        with patch.object(Task, "execute_sync", return_value=output):
            with patch.object(Crew, "_maybe_replan") as mock_replan:
                result = crew._execute_tasks(tasks)
                # _maybe_replan is still called but returns immediately
                assert mock_replan.call_count == 2
                assert result is not None