mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-15 23:42:37 +00:00
Compare commits
2 Commits
docs/file-
...
devin/1774
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
da1fd5b953 | ||
|
|
3a2e5a3abb |
@@ -112,6 +112,7 @@ from crewai.utilities.llm_utils import create_llm
|
||||
from crewai.utilities.logger import Logger
|
||||
from crewai.utilities.planning_handler import CrewPlanner
|
||||
from crewai.utilities.printer import PrinterColor
|
||||
from crewai.utilities.replanning_evaluator import ReplanningEvaluator
|
||||
from crewai.utilities.rpm_controller import RPMController
|
||||
from crewai.utilities.streaming import (
|
||||
create_async_chunk_generator,
|
||||
@@ -182,6 +183,8 @@ class Crew(FlowTrackable, BaseModel):
|
||||
default_factory=TaskOutputStorageHandler
|
||||
)
|
||||
_kickoff_event_id: str | None = PrivateAttr(default=None)
|
||||
_replan_count: int = PrivateAttr(default=0)
|
||||
_original_task_descriptions: list[str] = PrivateAttr(default_factory=list)
|
||||
|
||||
name: str | None = Field(default="crew")
|
||||
cache: bool = Field(default=True)
|
||||
@@ -269,6 +272,21 @@ class Crew(FlowTrackable, BaseModel):
|
||||
"Language model that will run the AgentPlanner if planning is True."
|
||||
),
|
||||
)
|
||||
replan_on_failure: bool = Field(
|
||||
default=False,
|
||||
description=(
|
||||
"When True and planning is enabled, evaluate each task result against "
|
||||
"the plan and trigger replanning if results deviate significantly."
|
||||
),
|
||||
)
|
||||
max_replans: int = Field(
|
||||
default=3,
|
||||
description=(
|
||||
"Maximum number of replans allowed during a single crew execution. "
|
||||
"Prevents infinite replanning loops."
|
||||
),
|
||||
ge=0,
|
||||
)
|
||||
task_execution_output_json_files: list[str] | None = Field(
|
||||
default=None,
|
||||
description="list of file paths for task execution JSON files.",
|
||||
@@ -1041,6 +1059,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
task_outputs.append(task_output)
|
||||
self._process_task_result(task, task_output)
|
||||
self._store_execution_log(task, task_output, task_index, was_replayed)
|
||||
self._maybe_replan(task, task_output, task_index, tasks, task_outputs)
|
||||
|
||||
if pending_tasks:
|
||||
task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed)
|
||||
@@ -1087,6 +1106,11 @@ class Crew(FlowTrackable, BaseModel):
|
||||
tasks=self.tasks, planning_agent_llm=self.planning_llm
|
||||
)._handle_crew_planning()
|
||||
|
||||
# Store original descriptions before appending plans so replanning
|
||||
# can strip the old plan and apply a fresh one.
|
||||
self._original_task_descriptions = [task.description for task in self.tasks]
|
||||
self._replan_count = 0
|
||||
|
||||
plan_map: dict[int, str] = {}
|
||||
for step_plan in result.list_of_plans_per_task:
|
||||
if step_plan.task_number in plan_map:
|
||||
@@ -1108,6 +1132,95 @@ class Crew(FlowTrackable, BaseModel):
|
||||
f"No plan found for Task Number {task_number}",
|
||||
)
|
||||
|
||||
def _maybe_replan(
    self,
    task: Task,
    task_output: TaskOutput,
    task_index: int,
    tasks: list[Task],
    all_task_outputs: list[TaskOutput],
) -> None:
    """Evaluate a completed task and replan remaining tasks if needed.

    Called after each synchronous task completes when both ``planning``
    and ``replan_on_failure`` are enabled. A lightweight LLM call decides
    whether the result deviates from the plan; if so, revised plans are
    generated and applied to the tasks that have not yet executed.

    Args:
        task: The task that just completed.
        task_output: The output produced by the completed task.
        task_index: Index of the completed task in the tasks list.
        tasks: The full list of tasks being executed.
        all_task_outputs: All task outputs collected so far.
    """
    # Guard: feature disabled, or the replan budget is exhausted.
    if (
        not self.planning
        or not self.replan_on_failure
        or self._replan_count >= self.max_replans
    ):
        return

    remaining_tasks = tasks[task_index + 1 :]
    if not remaining_tasks:
        # Nothing left to replan after the final task.
        return

    # Recover the plan text that planning appended to this task's
    # description (everything after the stored original description).
    original_desc = (
        self._original_task_descriptions[task_index]
        if task_index < len(self._original_task_descriptions)
        else task.description
    )
    plan_text = task.description[len(original_desc) :]

    if not plan_text.strip():
        # No plan was attached to this task; nothing to evaluate against.
        return

    evaluator = ReplanningEvaluator(llm=self.planning_llm)
    decision = evaluator.evaluate(
        completed_task=task,
        task_output=task_output,
        original_plan=plan_text,
        remaining_tasks=remaining_tasks,
    )

    if not decision.should_replan:
        return

    self._replan_count += 1
    self._logger.log(
        "info",
        f"Replanning triggered (replan {self._replan_count}/{self.max_replans}): "
        f"{decision.reason}",
    )

    completed_tasks = tasks[: task_index + 1]
    planner = CrewPlanner(tasks=self.tasks, planning_agent_llm=self.planning_llm)
    replan_result = planner._handle_crew_replanning(
        completed_tasks=completed_tasks,
        completed_outputs=all_task_outputs,
        remaining_tasks=remaining_tasks,
        deviation_reason=decision.reason,
    )

    # Build a map of new plans keyed by task_number (1-based, relative to
    # the remaining tasks); first occurrence wins on duplicates.
    new_plan_map: dict[int, str] = {}
    for step_plan in replan_result.list_of_plans_per_task:
        if step_plan.task_number not in new_plan_map:
            new_plan_map[step_plan.task_number] = step_plan.plan

    # Apply revised plans to remaining tasks, restoring each original
    # description first so old plans don't accumulate.
    for plan_idx, remaining_task in enumerate(remaining_tasks):
        plan_number = plan_idx + 1
        if plan_number not in new_plan_map:
            # Bug fix: previously the original description was restored even
            # when the planner produced no revised plan for this task, which
            # silently stripped the task's existing plan. Keep the current
            # description (old plan included) instead.
            continue
        global_idx = task_index + 1 + plan_idx
        if global_idx < len(self._original_task_descriptions):
            remaining_task.description = self._original_task_descriptions[
                global_idx
            ]
        remaining_task.description += new_plan_map[plan_number]
