diff --git a/src/crewai/pipeline/pipeline.py b/src/crewai/pipeline/pipeline.py index 7e320bf4a..f95a83048 100644 --- a/src/crewai/pipeline/pipeline.py +++ b/src/crewai/pipeline/pipeline.py @@ -158,9 +158,7 @@ class Pipeline(BaseModel): + list(next_pipeline.stages) + self.stages[stage_index + 1 :] ) - traces.append( - [{"router": stage.__class__.__name__, "route_taken": route_taken}] - ) + traces.append([{"route_taken": route_taken}]) stage_index += 1 continue @@ -197,16 +195,6 @@ class Pipeline(BaseModel): else: raise ValueError(f"Unsupported stage type: {type(stage)}") - async def _process_pipeline( - self, pipeline: "Pipeline", current_input: Dict[str, Any] - ) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]: - results = await pipeline.process_single_kickoff(current_input) - outputs = [result.crews_outputs[-1] for result in results] - traces: List[Union[str, Dict[str, Any]]] = [ - f"Nested Pipeline: {pipeline.__class__.__name__}" - ] - return outputs, traces - async def _process_single_crew( self, crew: Crew, current_input: Dict[str, Any] ) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]: diff --git a/src/crewai/routers/router.py b/src/crewai/routers/router.py index e11c816f2..753589e7a 100644 --- a/src/crewai/routers/router.py +++ b/src/crewai/routers/router.py @@ -1,3 +1,4 @@ +import logging from dataclasses import dataclass from typing import Any, Callable, Dict, Generic, Tuple, TypeVar diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index c5141d1ae..579cfaaa3 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -80,7 +80,7 @@ def mock_router_factory(mock_crew_factory): pipeline=Pipeline(stages=[crew1]), ), "route2": Route( - condition=lambda x: x.get("score", 0) > 50, + condition=lambda x: 50 < x.get("score", 0) <= 80, pipeline=Pipeline(stages=[crew2]), ), }, @@ -504,37 +504,39 @@ async def test_pipeline_data_accumulation(mock_crew_factory): @pytest.mark.asyncio async def test_pipeline_with_router(mock_router_factory): router = mock_router_factory() - pipeline = Pipeline(stages=[router]) # Test high score route + pipeline = Pipeline(stages=[router]) result_high = await pipeline.kickoff([{"score": 90}]) assert len(result_high) == 1 assert result_high[0].json_dict is not None assert result_high[0].json_dict["output"] == "crew1" assert result_high[0].trace == [ {"score": 90}, - {"router": "Router", "route_taken": "route1"}, + {"route_taken": "route1"}, "Crew 1", ] # Test medium score route + pipeline = Pipeline(stages=[router]) result_medium = await pipeline.kickoff([{"score": 60}]) assert len(result_medium) == 1 assert result_medium[0].json_dict is not None assert result_medium[0].json_dict["output"] == "crew2" assert result_medium[0].trace == [ {"score": 60}, - {"router": "Router", "route_taken": "route2"}, + {"route_taken": "route2"}, "Crew 2", ] - # Test low score (default) route + # Test low score route + pipeline = Pipeline(stages=[router]) result_low = await pipeline.kickoff([{"score": 30}]) assert len(result_low) == 1 assert result_low[0].json_dict is not None assert result_low[0].json_dict["output"] == "crew3" assert result_low[0].trace == [ {"score": 30}, - {"router": "Router", "route_taken": "default"}, + {"route_taken": "default"}, "Crew 3", ]