fix: add idx for task ordering, tests

This commit is contained in:
Greyson LaLonde
2025-11-29 10:56:30 -05:00
parent bc4e6a3127
commit 02fef6aed7
3 changed files with 175 additions and 13 deletions

View File

@@ -327,7 +327,7 @@ class Crew(FlowTrackable, BaseModel):
def set_private_attrs(self) -> Crew:
"""set private attributes."""
self._cache_handler = CacheHandler()
event_listener = EventListener() # type: ignore[no-untyped-call]
event_listener = EventListener()
# Determine and set tracing state once for this execution
tracing_enabled = should_enable_tracing(override=self.tracing)
@@ -955,10 +955,20 @@ class Crew(FlowTrackable, BaseModel):
tasks=self.tasks, planning_agent_llm=self.planning_llm
)._handle_crew_planning()
for task, step_plan in zip(
self.tasks, result.list_of_plans_per_task, strict=False
):
task.description += step_plan.plan
plan_map: dict[int, str] = {
step_plan.task_number: step_plan.plan
for step_plan in result.list_of_plans_per_task
}
for idx, task in enumerate(self.tasks):
task_number = idx + 1
if task_number in plan_map:
task.description += plan_map[task_number]
else:
self._logger.log(
"warning",
f"No plan found for Task Number {task_number}",
)
def _store_execution_log(
self,

View File

@@ -15,9 +15,12 @@ logger = logging.getLogger(__name__)
class PlanPerTask(BaseModel):
"""Represents a plan for a specific task."""
task: str = Field(..., description="The task for which the plan is created")
task_number: int = Field(
description="The 1-indexed task number this plan corresponds to",
ge=1,
)
task: str = Field(description="The task for which the plan is created")
plan: str = Field(
...,
description="The step by step plan on how the agents can execute their tasks using the available tools with mastery",
)

View File

@@ -1,7 +1,11 @@
from unittest.mock import patch
"""Tests for the planning handler module."""
from unittest.mock import MagicMock, patch
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
@@ -13,7 +17,7 @@ from crewai.utilities.planning_handler import (
)
class InternalCrewPlanner:
class TestInternalCrewPlanner:
@pytest.fixture
def crew_planner(self):
tasks = [
@@ -49,9 +53,9 @@ class InternalCrewPlanner:
def test_handle_crew_planning(self, crew_planner):
list_of_plans_per_task = [
PlanPerTask(task="Task1", plan="Plan 1"),
PlanPerTask(task="Task2", plan="Plan 2"),
PlanPerTask(task="Task3", plan="Plan 3"),
PlanPerTask(task_number=1, task="Task1", plan="Plan 1"),
PlanPerTask(task_number=2, task="Task2", plan="Plan 2"),
PlanPerTask(task_number=3, task="Task3", plan="Plan 3"),
]
with patch.object(Task, "execute_sync") as execute:
execute.return_value = TaskOutput(
@@ -166,7 +170,9 @@ class InternalCrewPlanner:
description="Description",
agent="agent",
pydantic=PlannerTaskPydanticOutput(
list_of_plans_per_task=[PlanPerTask(task="Task1", plan="Plan 1")]
list_of_plans_per_task=[
PlanPerTask(task_number=1, task="Task1", plan="Plan 1")
]
),
)
result = crew_planner_different_llm._handle_crew_planning()
@@ -177,3 +183,146 @@ class InternalCrewPlanner:
crew_planner_different_llm.tasks
)
execute.assert_called_once()
def test_plan_per_task_requires_task_number(self):
"""Test that PlanPerTask model requires task_number field."""
with pytest.raises(ValueError):
PlanPerTask(task="Task1", plan="Plan 1")
def test_plan_per_task_with_task_number(self):
"""Test PlanPerTask model with task_number field."""
plan = PlanPerTask(task_number=5, task="Task5", plan="Plan for task 5")
assert plan.task_number == 5
assert plan.task == "Task5"
assert plan.plan == "Plan for task 5"
class TestCrewPlanningIntegration:
"""Tests for Crew._handle_crew_planning integration with task_number matching."""
def test_crew_planning_with_out_of_order_plans(self):
"""Test that plans are correctly matched to tasks even when returned out of order.
This test verifies the fix for issue #3953 where plans returned by the LLM
in a different order than the tasks would be incorrectly assigned.
"""
agent1 = Agent(role="Agent 1", goal="Goal 1", backstory="Backstory 1")
agent2 = Agent(role="Agent 2", goal="Goal 2", backstory="Backstory 2")
agent3 = Agent(role="Agent 3", goal="Goal 3", backstory="Backstory 3")
task1 = Task(
description="First task description",
expected_output="Output 1",
agent=agent1,
)
task2 = Task(
description="Second task description",
expected_output="Output 2",
agent=agent2,
)
task3 = Task(
description="Third task description",
expected_output="Output 3",
agent=agent3,
)
crew = Crew(
agents=[agent1, agent2, agent3],
tasks=[task1, task2, task3],
planning=True,
)
out_of_order_plans = [
PlanPerTask(task_number=3, task="Task 3", plan=" [PLAN FOR TASK 3]"),
PlanPerTask(task_number=1, task="Task 1", plan=" [PLAN FOR TASK 1]"),
PlanPerTask(task_number=2, task="Task 2", plan=" [PLAN FOR TASK 2]"),
]
mock_planner_result = PlannerTaskPydanticOutput(
list_of_plans_per_task=out_of_order_plans
)
with patch.object(
CrewPlanner, "_handle_crew_planning", return_value=mock_planner_result
):
crew._handle_crew_planning()
assert "[PLAN FOR TASK 1]" in task1.description
assert "[PLAN FOR TASK 2]" in task2.description
assert "[PLAN FOR TASK 3]" in task3.description
assert "[PLAN FOR TASK 3]" not in task1.description
assert "[PLAN FOR TASK 1]" not in task2.description
assert "[PLAN FOR TASK 2]" not in task3.description
def test_crew_planning_with_missing_plan(self):
"""Test that missing plans are handled gracefully with a warning."""
agent1 = Agent(role="Agent 1", goal="Goal 1", backstory="Backstory 1")
agent2 = Agent(role="Agent 2", goal="Goal 2", backstory="Backstory 2")
task1 = Task(
description="First task description",
expected_output="Output 1",
agent=agent1,
)
task2 = Task(
description="Second task description",
expected_output="Output 2",
agent=agent2,
)
crew = Crew(
agents=[agent1, agent2],
tasks=[task1, task2],
planning=True,
)
original_task1_desc = task1.description
original_task2_desc = task2.description
incomplete_plans = [
PlanPerTask(task_number=1, task="Task 1", plan=" [PLAN FOR TASK 1]"),
]
mock_planner_result = PlannerTaskPydanticOutput(
list_of_plans_per_task=incomplete_plans
)
with patch.object(
CrewPlanner, "_handle_crew_planning", return_value=mock_planner_result
):
crew._handle_crew_planning()
assert "[PLAN FOR TASK 1]" in task1.description
assert task2.description == original_task2_desc
def test_crew_planning_preserves_original_description(self):
"""Test that planning appends to the original task description."""
agent = Agent(role="Agent 1", goal="Goal 1", backstory="Backstory 1")
task = Task(
description="Original task description",
expected_output="Output 1",
agent=agent,
)
crew = Crew(
agents=[agent],
tasks=[task],
planning=True,
)
plans = [
PlanPerTask(task_number=1, task="Task 1", plan=" - Additional plan steps"),
]
mock_planner_result = PlannerTaskPydanticOutput(list_of_plans_per_task=plans)
with patch.object(
CrewPlanner, "_handle_crew_planning", return_value=mock_planner_result
):
crew._handle_crew_planning()
assert "Original task description" in task.description
assert "Additional plan steps" in task.description