|
||||
|
||||
def _store_execution_log(
|
||||
self,
|
||||
task: Task,
|
||||
@@ -1240,6 +1353,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
task_outputs.append(task_output)
|
||||
self._process_task_result(task, task_output)
|
||||
self._store_execution_log(task, task_output, task_index, was_replayed)
|
||||
self._maybe_replan(task, task_output, task_index, tasks, task_outputs)
|
||||
|
||||
if futures:
|
||||
task_outputs = self._process_async_tasks(futures, was_replayed)
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
"""Handles planning and coordination of crew tasks."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
@@ -7,6 +9,7 @@ from pydantic import BaseModel, Field
|
||||
from crewai.agent import Agent
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
from crewai.task import Task
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -134,6 +137,115 @@ class CrewPlanner:
|
||||
logger.warning("Error accessing agent knowledge sources")
|
||||
return []
|
||||
|
||||
def _handle_crew_replanning(
    self,
    completed_tasks: list[Task],
    completed_outputs: list[TaskOutput],
    remaining_tasks: list[Task],
    deviation_reason: str,
) -> PlannerTaskPydanticOutput:
    """Produce revised plans for the tasks that have not yet executed.

    Invoked after a ``ReplanningEvaluator`` flags that a finished task's
    result no longer matches the original plan's assumptions. The planning
    agent is given the real results so far and asked for fresh plans
    covering only the remaining tasks.

    Args:
        completed_tasks: Tasks that have already been executed.
        completed_outputs: Outputs produced by the completed tasks.
        remaining_tasks: Tasks that still need to be executed.
        deviation_reason: Explanation of why replanning was triggered.

    Returns:
        A PlannerTaskPydanticOutput with revised plans for the remaining tasks.

    Raises:
        ValueError: If the replanning output cannot be obtained.
    """
    done_summary = self._create_completed_tasks_summary(
        completed_tasks, completed_outputs
    )
    pending_summary = self._create_tasks_summary_for(remaining_tasks)
    replanner_agent = self._create_planning_agent()

    # Single planning task whose structured output carries the new plans.
    outcome = Task(
        description=(
            "The crew's execution plan needs to be revised because a task result "
            "deviated from the original plan's assumptions.\n\n"
            f"## Reason for Replanning\n{deviation_reason}\n\n"
            f"## Completed Tasks and Their Results\n{done_summary}\n\n"
            f"## Remaining Tasks That Need New Plans\n{pending_summary}\n\n"
            "Create revised step-by-step plans for the remaining tasks ONLY, "
            "taking into account what has actually been accomplished so far "
            "and the deviation from the original plan. The plans should adapt "
            "to the real situation rather than following the now-outdated assumptions."
        ),
        expected_output=(
            "Step by step revised plan for each remaining task, "
            "adapted to the actual results so far."
        ),
        agent=replanner_agent,
        output_pydantic=PlannerTaskPydanticOutput,
    ).execute_sync()

    if not isinstance(outcome.pydantic, PlannerTaskPydanticOutput):
        raise ValueError("Failed to get the Replanning output")
    return outcome.pydantic
|
||||
|
||||
@staticmethod
def _create_completed_tasks_summary(
    tasks: list[Task], outputs: list[TaskOutput]
) -> str:
    """Summarize completed tasks alongside the outputs they actually produced.

    Args:
        tasks: The completed tasks.
        outputs: The outputs from those tasks.

    Returns:
        A formatted string summarizing completed tasks and results.
    """
    # Pair each task with its output; extra items on either side are ignored.
    entries = [
        f"Task {number} (Agent: {item.agent.role if item.agent else 'None'}):\n"
        f" Description: {item.description}\n"
        f" Expected Output: {item.expected_output}\n"
        f" Actual Result: {result.raw}"
        for number, (item, result) in enumerate(
            zip(tasks, outputs, strict=False), start=1
        )
    ]
    if not entries:
        return "No completed tasks."
    return "\n\n".join(entries)
|
||||
|
||||
@staticmethod
def _create_tasks_summary_for(tasks: list[Task]) -> str:
    """Summarize a subset of tasks (used for the remaining tasks).

    Args:
        tasks: The tasks to summarize.

    Returns:
        A formatted string summarizing the tasks.
    """
    if not tasks:
        return "No remaining tasks."

    parts = []
    for number, item in enumerate(tasks, start=1):
        owner = item.agent
        parts.append(
            f"Task Number {number}:\n"
            f' "task_description": {item.description}\n'
            f' "task_expected_output": {item.expected_output}\n'
            f' "agent": {owner.role if owner else "None"}\n'
            f' "agent_goal": {owner.goal if owner else "None"}\n'
            f' "task_tools": {item.tools}\n'
            f' "agent_tools": {owner.tools if owner else "None"}'
        )
    return "\n\n".join(parts)
|
||||
|
||||
def _create_tasks_summary(self) -> str:
|
||||
"""Creates a summary of all tasks.
|
||||
|
||||
|
||||
162
lib/crewai/src/crewai/utilities/replanning_evaluator.py
Normal file
162
lib/crewai/src/crewai/utilities/replanning_evaluator.py
Normal file
@@ -0,0 +1,162 @@
|
||||
"""Evaluates whether task results deviate from the original plan and triggers replanning."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai.agent import Agent
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
from crewai.task import Task
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ReplanDecision(BaseModel):
    """Structured decision on whether replanning is needed.

    Attributes:
        should_replan: Whether the crew should generate a new plan for remaining tasks.
        reason: Explanation of why replanning is or is not needed.
        affected_task_numbers: 1-indexed task numbers of remaining tasks most affected
            by the deviation. Empty if should_replan is False.
    """

    # Primary signal: the caller only triggers replanning when this is True.
    should_replan: bool = Field(
        description="Whether the task result deviates significantly from the plan, requiring replanning.",
    )
    # Human-readable justification; surfaced in the crew's replanning log message.
    reason: str = Field(
        description="A concise explanation of why replanning is or is not needed.",
    )
    # Defaults to an empty list when the model omits it.
    affected_task_numbers: list[int] = Field(
        default_factory=list,
        description="1-indexed task numbers of remaining tasks most affected by the deviation.",
    )
|
||||
|
||||
|
||||
class ReplanningEvaluator:
    """Decides whether a crew's plan must be revised after a task completes.

    A short LLM call compares a finished task's actual output with the plan
    text that was appended to its description. The resulting
    ``ReplanDecision`` tells the caller whether the remaining tasks need
    fresh plans.

    Example usage::

        evaluator = ReplanningEvaluator(llm="gpt-4o-mini")
        decision = evaluator.evaluate(
            completed_task=task,
            task_output=output,
            original_plan="Step 1: ...",
            remaining_tasks=remaining,
        )
        if decision.should_replan:
            # trigger replanning for remaining tasks
            ...

    Args:
        llm: Language model used for the evaluation call. Accepts a model
            name string or a BaseLLM instance. Defaults to ``"gpt-4o-mini"``.
    """

    def __init__(self, llm: str | BaseLLM | None = None) -> None:
        # Fall back to a lightweight default model when none is supplied.
        self.llm = llm or "gpt-4o-mini"

    def evaluate(
        self,
        completed_task: Task,
        task_output: TaskOutput,
        original_plan: str,
        remaining_tasks: list[Task],
    ) -> ReplanDecision:
        """Compare a completed task's result against its plan.

        Args:
            completed_task: The task that just finished executing.
            task_output: The output produced by the completed task.
            original_plan: The plan text that was appended to the task description.
            remaining_tasks: Tasks that have not yet been executed.

        Returns:
            A ReplanDecision indicating whether replanning is needed.
        """
        pending_summary = self._summarize_remaining_tasks(remaining_tasks)

        judge = Agent(
            role="Replanning Evaluator",
            goal=(
                "Evaluate whether the result of a completed task deviates "
                "significantly from what the plan assumed, and determine if "
                "the remaining tasks need a revised plan."
            ),
            backstory=(
                "You are an expert at evaluating execution plans. You compare "
                "actual task results against planned expectations and identify "
                "deviations that would make the remaining plan ineffective."
            ),
            llm=self.llm,
        )

        verdict = Task(
            description=(
                "Evaluate whether the following task result deviates significantly "
                "from what the original plan assumed.\n\n"
                f"## Completed Task\n"
                f"Description: {completed_task.description}\n"
                f"Expected Output: {completed_task.expected_output}\n\n"
                f"## Plan for this Task\n{original_plan}\n\n"
                f"## Actual Result\n{task_output.raw}\n\n"
                f"## Remaining Tasks\n{pending_summary}\n\n"
                "Based on the above, decide if the actual result deviates enough "
                "from the plan's assumptions that the remaining tasks need replanning. "
                "Minor differences or format changes do NOT require replanning. "
                "Only significant deviations (missing data, errors, completely "
                "different approach needed, infeasible assumptions) should trigger replanning."
            ),
            expected_output=(
                "A structured decision indicating whether replanning is needed, "
                "with a reason and the affected task numbers."
            ),
            agent=judge,
            output_pydantic=ReplanDecision,
        ).execute_sync()

        if isinstance(verdict.pydantic, ReplanDecision):
            return verdict.pydantic

        # Structured output parsing failed — fail safe by not replanning.
        logger.warning(
            "Failed to get structured ReplanDecision, defaulting to no replan"
        )
        return ReplanDecision(
            should_replan=False,
            reason="Failed to evaluate task output against plan.",
            affected_task_numbers=[],
        )

    @staticmethod
    def _summarize_remaining_tasks(remaining_tasks: list[Task]) -> str:
        """Format the not-yet-executed tasks for the evaluation prompt.

        Args:
            remaining_tasks: Tasks that have not yet been executed.

        Returns:
            A formatted string summarizing the remaining tasks.
        """
        if not remaining_tasks:
            return "No remaining tasks."

        lines = [
            f"Task {number}: {item.description}\n"
            f" Expected Output: {item.expected_output}\n"
            f" Agent: {item.agent.role if item.agent else 'Unassigned'}"
            for number, item in enumerate(remaining_tasks, start=1)
        ]
        return "\n".join(lines)
|
||||
572
lib/crewai/tests/utilities/test_replanning.py
Normal file
572
lib/crewai/tests/utilities/test_replanning.py
Normal file
@@ -0,0 +1,572 @@
|
||||
"""Tests for the adaptive replanning feature (issue #4983).
|
||||
|
||||
Covers:
|
||||
- ReplanningEvaluator: evaluation of task results against plans
|
||||
- CrewPlanner._handle_crew_replanning: generating revised plans
|
||||
- Crew integration: replan_on_failure / max_replans fields and the
|
||||
_maybe_replan hook in both sync and async execution paths
|
||||
- Backwards compatibility: existing crews are unaffected by default
|
||||
"""
|
||||
|
||||
from unittest.mock import MagicMock, patch, call
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.agent import Agent
|
||||
from crewai.crew import Crew
|
||||
from crewai.task import Task
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
from crewai.utilities.planning_handler import (
|
||||
CrewPlanner,
|
||||
PlannerTaskPydanticOutput,
|
||||
PlanPerTask,
|
||||
)
|
||||
from crewai.utilities.replanning_evaluator import (
|
||||
ReplanDecision,
|
||||
ReplanningEvaluator,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_agents(n: int = 3) -> list[Agent]:
    """Build *n* simple numbered agents for use in tests."""
    agents = []
    for i in range(1, n + 1):
        agents.append(
            Agent(role=f"Agent {i}", goal=f"Goal {i}", backstory=f"Backstory {i}")
        )
    return agents
|
||||
|
||||
|
||||
def _make_tasks(agents: list[Agent]) -> list[Task]:
    """Create one numbered task per agent, paired in order."""
    return [
        Task(
            description=f"Task {idx} description",
            expected_output=f"Output {idx}",
            agent=agent,
        )
        for idx, agent in enumerate(agents, start=1)
    ]
|
||||
|
||||
|
||||
def _task_output(raw: str = "result", agent: str = "agent") -> TaskOutput:
    """Build a minimal TaskOutput carrying the given raw text and agent name."""
    return TaskOutput(description="desc", raw=raw, agent=agent)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ReplanDecision model tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestReplanDecision:
    """Unit tests for the ReplanDecision model."""

    def test_replan_decision_defaults(self):
        # affected_task_numbers must default to an empty list.
        decision = ReplanDecision(should_replan=False, reason="All good")
        assert decision.should_replan is False
        assert decision.reason == "All good"
        assert decision.affected_task_numbers == []

    def test_replan_decision_with_affected_tasks(self):
        decision = ReplanDecision(
            should_replan=True,
            reason="Data missing",
            affected_task_numbers=[2, 3],
        )
        assert decision.should_replan is True
        assert decision.affected_task_numbers == [2, 3]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ReplanningEvaluator tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestReplanningEvaluator:
    """Tests for ReplanningEvaluator.evaluate and its helpers."""

    def test_default_llm(self):
        """A bare constructor falls back to the default model name."""
        evaluator = ReplanningEvaluator()
        assert evaluator.llm == "gpt-4o-mini"

    def test_custom_llm(self):
        """An explicit model name is stored as-is."""
        evaluator = ReplanningEvaluator(llm="gpt-4o")
        assert evaluator.llm == "gpt-4o"

    def test_evaluate_returns_replan_decision_when_deviation_detected(self):
        """When the LLM says replanning is needed, evaluate() returns that."""
        evaluator = ReplanningEvaluator()
        agents = _make_agents(3)
        tasks = _make_tasks(agents)
        output = _task_output("completely unexpected result")

        expected_decision = ReplanDecision(
            should_replan=True,
            reason="Result deviates from plan assumptions",
            affected_task_numbers=[2, 3],
        )

        # Patch the underlying evaluation task so no real LLM call happens.
        with patch.object(Task, "execute_sync") as mock_exec:
            mock_exec.return_value = TaskOutput(
                description="eval",
                agent="evaluator",
                pydantic=expected_decision,
            )
            decision = evaluator.evaluate(
                completed_task=tasks[0],
                task_output=output,
                original_plan="Step 1: do X\nStep 2: do Y",
                remaining_tasks=tasks[1:],
            )

        assert decision.should_replan is True
        assert decision.reason == "Result deviates from plan assumptions"
        mock_exec.assert_called_once()

    def test_evaluate_returns_no_replan_when_result_matches(self):
        """When the result matches the plan, no replanning is needed."""
        evaluator = ReplanningEvaluator()
        agents = _make_agents(2)
        tasks = _make_tasks(agents)
        output = _task_output("expected result matching plan")

        expected_decision = ReplanDecision(
            should_replan=False,
            reason="Result aligns with plan",
            affected_task_numbers=[],
        )

        with patch.object(Task, "execute_sync") as mock_exec:
            mock_exec.return_value = TaskOutput(
                description="eval",
                agent="evaluator",
                pydantic=expected_decision,
            )
            decision = evaluator.evaluate(
                completed_task=tasks[0],
                task_output=output,
                original_plan="Step 1: gather data",
                remaining_tasks=tasks[1:],
            )

        assert decision.should_replan is False

    def test_evaluate_fallback_on_bad_output(self):
        """When the LLM returns non-structured output, fallback to no-replan."""
        evaluator = ReplanningEvaluator()
        agents = _make_agents(2)
        tasks = _make_tasks(agents)
        output = _task_output("some result")

        with patch.object(Task, "execute_sync") as mock_exec:
            mock_exec.return_value = TaskOutput(
                description="eval",
                agent="evaluator",
                pydantic=None,  # no structured output
            )
            decision = evaluator.evaluate(
                completed_task=tasks[0],
                task_output=output,
                original_plan="Step 1: do stuff",
                remaining_tasks=tasks[1:],
            )

        # The evaluator must fail safe rather than raise.
        assert decision.should_replan is False
        assert "Failed to evaluate" in decision.reason

    def test_summarize_remaining_tasks_empty(self):
        """No remaining tasks yields the sentinel string."""
        result = ReplanningEvaluator._summarize_remaining_tasks([])
        assert result == "No remaining tasks."

    def test_summarize_remaining_tasks_with_tasks(self):
        """The summary mentions each task and its agent role."""
        agents = _make_agents(2)
        tasks = _make_tasks(agents)
        result = ReplanningEvaluator._summarize_remaining_tasks(tasks)
        assert "Task 1" in result
        assert "Task 2" in result
        assert "Agent 1" in result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CrewPlanner._handle_crew_replanning tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCrewPlannerReplanning:
    """Tests for CrewPlanner._handle_crew_replanning and its summary helpers."""

    @pytest.fixture
    def planner(self):
        """Provide a CrewPlanner wired with three simple agents/tasks."""
        agents = _make_agents(3)
        tasks = _make_tasks(agents)
        return CrewPlanner(tasks=tasks, planning_agent_llm=None)

    def test_handle_crew_replanning_returns_revised_plans(self, planner):
        """A structured planner response is returned unchanged to the caller."""
        agents = _make_agents(3)
        tasks = _make_tasks(agents)
        outputs = [_task_output("result 1")]

        # Plans are keyed 1-based relative to the remaining tasks.
        revised_plans = PlannerTaskPydanticOutput(
            list_of_plans_per_task=[
                PlanPerTask(task_number=1, task="Task 2", plan="Revised plan for task 2"),
                PlanPerTask(task_number=2, task="Task 3", plan="Revised plan for task 3"),
            ]
        )

        with patch.object(Task, "execute_sync") as mock_exec:
            mock_exec.return_value = TaskOutput(
                description="replan",
                agent="planner",
                pydantic=revised_plans,
            )
            result = planner._handle_crew_replanning(
                completed_tasks=[tasks[0]],
                completed_outputs=outputs,
                remaining_tasks=tasks[1:],
                deviation_reason="Task 1 returned unexpected data",
            )

        assert isinstance(result, PlannerTaskPydanticOutput)
        assert len(result.list_of_plans_per_task) == 2
        mock_exec.assert_called_once()

    def test_handle_crew_replanning_raises_on_bad_output(self, planner):
        """A non-structured planner response raises ValueError."""
        agents = _make_agents(3)
        tasks = _make_tasks(agents)
        outputs = [_task_output("result 1")]

        with patch.object(Task, "execute_sync") as mock_exec:
            mock_exec.return_value = TaskOutput(
                description="replan",
                agent="planner",
                pydantic=None,
            )
            with pytest.raises(ValueError, match="Failed to get the Replanning output"):
                planner._handle_crew_replanning(
                    completed_tasks=[tasks[0]],
                    completed_outputs=outputs,
                    remaining_tasks=tasks[1:],
                    deviation_reason="deviation",
                )

    def test_completed_tasks_summary(self):
        """The completed-task summary includes raw outputs and agent roles."""
        agents = _make_agents(2)
        tasks = _make_tasks(agents)
        outputs = [_task_output("result A"), _task_output("result B")]

        summary = CrewPlanner._create_completed_tasks_summary(tasks, outputs)
        assert "result A" in summary
        assert "result B" in summary
        assert "Agent 1" in summary

    def test_completed_tasks_summary_empty(self):
        """No completed tasks yields the sentinel string."""
        summary = CrewPlanner._create_completed_tasks_summary([], [])
        assert summary == "No completed tasks."

    def test_tasks_summary_for_remaining(self):
        """The remaining-task summary numbers tasks from 1."""
        agents = _make_agents(2)
        tasks = _make_tasks(agents)
        summary = CrewPlanner._create_tasks_summary_for(tasks)
        assert "Task Number 1" in summary
        assert "Task Number 2" in summary

    def test_tasks_summary_for_empty(self):
        """No remaining tasks yields the sentinel string."""
        summary = CrewPlanner._create_tasks_summary_for([])
        assert summary == "No remaining tasks."
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Crew field tests (backwards compatibility)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCrewReplanningFields:
    """Crew-level field defaults and validation for the replanning feature."""

    @staticmethod
    def _build_crew(**kwargs):
        # Shared one-agent/one-task crew factory for these field tests.
        agents = _make_agents(1)
        return Crew(agents=agents, tasks=_make_tasks(agents), **kwargs)

    def test_replan_on_failure_defaults_to_false(self):
        assert self._build_crew().replan_on_failure is False

    def test_max_replans_defaults_to_3(self):
        assert self._build_crew().max_replans == 3

    def test_replan_on_failure_can_be_set(self):
        assert self._build_crew(replan_on_failure=True).replan_on_failure is True

    def test_max_replans_can_be_set(self):
        assert self._build_crew(max_replans=5).max_replans == 5

    def test_max_replans_cannot_be_negative(self):
        with pytest.raises(ValueError):
            self._build_crew(max_replans=-1)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Crew._maybe_replan integration tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCrewMaybeReplan:
|
||||
def _setup_crew_with_planning(self, n_agents: int = 3) -> tuple[Crew, list[Agent], list[Task]]:
|
||||
agents = _make_agents(n_agents)
|
||||
tasks = _make_tasks(agents)
|
||||
crew = Crew(
|
||||
agents=agents,
|
||||
tasks=tasks,
|
||||
planning=True,
|
||||
replan_on_failure=True,
|
||||
max_replans=3,
|
||||
)
|
||||
# Simulate planning having been called
|
||||
crew._original_task_descriptions = [t.description for t in tasks]
|
||||
crew._replan_count = 0
|
||||
# Append a fake plan to each task
|
||||
for task in tasks:
|
||||
task.description += " [PLAN]"
|
||||
return crew, agents, tasks
|
||||
|
||||
def test_maybe_replan_skips_when_planning_disabled(self):
|
||||
agents = _make_agents(2)
|
||||
tasks = _make_tasks(agents)
|
||||
crew = Crew(agents=agents, tasks=tasks, planning=False, replan_on_failure=True)
|
||||
crew._original_task_descriptions = [t.description for t in tasks]
|
||||
|
||||
# Should not call evaluator at all
|
||||
with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
|
||||
crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
|
||||
mock_eval.assert_not_called()
|
||||
|
||||
def test_maybe_replan_skips_when_replan_on_failure_disabled(self):
|
||||
agents = _make_agents(2)
|
||||
tasks = _make_tasks(agents)
|
||||
crew = Crew(agents=agents, tasks=tasks, planning=True, replan_on_failure=False)
|
||||
crew._original_task_descriptions = [t.description for t in tasks]
|
||||
|
||||
with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
|
||||
crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
|
||||
mock_eval.assert_not_called()
|
||||
|
||||
def test_maybe_replan_skips_on_last_task(self):
|
||||
crew, agents, tasks = self._setup_crew_with_planning(2)
|
||||
|
||||
with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
|
||||
crew._maybe_replan(tasks[1], _task_output(), 1, tasks, [_task_output()])
|
||||
mock_eval.assert_not_called()
|
||||
|
||||
def test_maybe_replan_skips_when_max_replans_reached(self):
|
||||
crew, agents, tasks = self._setup_crew_with_planning(3)
|
||||
crew._replan_count = 3 # already at max
|
||||
|
||||
with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
|
||||
crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
|
||||
mock_eval.assert_not_called()
|
||||
|
||||
def test_maybe_replan_skips_when_no_plan_text(self):
|
||||
agents = _make_agents(3)
|
||||
tasks = _make_tasks(agents)
|
||||
crew = Crew(
|
||||
agents=agents, tasks=tasks,
|
||||
planning=True, replan_on_failure=True,
|
||||
)
|
||||
crew._original_task_descriptions = [t.description for t in tasks]
|
||||
crew._replan_count = 0
|
||||
# No plan appended — descriptions are unchanged
|
||||
|
||||
with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
|
||||
crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
|
||||
mock_eval.assert_not_called()
|
||||
|
||||
def test_maybe_replan_no_replan_when_evaluator_says_no(self):
    """A negative ReplanDecision leaves counters and descriptions untouched."""
    crew, agents, task_list = self._setup_crew_with_planning(3)
    desc_before_1 = task_list[1].description
    desc_before_2 = task_list[2].description

    decision = ReplanDecision(
        should_replan=False,
        reason="Result is fine",
        affected_task_numbers=[],
    )

    with patch.object(ReplanningEvaluator, "evaluate", return_value=decision), \
         patch.object(CrewPlanner, "_handle_crew_replanning") as replanning_mock:
        crew._maybe_replan(task_list[0], _task_output(), 0, task_list, [_task_output()])
        replanning_mock.assert_not_called()

    assert crew._replan_count == 0
    # Downstream task descriptions must be exactly as they were.
    assert task_list[1].description == desc_before_1
    assert task_list[2].description == desc_before_2
|
||||
|
||||
def test_maybe_replan_triggers_replanning_and_updates_tasks(self):
    """A positive decision bumps the counter and rewrites downstream plans."""
    crew, agents, task_list = self._setup_crew_with_planning(3)

    decision = ReplanDecision(
        should_replan=True,
        reason="Task 1 returned error data",
        affected_task_numbers=[2, 3],
    )
    new_plans = PlannerTaskPydanticOutput(
        list_of_plans_per_task=[
            PlanPerTask(task_number=1, task="Task 2", plan=" [REVISED PLAN 2]"),
            PlanPerTask(task_number=2, task="Task 3", plan=" [REVISED PLAN 3]"),
        ]
    )

    with patch.object(ReplanningEvaluator, "evaluate", return_value=decision), \
         patch.object(CrewPlanner, "_handle_crew_replanning", return_value=new_plans):
        crew._maybe_replan(
            task_list[0],
            _task_output("bad result"),
            0,
            task_list,
            [_task_output("bad result")],
        )

    assert crew._replan_count == 1
    # The remaining tasks carry the revised plan text...
    assert "[REVISED PLAN 2]" in task_list[1].description
    assert "[REVISED PLAN 3]" in task_list[2].description
    # ...and the stale plan marker is gone (original description restored
    # before the new plan was appended).
    assert task_list[1].description.count("[PLAN]") == 0
    assert task_list[2].description.count("[PLAN]") == 0
|
||||
|
||||
def test_maybe_replan_increments_replan_count_each_time(self):
    """Every successful replanning cycle advances the replan counter by one."""
    crew, agents, task_list = self._setup_crew_with_planning(3)

    decision = ReplanDecision(
        should_replan=True,
        reason="deviation",
        affected_task_numbers=[2],
    )
    new_plans = PlannerTaskPydanticOutput(
        list_of_plans_per_task=[
            PlanPerTask(task_number=1, task="T2", plan=" [NEW PLAN]"),
        ]
    )

    with patch.object(ReplanningEvaluator, "evaluate", return_value=decision), \
         patch.object(CrewPlanner, "_handle_crew_replanning", return_value=new_plans):
        crew._maybe_replan(task_list[0], _task_output(), 0, task_list, [_task_output()])
        assert crew._replan_count == 1

        # A deviation after the second task triggers a second replan.
        crew._maybe_replan(task_list[1], _task_output(), 1, task_list, [_task_output()] * 2)
        assert crew._replan_count == 2
|
||||
|
||||
def test_maybe_replan_stops_at_max_replans(self):
    """The counter never exceeds max_replans; evaluation short-circuits after."""
    crew, agents, task_list = self._setup_crew_with_planning(3)
    crew.max_replans = 1

    decision = ReplanDecision(
        should_replan=True,
        reason="deviation",
        affected_task_numbers=[2],
    )
    new_plans = PlannerTaskPydanticOutput(
        list_of_plans_per_task=[
            PlanPerTask(task_number=1, task="T2", plan=" [NEW]"),
        ]
    )

    with patch.object(ReplanningEvaluator, "evaluate", return_value=decision) as evaluate_mock, \
         patch.object(CrewPlanner, "_handle_crew_replanning", return_value=new_plans):
        crew._maybe_replan(task_list[0], _task_output(), 0, task_list, [_task_output()])
        assert crew._replan_count == 1

        # The second call must be skipped because max_replans=1.
        crew._maybe_replan(task_list[1], _task_output(), 1, task_list, [_task_output()] * 2)
        assert crew._replan_count == 1  # unchanged
        # evaluate ran only once — the second attempt was short-circuited.
        assert evaluate_mock.call_count == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
# Crew._handle_crew_planning stores original descriptions
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCrewPlanningStoresOriginals:
    """Crew._handle_crew_planning must snapshot descriptions and reset state."""

    def test_handle_crew_planning_stores_original_descriptions(self):
        """Planning records each task's pre-plan description on the crew."""
        agents = _make_agents(2)
        task_list = _make_tasks(agents)
        crew = Crew(agents=agents, tasks=task_list, planning=True)

        descriptions_before = [task.description for task in task_list]

        planned = PlannerTaskPydanticOutput(
            list_of_plans_per_task=[
                PlanPerTask(task_number=1, task="T1", plan=" [PLAN 1]"),
                PlanPerTask(task_number=2, task="T2", plan=" [PLAN 2]"),
            ]
        )

        with patch.object(CrewPlanner, "_handle_crew_planning", return_value=planned):
            crew._handle_crew_planning()

        assert crew._original_task_descriptions == descriptions_before
        assert crew._replan_count == 0

    def test_handle_crew_planning_resets_replan_count(self):
        """A fresh planning pass zeroes any counter left over from a prior run."""
        agents = _make_agents(1)
        task_list = _make_tasks(agents)
        crew = Crew(agents=agents, tasks=task_list, planning=True)
        crew._replan_count = 5  # leftover from previous execution

        planned = PlannerTaskPydanticOutput(
            list_of_plans_per_task=[
                PlanPerTask(task_number=1, task="T1", plan=" [PLAN]"),
            ]
        )

        with patch.object(CrewPlanner, "_handle_crew_planning", return_value=planned):
            crew._handle_crew_planning()

        assert crew._replan_count == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
# Sync execution integration test
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestExecuteTasksWithReplanning:
    """Integration of the replanning hook with the synchronous execution loop."""

    def test_execute_tasks_calls_maybe_replan_for_sync_tasks(self):
        """Verify that _maybe_replan is called after each sync task execution."""
        agents = _make_agents(2)
        task_list = _make_tasks(agents)
        crew = Crew(
            agents=agents,
            tasks=task_list,
            planning=True,
            replan_on_failure=True,
        )
        crew._original_task_descriptions = [task.description for task in task_list]
        crew._replan_count = 0

        fake_output = _task_output("result")

        with patch.object(Task, "execute_sync", return_value=fake_output), \
             patch.object(Crew, "_maybe_replan") as replan_mock:
            crew._execute_tasks(task_list)

        # One invocation per synchronous task.
        assert replan_mock.call_count == 2

    def test_execute_tasks_without_replanning_is_unaffected(self):
        """Verify existing behaviour when replan_on_failure is False."""
        agents = _make_agents(2)
        task_list = _make_tasks(agents)
        crew = Crew(
            agents=agents,
            tasks=task_list,
            planning=False,
            replan_on_failure=False,
        )

        fake_output = _task_output("result")

        with patch.object(Task, "execute_sync", return_value=fake_output), \
             patch.object(Crew, "_maybe_replan") as replan_mock:
            result = crew._execute_tasks(task_list)

        # The hook is still invoked but returns immediately when disabled.
        assert replan_mock.call_count == 2
        assert result is not None
|
||||
Reference in New Issue
Block a user