From 31ff979a4b1f8fe3ec175599c415d63f1378362d Mon Sep 17 00:00:00 2001
From: Brandon Hancock
Date: Fri, 26 Jul 2024 09:24:40 -0400
Subject: [PATCH] In the middle of fixing the router. A ton of circular dependencies. Moving over to a new design.

---
 src/crewai/pipeline/pipeline.py       | 68 +++++++++++++++++-------
 src/crewai/routers/__init__.py        |  3 ++
 src/crewai/routers/pipeline_router.py | 76 +++++++++++++++++++++++++++
 tests/pipeline/__init__.py            |  0
 tests/pipeline/test_pipeline.py       | 15 ++++++
 5 files changed, 144 insertions(+), 18 deletions(-)
 create mode 100644 src/crewai/routers/__init__.py
 create mode 100644 src/crewai/routers/pipeline_router.py
 create mode 100644 tests/pipeline/__init__.py

diff --git a/src/crewai/pipeline/pipeline.py b/src/crewai/pipeline/pipeline.py
index 7dd5f0fa3..2a3165461 100644
--- a/src/crewai/pipeline/pipeline.py
+++ b/src/crewai/pipeline/pipeline.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
 import asyncio
 import copy
 from typing import Any, Dict, List, Tuple, Union
@@ -7,10 +9,10 @@ from pydantic import BaseModel, Field, model_validator
 from crewai.crew import Crew
 from crewai.crews.crew_output import CrewOutput
 from crewai.pipeline.pipeline_run_result import PipelineRunResult
+from crewai.routers.pipeline_router import PipelineRouter
 
 Trace = Union[Union[str, Dict[str, Any]], List[Union[str, Dict[str, Any]]]]
 
-
 """
 Pipeline Terminology:
 Pipeline: The overall structure that defines a sequence of operations.
@@ -44,7 +46,7 @@ Multiple runs can be processed concurrently, each following the defined pipeline
 
 
 class Pipeline(BaseModel):
-    stages: List[Union[Crew, List[Crew]]] = Field(
+    stages: List[Union[Crew, "Pipeline", "PipelineRouter"]] = Field(
         ..., description="List of crews representing stages to be executed in sequence"
     )
@@ -97,9 +99,26 @@ class Pipeline(BaseModel):
         all_stage_outputs: List[List[CrewOutput]] = []
         traces: List[List[Union[str, Dict[str, Any]]]] = [[initial_input]]
 
-        for stage in self.stages:
+        stage_index = 0
+        while stage_index < len(self.stages):
+            stage = self.stages[stage_index]
             stage_input = copy.deepcopy(current_input)
-            stage_outputs, stage_trace = await self._process_stage(stage, stage_input)
+
+            if isinstance(stage, PipelineRouter):
+                next_stage = stage.route(stage_input)
+                traces.append([f"Routed to {next_stage.__class__.__name__}"])
+                stage = next_stage
+
+            if isinstance(stage, Crew):
+                stage_outputs, stage_trace = await self._process_crew(
+                    stage, stage_input
+                )
+            elif isinstance(stage, Pipeline):
+                stage_outputs, stage_trace = await self._process_pipeline(
+                    stage, stage_input
+                )
+            else:
+                raise ValueError(f"Unsupported stage type: {type(stage)}")
 
             self._update_metrics_and_input(
                 usage_metrics, current_input, stage, stage_outputs
@@ -107,10 +126,28 @@ class Pipeline(BaseModel):
             traces.append(stage_trace)
             all_stage_outputs.append(stage_outputs)
 
+            stage_index += 1
+
         return self._build_pipeline_run_results(
             all_stage_outputs, traces, usage_metrics
         )
 
+    async def _process_crew(
+        self, crew: Crew, current_input: Dict[str, Any]
+    ) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]:
+        output = await crew.kickoff_async(inputs=current_input)
+        return [output], [crew.name or str(crew.id)]
+
+    async def _process_pipeline(
+        self, pipeline: "Pipeline", current_input: Dict[str, Any]
+    ) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]:
+        results = await pipeline.process_single_run(current_input)
+        outputs = [result.crews_outputs[-1] for result in results]
+        traces: List[Union[str, Dict[str, Any]]] = [
+            f"Nested Pipeline: {pipeline.__class__.__name__}"
+        ]
+        return outputs, traces
+
     async def _process_stage(
         self, stage: Union[Crew, List[Crew]], current_input: Dict[str, Any]
     ) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]:
@@ -137,11 +174,12 @@ class Pipeline(BaseModel):
         self,
         usage_metrics: Dict[str, Any],
         current_input: Dict[str, Any],
-        stage: Union[Crew, List[Crew]],
+        stage: Union[Crew, "Pipeline"],
         outputs: List[CrewOutput],
     ) -> None:
-        for crew, output in zip([stage] if isinstance(stage, Crew) else stage, outputs):
-            usage_metrics[crew.name or str(crew.id)] = output.token_usage
+        for output in outputs:
+            if isinstance(stage, Crew):
+                usage_metrics[stage.name or str(stage.id)] = output.token_usage
             current_input.update(output.to_dict())
 
     def _build_pipeline_run_results(
@@ -198,12 +236,7 @@ class Pipeline(BaseModel):
         return [crew_outputs + [output] for output in all_stage_outputs[-1]]
 
     def __rshift__(self, other: Any) -> "Pipeline":
-        """
-        Implements the >> operator to add another Stage (Crew or List[Crew]) to an existing Pipeline.
-        """
-        if isinstance(other, Crew):
-            return type(self)(stages=self.stages + [other])
-        elif isinstance(other, list) and all(isinstance(crew, Crew) for crew in other):
+        if isinstance(other, (Crew, Pipeline, PipelineRouter)):
             return type(self)(stages=self.stages + [other])
         else:
             raise TypeError(
@@ -211,8 +244,7 @@ class Pipeline(BaseModel):
             )
 
 
-# Helper function to run the pipeline
-async def run_pipeline(
-    pipeline: Pipeline, inputs: List[Dict[str, Any]]
-) -> List[PipelineRunResult]:
-    return await pipeline.process_runs(inputs)
+# TODO: Check whether this late import and rebuild are still needed once the circular dependency is resolved.
+from crewai.routers.pipeline_router import PipelineRouter
+
+Pipeline.model_rebuild()
diff --git a/src/crewai/routers/__init__.py b/src/crewai/routers/__init__.py
new file mode 100644
index 000000000..a8e0d5f73
--- /dev/null
+++ b/src/crewai/routers/__init__.py
@@ -0,0 +1,3 @@
+from crewai.routers.pipeline_router import PipelineRouter
+
+__all__ = ["PipelineRouter"]
diff --git a/src/crewai/routers/pipeline_router.py b/src/crewai/routers/pipeline_router.py
new file mode 100644
index 000000000..ba39d5285
--- /dev/null
+++ b/src/crewai/routers/pipeline_router.py
@@ -0,0 +1,76 @@
+from __future__ import annotations
+
+from typing import Any, Callable, Dict, List, Tuple, Union
+
+from pydantic import BaseModel
+
+from crewai.crew import Crew
+
+
+class PipelineRouter(BaseModel):
+    conditions: List[
+        Tuple[Callable[[Dict[str, Any]], bool], Union[Crew, "Pipeline"]]
+    ] = []
+    default: Union[Crew, "Pipeline", None] = None
+
+    def add_condition(
+        self,
+        condition: Callable[[Dict[str, Any]], bool],
+        next_stage: Union[Crew, "Pipeline"],
+    ):
+        """
+        Add a condition and its corresponding next stage to the router.
+
+        Args:
+            condition: A function that takes the input dictionary and returns a boolean.
+            next_stage: The Crew or Pipeline to execute if the condition is met.
+        """
+        self.conditions.append((condition, next_stage))
+
+    def set_default(self, default_stage: Union[Crew, "Pipeline"]):
+        """Set the default stage to be executed if no conditions are met."""
+        self.default = default_stage
+
+    def route(self, input_dict: Dict[str, Any]) -> Union[Crew, "Pipeline"]:
+        """
+        Evaluate the input against the conditions and return the appropriate next stage.
+
+        Args:
+            input_dict: The input dictionary to be evaluated.
+
+        Returns:
+            The next Crew or Pipeline to be executed.
+
+        Raises:
+            ValueError: If no conditions are met and no default stage was set.
+        """
+        for condition, next_stage in self.conditions:
+            if condition(input_dict):
+                self._update_trace(input_dict, next_stage)
+                return next_stage
+
+        if self.default is not None:
+            self._update_trace(input_dict, self.default)
+            return self.default
+
+        raise ValueError("No conditions were met and no default stage was set.")
+
+    def _update_trace(
+        self, input_dict: Dict[str, Any], next_stage: Union[Crew, "Pipeline"]
+    ):
+        """Update the trace to show that the input went through the router."""
+        if "trace" not in input_dict:
+            input_dict["trace"] = []
+        input_dict["trace"].append(
+            {
+                "router": self.__class__.__name__,
+                "next_stage": next_stage.__class__.__name__,
+            }
+        )
+
+
+# TODO: See if this is necessary
+from crewai.pipeline.pipeline import Pipeline
+
+# Keep this rebuild at the end of the file so the "Pipeline" forward references resolve.
+PipelineRouter.model_rebuild()
diff --git a/tests/pipeline/__init__.py b/tests/pipeline/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index e7b7a745c..6dc39672a 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -8,6 +8,7 @@ from crewai.crews.crew_output import CrewOutput
 from crewai.pipeline.pipeline import Pipeline
 from crewai.pipeline.pipeline_run_result import PipelineRunResult
 from crewai.process import Process
+from crewai.routers.pipeline_router import PipelineRouter
 from crewai.task import Task
 from crewai.tasks.task_output import TaskOutput
 from pydantic import BaseModel, ValidationError
@@ -65,6 +66,11 @@ def mock_crew_factory():
     return _create_mock_crew
 
 
+@pytest.fixture
+def pipeline_router_factory():
+    # Return a callable so each test builds its own fresh PipelineRouter.
+    return lambda: PipelineRouter()
+
+
 def test_pipeline_initialization(mock_crew_factory):
     """
     Test that a Pipeline is correctly initialized with the given stages.
@@ -443,6 +449,7 @@ Options:
 - Should the final output include the accumulation of previous stages' outputs?
 """
 
+
 @pytest.mark.asyncio
 async def test_pipeline_data_accumulation(mock_crew_factory):
     crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"key1": "value1"})
     crew2 = mock_crew_factory(name="Crew 2", output_json_dict={"key2": "value2"})
     pipeline = Pipeline(stages=[crew1, crew2])
@@ -472,3 +479,11 @@ async def test_pipeline_data_accumulation(mock_crew_factory):
     assert len(final_result.crews_outputs) == 2
     assert final_result.crews_outputs[0].json_dict == {"key1": "value1"}
     assert final_result.crews_outputs[1].json_dict == {"key2": "value2"}
+
+
+def test_add_condition(pipeline_router_factory, mock_crew_factory):
+    pipeline_router = pipeline_router_factory()
+    crew = mock_crew_factory(name="Test Crew")
+    pipeline_router.add_condition(lambda x: x.get("score", 0) > 80, crew)
+    assert len(pipeline_router.conditions) == 1
+    assert pipeline_router.conditions[0][1] == crew
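
Illustrative usage (not part of the patch): a minimal sketch of how the PipelineRouter introduced above is meant to compose with Pipeline, using only the API this diff adds (add_condition, set_default, the widened stages union, and the >> operator). The crew parameters and the "score" input key are hypothetical stand-ins, mirroring the condition used in test_add_condition.

    from crewai.crew import Crew
    from crewai.pipeline.pipeline import Pipeline
    from crewai.routers.pipeline_router import PipelineRouter


    def build_triage_pipeline(
        review_crew: Crew, escalation_crew: Crew, summary_crew: Crew
    ) -> Pipeline:
        # Send high-scoring inputs to the escalation crew, everything else to review.
        router = PipelineRouter()
        router.add_condition(lambda inputs: inputs.get("score", 0) > 80, escalation_crew)
        router.set_default(review_crew)

        # The router is a first-class stage, so it composes with >> like a Crew or Pipeline.
        return Pipeline(stages=[router]) >> summary_crew

Kicking off the resulting pipeline would still go through the existing Pipeline entry points (process_single_run / process_runs), which this sketch deliberately does not exercise.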