diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py
index 61d1f52cf..98d621343 100644
--- a/lib/crewai/src/crewai/crew.py
+++ b/lib/crewai/src/crewai/crew.py
@@ -111,6 +111,7 @@ from crewai.utilities.i18n import get_i18n
 from crewai.utilities.llm_utils import create_llm
 from crewai.utilities.logger import Logger
 from crewai.utilities.planning_handler import CrewPlanner
+from crewai.utilities.replanning_evaluator import ReplanningEvaluator
 from crewai.utilities.printer import PrinterColor
 from crewai.utilities.rpm_controller import RPMController
 from crewai.utilities.streaming import (
@@ -182,6 +183,8 @@ class Crew(FlowTrackable, BaseModel):
         default_factory=TaskOutputStorageHandler
     )
     _kickoff_event_id: str | None = PrivateAttr(default=None)
+    _replan_count: int = PrivateAttr(default=0)
+    _original_task_descriptions: list[str] = PrivateAttr(default_factory=list)
 
     name: str | None = Field(default="crew")
     cache: bool = Field(default=True)
@@ -269,6 +272,21 @@ class Crew(FlowTrackable, BaseModel):
             "Language model that will run the AgentPlanner if planning is True."
         ),
     )
+    replan_on_failure: bool = Field(
+        default=False,
+        description=(
+            "When True and planning is enabled, evaluate each task result against "
+            "the plan and trigger replanning if results deviate significantly."
+        ),
+    )
+    max_replans: int = Field(
+        default=3,
+        description=(
+            "Maximum number of replans allowed during a single crew execution. "
+            "Prevents infinite replanning loops."
+        ),
+        ge=0,
+    )
     task_execution_output_json_files: list[str] | None = Field(
         default=None,
         description="list of file paths for task execution JSON files.",
@@ -1041,6 +1059,9 @@
             task_outputs.append(task_output)
             self._process_task_result(task, task_output)
             self._store_execution_log(task, task_output, task_index, was_replayed)
+            self._maybe_replan(
+                task, task_output, task_index, tasks, task_outputs
+            )
 
         if pending_tasks:
             task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed)
@@ -1087,6 +1108,13 @@
             tasks=self.tasks, planning_agent_llm=self.planning_llm
         )._handle_crew_planning()
 
+        # Store original descriptions before appending plans so replanning
+        # can strip the old plan and apply a fresh one.
+        self._original_task_descriptions = [
+            task.description for task in self.tasks
+        ]
+        self._replan_count = 0
+
         plan_map: dict[int, str] = {}
         for step_plan in result.list_of_plans_per_task:
             if step_plan.task_number in plan_map:
@@ -1108,6 +1136,97 @@
                 f"No plan found for Task Number {task_number}",
             )
 
+    def _maybe_replan(
+        self,
+        task: Task,
+        task_output: TaskOutput,
+        task_index: int,
+        tasks: list[Task],
+        all_task_outputs: list[TaskOutput],
+    ) -> None:
+        """Evaluate a completed task and replan remaining tasks if needed.
+
+        This is called after each synchronous task completes when both
+        ``planning`` and ``replan_on_failure`` are enabled. It uses a
+        lightweight LLM call to decide whether the result deviates from
+        the plan, and if so generates revised plans for remaining tasks.
+
+        Args:
+            task: The task that just completed.
+            task_output: The output produced by the completed task.
+            task_index: Index of the completed task in the tasks list.
+            tasks: The full list of tasks being executed.
+            all_task_outputs: All task outputs collected so far.
+        """
+        if (
+            not self.planning
+            or not self.replan_on_failure
+            or self._replan_count >= self.max_replans
+        ):
+            return
+
+        remaining_tasks = tasks[task_index + 1:]
+        if not remaining_tasks:
+            return
+
+        # Extract the plan portion appended to this task's description
+        original_desc = (
+            self._original_task_descriptions[task_index]
+            if task_index < len(self._original_task_descriptions)
+            else task.description
+        )
+        plan_text = task.description[len(original_desc):]
+
+        if not plan_text.strip():
+            return
+
+        evaluator = ReplanningEvaluator(llm=self.planning_llm)
+        decision = evaluator.evaluate(
+            completed_task=task,
+            task_output=task_output,
+            original_plan=plan_text,
+            remaining_tasks=remaining_tasks,
+        )
+
+        if not decision.should_replan:
+            return
+
+        self._replan_count += 1
+        self._logger.log(
+            "info",
+            f"Replanning triggered (replan {self._replan_count}/{self.max_replans}): "
+            f"{decision.reason}",
+        )
+
+        completed_tasks = tasks[: task_index + 1]
+        planner = CrewPlanner(
+            tasks=self.tasks, planning_agent_llm=self.planning_llm
+        )
+        replan_result = planner._handle_crew_replanning(
+            completed_tasks=completed_tasks,
+            completed_outputs=all_task_outputs,
+            remaining_tasks=remaining_tasks,
+            deviation_reason=decision.reason,
+        )
+
+        # Build a map of new plans keyed by task_number
+        new_plan_map: dict[int, str] = {}
+        for step_plan in replan_result.list_of_plans_per_task:
+            if step_plan.task_number not in new_plan_map:
+                new_plan_map[step_plan.task_number] = step_plan.plan
+
+        # Apply revised plans to remaining tasks, restoring original
+        # descriptions first so old plans don't accumulate.
+        for plan_idx, remaining_task in enumerate(remaining_tasks):
+            global_idx = task_index + 1 + plan_idx
+            if global_idx < len(self._original_task_descriptions):
+                remaining_task.description = self._original_task_descriptions[
+                    global_idx
+                ]
+            plan_number = plan_idx + 1
+            if plan_number in new_plan_map:
+                remaining_task.description += new_plan_map[plan_number]
+
     def _store_execution_log(
         self,
         task: Task,
@@ -1240,6 +1359,9 @@
             task_outputs.append(task_output)
             self._process_task_result(task, task_output)
             self._store_execution_log(task, task_output, task_index, was_replayed)
+            self._maybe_replan(
+                task, task_output, task_index, tasks, task_outputs
+            )
 
         if futures:
             task_outputs = self._process_async_tasks(futures, was_replayed)
diff --git a/lib/crewai/src/crewai/utilities/planning_handler.py b/lib/crewai/src/crewai/utilities/planning_handler.py
index 2497b9fc8..4a9d14d42 100644
--- a/lib/crewai/src/crewai/utilities/planning_handler.py
+++ b/lib/crewai/src/crewai/utilities/planning_handler.py
@@ -1,5 +1,7 @@
 """Handles planning and coordination of crew tasks."""
 
+from __future__ import annotations
+
 import logging
 
 from pydantic import BaseModel, Field
@@ -7,6 +9,7 @@ from pydantic import BaseModel, Field
 from crewai.agent import Agent
 from crewai.llms.base_llm import BaseLLM
 from crewai.task import Task
+from crewai.tasks.task_output import TaskOutput
 
 logger = logging.getLogger(__name__)
 
@@ -134,6 +137,113 @@ class CrewPlanner:
             logger.warning("Error accessing agent knowledge sources")
             return []
 
+    def _handle_crew_replanning(
+        self,
+        completed_tasks: list[Task],
+        completed_outputs: list[TaskOutput],
+        remaining_tasks: list[Task],
+        deviation_reason: str,
+    ) -> PlannerTaskPydanticOutput:
+        """Generate revised plans for remaining tasks after a deviation is detected.
+
+        This method is called when a ``ReplanningEvaluator`` determines that a
+        completed task's result deviates significantly from the original plan.
+        It creates a new plan that accounts for the actual results so far.
+
+        Args:
+            completed_tasks: Tasks that have already been executed.
+            completed_outputs: Outputs produced by the completed tasks.
+            remaining_tasks: Tasks that still need to be executed.
+            deviation_reason: Explanation of why replanning was triggered.
+
+        Returns:
+            A PlannerTaskPydanticOutput with revised plans for the remaining tasks.
+
+        Raises:
+            ValueError: If the replanning output cannot be obtained.
+        """
+        planning_agent = self._create_planning_agent()
+        completed_summary = self._create_completed_tasks_summary(
+            completed_tasks, completed_outputs
+        )
+        remaining_summary = self._create_tasks_summary_for(remaining_tasks)
+
+        replan_task = Task(
+            description=(
+                "The crew's execution plan needs to be revised because a task result "
+                "deviated from the original plan's assumptions.\n\n"
+                f"## Reason for Replanning\n{deviation_reason}\n\n"
+                f"## Completed Tasks and Their Results\n{completed_summary}\n\n"
+                f"## Remaining Tasks That Need New Plans\n{remaining_summary}\n\n"
+                "Create revised step-by-step plans for the remaining tasks ONLY, "
+                "taking into account what has actually been accomplished so far "
+                "and the deviation from the original plan. The plans should adapt "
+                "to the real situation rather than following the now-outdated assumptions."
+            ),
+            expected_output=(
+                "Step by step revised plan for each remaining task, "
+                "adapted to the actual results so far."
+            ),
+            agent=planning_agent,
+            output_pydantic=PlannerTaskPydanticOutput,
+        )
+
+        result = replan_task.execute_sync()
+
+        if isinstance(result.pydantic, PlannerTaskPydanticOutput):
+            return result.pydantic
+
+        raise ValueError("Failed to get the Replanning output")
+
+    @staticmethod
+    def _create_completed_tasks_summary(
+        tasks: list[Task], outputs: list[TaskOutput]
+    ) -> str:
+        """Create a summary of completed tasks and their actual outputs.
+
+        Args:
+            tasks: The completed tasks.
+            outputs: The outputs from those tasks.
+
+        Returns:
+            A formatted string summarizing completed tasks and results.
+        """
+        summaries = []
+        for idx, (task, output) in enumerate(zip(tasks, outputs), start=1):
+            agent_role = task.agent.role if task.agent else "None"
+            summaries.append(
+                f"Task {idx} (Agent: {agent_role}):\n"
+                f"  Description: {task.description}\n"
+                f"  Expected Output: {task.expected_output}\n"
+                f"  Actual Result: {output.raw}"
+            )
+        return "\n\n".join(summaries) if summaries else "No completed tasks."
+
+    @staticmethod
+    def _create_tasks_summary_for(tasks: list[Task]) -> str:
+        """Create a summary of a subset of tasks (used for remaining tasks).
+
+        Args:
+            tasks: The tasks to summarize.
+
+        Returns:
+            A formatted string summarizing the tasks.
+        """
+        summaries = []
+        for idx, task in enumerate(tasks, start=1):
+            agent_role = task.agent.role if task.agent else "None"
+            agent_goal = task.agent.goal if task.agent else "None"
+            summaries.append(
+                f'Task Number {idx}:\n'
+                f'  "task_description": {task.description}\n'
+                f'  "task_expected_output": {task.expected_output}\n'
+                f'  "agent": {agent_role}\n'
+                f'  "agent_goal": {agent_goal}\n'
+                f'  "task_tools": {task.tools}\n'
+                f'  "agent_tools": {task.agent.tools if task.agent else "None"}'
+            )
+        return "\n\n".join(summaries) if summaries else "No remaining tasks."
+
     def _create_tasks_summary(self) -> str:
         """Creates a summary of all tasks.
diff --git a/lib/crewai/src/crewai/utilities/replanning_evaluator.py b/lib/crewai/src/crewai/utilities/replanning_evaluator.py
new file mode 100644
index 000000000..80b1f8575
--- /dev/null
+++ b/lib/crewai/src/crewai/utilities/replanning_evaluator.py
@@ -0,0 +1,160 @@
+"""Evaluates whether task results deviate from the original plan and triggers replanning."""
+
+from __future__ import annotations
+
+import logging
+
+from pydantic import BaseModel, Field
+
+from crewai.agent import Agent
+from crewai.llms.base_llm import BaseLLM
+from crewai.task import Task
+from crewai.tasks.task_output import TaskOutput
+
+
+logger = logging.getLogger(__name__)
+
+
+class ReplanDecision(BaseModel):
+    """Structured decision on whether replanning is needed.
+
+    Attributes:
+        should_replan: Whether the crew should generate a new plan for remaining tasks.
+        reason: Explanation of why replanning is or is not needed.
+        affected_task_numbers: 1-indexed task numbers of remaining tasks most affected
+            by the deviation. Empty if should_replan is False.
+    """
+
+    should_replan: bool = Field(
+        description="Whether the task result deviates significantly from the plan, requiring replanning.",
+    )
+    reason: str = Field(
+        description="A concise explanation of why replanning is or is not needed.",
+    )
+    affected_task_numbers: list[int] = Field(
+        default_factory=list,
+        description="1-indexed task numbers of remaining tasks most affected by the deviation.",
+    )
+
+
+class ReplanningEvaluator:
+    """Evaluates task outputs to decide if the crew's plan needs to be revised.
+
+    After each task completes, this evaluator makes a lightweight LLM call to
+    determine whether the result deviates significantly from what the plan
+    assumed. If so, it signals that replanning should occur.
+
+    Example usage::
+
+        evaluator = ReplanningEvaluator(llm="gpt-4o-mini")
+        decision = evaluator.evaluate(
+            completed_task=task,
+            task_output=output,
+            original_plan="Step 1: ...",
+            remaining_tasks=remaining,
+        )
+        if decision.should_replan:
+            # trigger replanning for remaining tasks
+            ...
+
+    Args:
+        llm: The language model to use for evaluation. Accepts a string model
+            name or a BaseLLM instance. Defaults to ``"gpt-4o-mini"``.
+    """
+
+    def __init__(self, llm: str | BaseLLM | None = None) -> None:
+        self.llm = llm or "gpt-4o-mini"
+
+    def evaluate(
+        self,
+        completed_task: Task,
+        task_output: TaskOutput,
+        original_plan: str,
+        remaining_tasks: list[Task],
+    ) -> ReplanDecision:
+        """Evaluate whether a task result deviates from the plan.
+
+        Args:
+            completed_task: The task that just finished executing.
+            task_output: The output produced by the completed task.
+            original_plan: The plan text that was appended to the task description.
+            remaining_tasks: Tasks that have not yet been executed.
+
+        Returns:
+            A ReplanDecision indicating whether replanning is needed.
+        """
+        evaluation_agent = Agent(
+            role="Replanning Evaluator",
+            goal=(
+                "Evaluate whether the result of a completed task deviates "
+                "significantly from what the plan assumed, and determine if "
+                "the remaining tasks need a revised plan."
+            ),
+            backstory=(
+                "You are an expert at evaluating execution plans. You compare "
+                "actual task results against planned expectations and identify "
+                "deviations that would make the remaining plan ineffective."
+            ),
+            llm=self.llm,
+        )
+
+        remaining_summary = self._summarize_remaining_tasks(remaining_tasks)
+
+        evaluation_task = Task(
+            description=(
+                "Evaluate whether the following task result deviates significantly "
+                "from what the original plan assumed.\n\n"
+                f"## Completed Task\n"
+                f"Description: {completed_task.description}\n"
+                f"Expected Output: {completed_task.expected_output}\n\n"
+                f"## Plan for this Task\n{original_plan}\n\n"
+                f"## Actual Result\n{task_output.raw}\n\n"
+                f"## Remaining Tasks\n{remaining_summary}\n\n"
+                "Based on the above, decide if the actual result deviates enough "
+                "from the plan's assumptions that the remaining tasks need replanning. "
+                "Minor differences or format changes do NOT require replanning. "
+                "Only significant deviations (missing data, errors, completely "
+                "different approach needed, infeasible assumptions) should trigger replanning."
+            ),
+            expected_output=(
+                "A structured decision indicating whether replanning is needed, "
+                "with a reason and the affected task numbers."
+            ),
+            agent=evaluation_agent,
+            output_pydantic=ReplanDecision,
+        )
+
+        result = evaluation_task.execute_sync()
+
+        if isinstance(result.pydantic, ReplanDecision):
+            return result.pydantic
+
+        logger.warning("Failed to get structured ReplanDecision, defaulting to no replan")
+        return ReplanDecision(
+            should_replan=False,
+            reason="Failed to evaluate task output against plan.",
+            affected_task_numbers=[],
+        )
+
+    @staticmethod
+    def _summarize_remaining_tasks(remaining_tasks: list[Task]) -> str:
+        """Create a summary of remaining tasks for evaluation context.
+
+        Args:
+            remaining_tasks: Tasks that have not yet been executed.
+
+        Returns:
+            A formatted string summarizing the remaining tasks.
+        """
+        if not remaining_tasks:
+            return "No remaining tasks."
+
+        summaries = []
+        for idx, task in enumerate(remaining_tasks, start=1):
+            agent_role = task.agent.role if task.agent else "Unassigned"
+            summaries.append(
+                f"Task {idx}: {task.description}\n"
+                f"  Expected Output: {task.expected_output}\n"
+                f"  Agent: {agent_role}"
+            )
+        return "\n".join(summaries)
diff --git a/lib/crewai/tests/utilities/test_replanning.py b/lib/crewai/tests/utilities/test_replanning.py
new file mode 100644
index 000000000..29fc52b05
--- /dev/null
+++ b/lib/crewai/tests/utilities/test_replanning.py
@@ -0,0 +1,572 @@
+"""Tests for the adaptive replanning feature (issue #4983).
+
+Covers:
+- ReplanningEvaluator: evaluation of task results against plans
+- CrewPlanner._handle_crew_replanning: generating revised plans
+- Crew integration: replan_on_failure / max_replans fields and the
+  _maybe_replan hook in both sync and async execution paths
+- Backwards compatibility: existing crews are unaffected by default
+"""
+
+from unittest.mock import patch
+
+import pytest
+
+from crewai.agent import Agent
+from crewai.crew import Crew
+from crewai.task import Task
+from crewai.tasks.task_output import TaskOutput
+from crewai.utilities.planning_handler import (
+    CrewPlanner,
+    PlannerTaskPydanticOutput,
+    PlanPerTask,
+)
+from crewai.utilities.replanning_evaluator import (
+    ReplanDecision,
+    ReplanningEvaluator,
+)
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _make_agents(n: int = 3) -> list[Agent]:
+    return [
+        Agent(role=f"Agent {i}", goal=f"Goal {i}", backstory=f"Backstory {i}")
+        for i in range(1, n + 1)
+    ]
+
+
+def _make_tasks(agents: list[Agent]) -> list[Task]:
+    return [
+        Task(
+            description=f"Task {i} description",
+            expected_output=f"Output {i}",
+            agent=agents[i - 1],
+        )
+        for i in range(1, len(agents) + 1)
+    ]
+
+
+def _task_output(raw: str = "result", agent: str = "agent") -> TaskOutput:
+    return TaskOutput(description="desc", agent=agent, raw=raw)
+
+
+# ---------------------------------------------------------------------------
+# ReplanDecision model tests
+# ---------------------------------------------------------------------------
+
+class TestReplanDecision:
+    def test_replan_decision_defaults(self):
+        decision = ReplanDecision(
+            should_replan=False,
+            reason="All good",
+        )
+        assert decision.should_replan is False
+        assert decision.reason == "All good"
+        assert decision.affected_task_numbers == []
+
+    def test_replan_decision_with_affected_tasks(self):
+        decision = ReplanDecision(
+            should_replan=True,
+            reason="Data missing",
+            affected_task_numbers=[2, 3],
+        )
+        assert decision.should_replan is True
+        assert decision.affected_task_numbers == [2, 3]
+
+
+# ---------------------------------------------------------------------------
+# ReplanningEvaluator tests
+# ---------------------------------------------------------------------------
+
+class TestReplanningEvaluator:
+    def test_default_llm(self):
+        evaluator = ReplanningEvaluator()
+        assert evaluator.llm == "gpt-4o-mini"
+
+    def test_custom_llm(self):
+        evaluator = ReplanningEvaluator(llm="gpt-4o")
+        assert evaluator.llm == "gpt-4o"
+
+    def test_evaluate_returns_replan_decision_when_deviation_detected(self):
+        """When the LLM says replanning is needed, evaluate() returns that."""
+        evaluator = ReplanningEvaluator()
+        agents = _make_agents(3)
+        tasks = _make_tasks(agents)
+        output = _task_output("completely unexpected result")
+
+        expected_decision = ReplanDecision(
+            should_replan=True,
+            reason="Result deviates from plan assumptions",
+            affected_task_numbers=[2, 3],
+        )
+
+        with patch.object(Task, "execute_sync") as mock_exec:
+            mock_exec.return_value = TaskOutput(
+                description="eval",
+                agent="evaluator",
+                pydantic=expected_decision,
+            )
+            decision = evaluator.evaluate(
+                completed_task=tasks[0],
+                task_output=output,
+                original_plan="Step 1: do X\nStep 2: do Y",
+                remaining_tasks=tasks[1:],
+            )
+
+        assert decision.should_replan is True
+        assert decision.reason == "Result deviates from plan assumptions"
+        mock_exec.assert_called_once()
+
+    def test_evaluate_returns_no_replan_when_result_matches(self):
+        """When the result matches the plan, no replanning is needed."""
+        evaluator = ReplanningEvaluator()
+        agents = _make_agents(2)
+        tasks = _make_tasks(agents)
+        output = _task_output("expected result matching plan")
+
+        expected_decision = ReplanDecision(
+            should_replan=False,
+            reason="Result aligns with plan",
+            affected_task_numbers=[],
+        )
+
+        with patch.object(Task, "execute_sync") as mock_exec:
+            mock_exec.return_value = TaskOutput(
+                description="eval",
+                agent="evaluator",
+                pydantic=expected_decision,
+            )
+            decision = evaluator.evaluate(
+                completed_task=tasks[0],
+                task_output=output,
+                original_plan="Step 1: gather data",
+                remaining_tasks=tasks[1:],
+            )
+
+        assert decision.should_replan is False
+
+    def test_evaluate_fallback_on_bad_output(self):
+        """When the LLM returns non-structured output, fall back to no-replan."""
+        evaluator = ReplanningEvaluator()
+        agents = _make_agents(2)
+        tasks = _make_tasks(agents)
+        output = _task_output("some result")
+
+        with patch.object(Task, "execute_sync") as mock_exec:
+            mock_exec.return_value = TaskOutput(
+                description="eval",
+                agent="evaluator",
+                pydantic=None,  # no structured output
+            )
+            decision = evaluator.evaluate(
+                completed_task=tasks[0],
+                task_output=output,
+                original_plan="Step 1: do stuff",
+                remaining_tasks=tasks[1:],
+            )
+
+        assert decision.should_replan is False
+        assert "Failed to evaluate" in decision.reason
+
+    def test_summarize_remaining_tasks_empty(self):
+        result = ReplanningEvaluator._summarize_remaining_tasks([])
+        assert result == "No remaining tasks."
+
+    def test_summarize_remaining_tasks_with_tasks(self):
+        agents = _make_agents(2)
+        tasks = _make_tasks(agents)
+        result = ReplanningEvaluator._summarize_remaining_tasks(tasks)
+        assert "Task 1" in result
+        assert "Task 2" in result
+        assert "Agent 1" in result
+
+
+# ---------------------------------------------------------------------------
+# CrewPlanner._handle_crew_replanning tests
+# ---------------------------------------------------------------------------
+
+class TestCrewPlannerReplanning:
+    @pytest.fixture
+    def planner(self):
+        agents = _make_agents(3)
+        tasks = _make_tasks(agents)
+        return CrewPlanner(tasks=tasks, planning_agent_llm=None)
+
+    def test_handle_crew_replanning_returns_revised_plans(self, planner):
+        agents = _make_agents(3)
+        tasks = _make_tasks(agents)
+        outputs = [_task_output("result 1")]
+
+        revised_plans = PlannerTaskPydanticOutput(
+            list_of_plans_per_task=[
+                PlanPerTask(task_number=1, task="Task 2", plan="Revised plan for task 2"),
+                PlanPerTask(task_number=2, task="Task 3", plan="Revised plan for task 3"),
+            ]
+        )
+
+        with patch.object(Task, "execute_sync") as mock_exec:
+            mock_exec.return_value = TaskOutput(
+                description="replan",
+                agent="planner",
+                pydantic=revised_plans,
+            )
+            result = planner._handle_crew_replanning(
+                completed_tasks=[tasks[0]],
+                completed_outputs=outputs,
+                remaining_tasks=tasks[1:],
+                deviation_reason="Task 1 returned unexpected data",
+            )
+
+        assert isinstance(result, PlannerTaskPydanticOutput)
+        assert len(result.list_of_plans_per_task) == 2
+        mock_exec.assert_called_once()
+
+    def test_handle_crew_replanning_raises_on_bad_output(self, planner):
+        agents = _make_agents(3)
+        tasks = _make_tasks(agents)
+        outputs = [_task_output("result 1")]
+
+        with patch.object(Task, "execute_sync") as mock_exec:
+            mock_exec.return_value = TaskOutput(
+                description="replan",
+                agent="planner",
+                pydantic=None,
+            )
+            with pytest.raises(ValueError, match="Failed to get the Replanning output"):
+                planner._handle_crew_replanning(
+                    completed_tasks=[tasks[0]],
+                    completed_outputs=outputs,
+                    remaining_tasks=tasks[1:],
+                    deviation_reason="deviation",
+                )
+
+    def test_completed_tasks_summary(self):
+        agents = _make_agents(2)
+        tasks = _make_tasks(agents)
+        outputs = [_task_output("result A"), _task_output("result B")]
+
+        summary = CrewPlanner._create_completed_tasks_summary(tasks, outputs)
+        assert "result A" in summary
+        assert "result B" in summary
+        assert "Agent 1" in summary
+
+    def test_completed_tasks_summary_empty(self):
+        summary = CrewPlanner._create_completed_tasks_summary([], [])
+        assert summary == "No completed tasks."
+
+    def test_tasks_summary_for_remaining(self):
+        agents = _make_agents(2)
+        tasks = _make_tasks(agents)
+        summary = CrewPlanner._create_tasks_summary_for(tasks)
+        assert "Task Number 1" in summary
+        assert "Task Number 2" in summary
+
+    def test_tasks_summary_for_empty(self):
+        summary = CrewPlanner._create_tasks_summary_for([])
+        assert summary == "No remaining tasks."
+
+
+# ---------------------------------------------------------------------------
+# Crew field tests (backwards compatibility)
+# ---------------------------------------------------------------------------
+
+class TestCrewReplanningFields:
+    def test_replan_on_failure_defaults_to_false(self):
+        agents = _make_agents(1)
+        tasks = _make_tasks(agents)
+        crew = Crew(agents=agents, tasks=tasks)
+        assert crew.replan_on_failure is False
+
+    def test_max_replans_defaults_to_3(self):
+        agents = _make_agents(1)
+        tasks = _make_tasks(agents)
+        crew = Crew(agents=agents, tasks=tasks)
+        assert crew.max_replans == 3
+
+    def test_replan_on_failure_can_be_set(self):
+        agents = _make_agents(1)
+        tasks = _make_tasks(agents)
+        crew = Crew(agents=agents, tasks=tasks, replan_on_failure=True)
+        assert crew.replan_on_failure is True
+
+    def test_max_replans_can_be_set(self):
+        agents = _make_agents(1)
+        tasks = _make_tasks(agents)
+        crew = Crew(agents=agents, tasks=tasks, max_replans=5)
+        assert crew.max_replans == 5
+
+    def test_max_replans_cannot_be_negative(self):
+        agents = _make_agents(1)
+        tasks = _make_tasks(agents)
+        with pytest.raises(ValueError):
+            Crew(agents=agents, tasks=tasks, max_replans=-1)
+
+
+# ---------------------------------------------------------------------------
+# Crew._maybe_replan integration tests
+# ---------------------------------------------------------------------------
+
+class TestCrewMaybeReplan:
+    def _setup_crew_with_planning(self, n_agents: int = 3) -> tuple[Crew, list[Agent], list[Task]]:
+        agents = _make_agents(n_agents)
+        tasks = _make_tasks(agents)
+        crew = Crew(
+            agents=agents,
+            tasks=tasks,
+            planning=True,
+            replan_on_failure=True,
+            max_replans=3,
+        )
+        # Simulate planning having been called
+        crew._original_task_descriptions = [t.description for t in tasks]
+        crew._replan_count = 0
+        # Append a fake plan to each task
+        for task in tasks:
+            task.description += " [PLAN]"
+        return crew, agents, tasks
+
+    def test_maybe_replan_skips_when_planning_disabled(self):
+        agents = _make_agents(2)
+        tasks = _make_tasks(agents)
+        crew = Crew(agents=agents, tasks=tasks, planning=False, replan_on_failure=True)
+        crew._original_task_descriptions = [t.description for t in tasks]
+
+        # Should not call evaluator at all
+        with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
+            crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
+            mock_eval.assert_not_called()
+
+    def test_maybe_replan_skips_when_replan_on_failure_disabled(self):
+        agents = _make_agents(2)
+        tasks = _make_tasks(agents)
+        crew = Crew(agents=agents, tasks=tasks, planning=True, replan_on_failure=False)
+        crew._original_task_descriptions = [t.description for t in tasks]
+
+        with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
+            crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
+            mock_eval.assert_not_called()
+
+    def test_maybe_replan_skips_on_last_task(self):
+        crew, agents, tasks = self._setup_crew_with_planning(2)
+
+        with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
+            crew._maybe_replan(tasks[1], _task_output(), 1, tasks, [_task_output()])
+            mock_eval.assert_not_called()
+
+    def test_maybe_replan_skips_when_max_replans_reached(self):
+        crew, agents, tasks = self._setup_crew_with_planning(3)
+        crew._replan_count = 3  # already at max
+
+        with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
+            crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
+            mock_eval.assert_not_called()
+
+    def test_maybe_replan_skips_when_no_plan_text(self):
+        agents = _make_agents(3)
+        tasks = _make_tasks(agents)
+        crew = Crew(
+            agents=agents, tasks=tasks,
+            planning=True, replan_on_failure=True,
+        )
+        crew._original_task_descriptions = [t.description for t in tasks]
+        crew._replan_count = 0
+        # No plan appended — descriptions are unchanged
+
+        with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
+            crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
+            mock_eval.assert_not_called()
+
+    def test_maybe_replan_no_replan_when_evaluator_says_no(self):
+        crew, agents, tasks = self._setup_crew_with_planning(3)
+        original_desc_1 = tasks[1].description
+        original_desc_2 = tasks[2].description
+
+        no_replan = ReplanDecision(
+            should_replan=False,
+            reason="Result is fine",
+            affected_task_numbers=[],
+        )
+
+        with patch.object(ReplanningEvaluator, "evaluate", return_value=no_replan):
+            with patch.object(CrewPlanner, "_handle_crew_replanning") as mock_handler:
+                crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
+                mock_handler.assert_not_called()
+
+        assert crew._replan_count == 0
+        # Task descriptions should be unchanged
+        assert tasks[1].description == original_desc_1
+        assert tasks[2].description == original_desc_2
+
+    def test_maybe_replan_triggers_replanning_and_updates_tasks(self):
+        crew, agents, tasks = self._setup_crew_with_planning(3)
+
+        deviation_decision = ReplanDecision(
+            should_replan=True,
+            reason="Task 1 returned error data",
+            affected_task_numbers=[2, 3],
+        )
+
+        revised_plans = PlannerTaskPydanticOutput(
+            list_of_plans_per_task=[
+                PlanPerTask(task_number=1, task="Task 2", plan=" [REVISED PLAN 2]"),
+                PlanPerTask(task_number=2, task="Task 3", plan=" [REVISED PLAN 3]"),
+            ]
+        )
+
+        with patch.object(ReplanningEvaluator, "evaluate", return_value=deviation_decision):
+            with patch.object(
+                CrewPlanner, "_handle_crew_replanning", return_value=revised_plans
+            ):
+                crew._maybe_replan(
+                    tasks[0], _task_output("bad result"), 0, tasks, [_task_output("bad result")]
+                )
+
+        assert crew._replan_count == 1
+        # Remaining tasks should have the revised plans
+        assert "[REVISED PLAN 2]" in tasks[1].description
+        assert "[REVISED PLAN 3]" in tasks[2].description
+        # Old plan should be gone (original desc restored + new plan)
+        assert tasks[1].description.count("[PLAN]") == 0
+        assert tasks[2].description.count("[PLAN]") == 0
+
+    def test_maybe_replan_increments_replan_count_each_time(self):
+        crew, agents, tasks = self._setup_crew_with_planning(3)
+
+        deviation = ReplanDecision(
+            should_replan=True,
+            reason="deviation",
+            affected_task_numbers=[2],
+        )
+
+        revised = PlannerTaskPydanticOutput(
+            list_of_plans_per_task=[
+                PlanPerTask(task_number=1, task="T2", plan=" [NEW PLAN]"),
+            ]
+        )
+
+        with patch.object(ReplanningEvaluator, "evaluate", return_value=deviation):
+            with patch.object(CrewPlanner, "_handle_crew_replanning", return_value=revised):
+                crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
+                assert crew._replan_count == 1
+
+                # Simulate second task completing with deviation
+                crew._maybe_replan(tasks[1], _task_output(), 1, tasks, [_task_output()] * 2)
+                assert crew._replan_count == 2
+
+    def test_maybe_replan_stops_at_max_replans(self):
+        crew, agents, tasks = self._setup_crew_with_planning(3)
+        crew.max_replans = 1
+
+        deviation = ReplanDecision(
+            should_replan=True,
+            reason="deviation",
+            affected_task_numbers=[2],
+        )
+
+        revised = PlannerTaskPydanticOutput(
+            list_of_plans_per_task=[
+                PlanPerTask(task_number=1, task="T2", plan=" [NEW]"),
+            ]
+        )
+
+        with patch.object(ReplanningEvaluator, "evaluate", return_value=deviation) as mock_eval:
+            with patch.object(CrewPlanner, "_handle_crew_replanning", return_value=revised):
+                crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
+                assert crew._replan_count == 1
+
+                # Second call should be skipped because max_replans=1
+                crew._maybe_replan(tasks[1], _task_output(), 1, tasks, [_task_output()] * 2)
+                assert crew._replan_count == 1  # unchanged
+                # evaluate was only called once (the second time was short-circuited)
+                assert mock_eval.call_count == 1
+
+
+# ---------------------------------------------------------------------------
+# Crew._handle_crew_planning stores original descriptions
+# ---------------------------------------------------------------------------
+
+class TestCrewPlanningStoresOriginals:
+    def test_handle_crew_planning_stores_original_descriptions(self):
+        agents = _make_agents(2)
+        tasks = _make_tasks(agents)
+        crew = Crew(agents=agents, tasks=tasks, planning=True)
+
+        original_descs = [t.description for t in tasks]
+
+        plans = [
+            PlanPerTask(task_number=1, task="T1", plan=" [PLAN 1]"),
+            PlanPerTask(task_number=2, task="T2", plan=" [PLAN 2]"),
+        ]
+        mock_result = PlannerTaskPydanticOutput(list_of_plans_per_task=plans)
+
+        with patch.object(CrewPlanner, "_handle_crew_planning", return_value=mock_result):
+            crew._handle_crew_planning()
+
+        assert crew._original_task_descriptions == original_descs
+        assert crew._replan_count == 0
+
+    def test_handle_crew_planning_resets_replan_count(self):
+        agents = _make_agents(1)
+        tasks = _make_tasks(agents)
+        crew = Crew(agents=agents, tasks=tasks, planning=True)
+        crew._replan_count = 5  # leftover from previous execution
+
+        plans = [PlanPerTask(task_number=1, task="T1", plan=" [PLAN]")]
+        mock_result = PlannerTaskPydanticOutput(list_of_plans_per_task=plans)
+
+        with patch.object(CrewPlanner, "_handle_crew_planning", return_value=mock_result):
+            crew._handle_crew_planning()
+
+        assert crew._replan_count == 0
+
+
+# ---------------------------------------------------------------------------
+# Sync execution integration test
+# ---------------------------------------------------------------------------
+
+class TestExecuteTasksWithReplanning:
+    def test_execute_tasks_calls_maybe_replan_for_sync_tasks(self):
+        """Verify that _maybe_replan is called after each sync task execution."""
+        agents = _make_agents(2)
+        tasks = _make_tasks(agents)
+        crew = Crew(
+            agents=agents,
+            tasks=tasks,
+            planning=True,
+            replan_on_failure=True,
+        )
+        crew._original_task_descriptions = [t.description for t in tasks]
+        crew._replan_count = 0
+
+        output = _task_output("result")
+
+        with patch.object(Task, "execute_sync", return_value=output):
+            with patch.object(Crew, "_maybe_replan") as mock_replan:
+                crew._execute_tasks(tasks)
+
+        # Should be called once for each sync task
+        assert mock_replan.call_count == 2
+
+    def test_execute_tasks_without_replanning_is_unaffected(self):
+        """Verify existing behaviour when replan_on_failure is False."""
+        agents = _make_agents(2)
+        tasks = _make_tasks(agents)
+        crew = Crew(
+            agents=agents,
+            tasks=tasks,
+            planning=False,
+            replan_on_failure=False,
+        )
+
+        output = _task_output("result")
+
+        with patch.object(Task, "execute_sync", return_value=output):
+            with patch.object(Crew, "_maybe_replan") as mock_replan:
+                result = crew._execute_tasks(tasks)
+
+        # The hook is still invoked by the loop; with replanning disabled the
+        # real (unmocked) method would return immediately
+        assert mock_replan.call_count == 2
+        assert result is not None
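
Usage sketch, not part of the patch above: a minimal, hypothetical example of how the feature introduced by this diff would be enabled on a Crew. `Agent`, `Task`, `Crew`, `planning`, `planning_llm`, and `kickoff()` are existing crewAI API; `replan_on_failure` and `max_replans` are the fields added here. The agent and task contents are illustrative only.

    from crewai import Agent, Crew, Task

    researcher = Agent(
        role="Researcher",
        goal="Collect the raw figures",
        backstory="An analyst who gathers data.",
    )
    writer = Agent(
        role="Writer",
        goal="Summarize the findings",
        backstory="A writer who turns data into reports.",
    )

    crew = Crew(
        agents=[researcher, writer],
        tasks=[
            Task(
                description="Gather the figures",
                expected_output="A table of figures",
                agent=researcher,
            ),
            Task(
                description="Write a one-page summary",
                expected_output="A short report",
                agent=writer,
            ),
        ],
        planning=True,               # required: replanning builds on planning
        planning_llm="gpt-4o-mini",  # also passed to the ReplanningEvaluator
        replan_on_failure=True,      # evaluate each result against its plan
        max_replans=2,               # cap replanning LLM calls per kickoff
    )
    result = crew.kickoff()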