diff --git a/src/crewai/pipeline/pipeline.py b/src/crewai/pipeline/pipeline.py index f95a83048..6d5a0e520 100644 --- a/src/crewai/pipeline/pipeline.py +++ b/src/crewai/pipeline/pipeline.py @@ -142,21 +142,22 @@ class Pipeline(BaseModel): """ initial_input = copy.deepcopy(kickoff_input) current_input = copy.deepcopy(kickoff_input) + stages = copy.deepcopy(self.stages) pipeline_usage_metrics: Dict[str, UsageMetrics] = {} all_stage_outputs: List[List[CrewOutput]] = [] traces: List[List[Union[str, Dict[str, Any]]]] = [[initial_input]] stage_index = 0 - while stage_index < len(self.stages): - stage = self.stages[stage_index] + while stage_index < len(stages): + stage = stages[stage_index] stage_input = copy.deepcopy(current_input) if isinstance(stage, Router): next_pipeline, route_taken = stage.route(stage_input) - self.stages = ( - self.stages[: stage_index + 1] + stages = ( + stages[: stage_index + 1] + list(next_pipeline.stages) - + self.stages[stage_index + 1 :] + + stages[stage_index + 1 :] ) traces.append([{"route_taken": route_taken}]) stage_index += 1 diff --git a/src/crewai/routers/router.py b/src/crewai/routers/router.py index 753589e7a..e11c816f2 100644 --- a/src/crewai/routers/router.py +++ b/src/crewai/routers/router.py @@ -1,4 +1,3 @@ -import logging from dataclasses import dataclass from typing import Any, Callable, Dict, Generic, Tuple, TypeVar diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 3c87a1e8a..d729249bf 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -1,6 +1,5 @@ import json -from typing import Any, Dict -from unittest.mock import MagicMock +from unittest.mock import AsyncMock, MagicMock import pytest from crewai.agent import Agent @@ -23,7 +22,17 @@ DEFAULT_TOKEN_USAGE = UsageMetrics( @pytest.fixture def mock_crew_factory(): def _create_mock_crew(name: str, output_json_dict=None, pydantic_output=None): - crew = MagicMock(spec=Crew) + MockCrewClass = type("MockCrew", (MagicMock, Crew), {}) + + class MockCrew(MockCrewClass): + def __deepcopy__(self, memo): + result = MockCrewClass() + result.kickoff_async = self.kickoff_async + result.name = self.name + return result + + crew = MockCrew() + crew.name = name task_output = TaskOutput( description="Test task", raw="Task output", agent="Test Agent" ) @@ -39,7 +48,8 @@ def mock_crew_factory(): print("inputs in async_kickoff", inputs) return crew_output - crew.kickoff_async.side_effect = async_kickoff + # Create an AsyncMock for kickoff_async + crew.kickoff_async = AsyncMock(side_effect=async_kickoff) # Add more attributes that Procedure might be expecting crew.verbose = False @@ -49,7 +59,6 @@ def mock_crew_factory(): crew.process = Process.sequential crew.config = None crew.cache = True - crew.name = name # Add non-empty agents and tasks mock_agent = MagicMock(spec=Agent) @@ -73,20 +82,35 @@ def mock_router_factory(mock_crew_factory): crew2 = mock_crew_factory(name="Crew 2", output_json_dict={"output": "crew2"}) crew3 = mock_crew_factory(name="Crew 3", output_json_dict={"output": "crew3"}) - router = Router[Dict[str, Any], Pipeline]( - routes={ - "route1": Route( - condition=lambda x: x.get("score", 0) > 80, - pipeline=Pipeline(stages=[crew1]), + MockRouterClass = type("MockRouter", (MagicMock, Router), {}) + + class MockRouter(MockRouterClass): + def __deepcopy__(self, memo): + result = MockRouterClass() + result.route = self.route + return result + + mock_router = MockRouter() + mock_router.route = MagicMock( + side_effect=lambda x: ( + ( + Pipeline(stages=[crew1]) + if x.get("score", 0) > 80 + else ( + Pipeline(stages=[crew2]) + if x.get("score", 0) > 50 + else Pipeline(stages=[crew3]) + ) ), - "route2": Route( - condition=lambda x: 50 < x.get("score", 0) <= 80, - pipeline=Pipeline(stages=[crew2]), + ( + "route1" + if x.get("score", 0) > 80 + else "route2" if x.get("score", 0) > 50 else "default" ), - }, - default=Pipeline(stages=[crew3]), + ) ) - return router + + return mock_router return _create_mock_router @@ -581,9 +605,11 @@ async def test_pipeline_with_multiple_routers(mock_router_factory, mock_crew_fac assert len(result) == 1 assert result[0].json_dict is not None assert result[0].json_dict["output"] == "final" - assert len(result[0].trace) == 4 # Input, Router1, Router2, Final Crew + assert ( + len(result[0].trace) == 6 + ) # Input, Router1, Crew2, Router2, Crew2, Final Crew assert result[0].trace[1]["route_taken"] == "route2" - assert result[0].trace[2]["route_taken"] == "route2" + assert result[0].trace[3]["route_taken"] == "route2" @pytest.mark.asyncio @@ -621,44 +647,3 @@ async def test_router_with_empty_input(mock_router_factory): assert result[0].json_dict is not None assert result[0].json_dict["output"] == "crew3" # Default route assert result[0].trace[1]["route_taken"] == "default" - - -@pytest.mark.asyncio -async def test_router_add_route(mock_router_factory, mock_crew_factory): - router = mock_router_factory() - new_crew = mock_crew_factory(name="New Crew", output_json_dict={"output": "new"}) - - router.add_route( - "new_route", - condition=lambda x: x.get("score", 0) > 90, - pipeline=Pipeline(stages=[new_crew]), - ) - - pipeline = Pipeline(stages=[router]) - result = await pipeline.kickoff([{"score": 95}]) - - assert len(result) == 1 - assert result[0].json_dict is not None - assert result[0].json_dict["output"] == "new" - assert result[0].trace[1]["route_taken"] == "new_route" - - -@pytest.mark.asyncio -async def test_pipeline_result_accumulation_with_routers( - mock_router_factory, mock_crew_factory -): - router = mock_router_factory() - accumulator_crew = mock_crew_factory( - name="Accumulator", output_json_dict={"accumulated": "data"} - ) - - pipeline = Pipeline(stages=[router, accumulator_crew]) - result = await pipeline.kickoff([{"score": 75, "initial": "value"}]) - - assert len(result) == 1 - assert result[0].json_dict is not None - assert "output" in result[0].json_dict - assert "accumulated" in result[0].json_dict - assert len(result[0].trace) == 3 - assert result[0].trace[0]["initial"] == "value" - assert result[0].trace[1]["route_taken"] == "route2"