In the middle of fixing the router. A ton of circular dependencies. Moving over to a new design.

Brandon Hancock
2024-07-26 09:24:40 -04:00
parent d9e60c8b57
commit 31ff979a4b
5 changed files with 144 additions and 18 deletions

View File

@@ -1,3 +1,5 @@
from __future__ import annotations
import asyncio
import copy
from typing import Any, Dict, List, Tuple, Union
@@ -7,10 +9,10 @@ from pydantic import BaseModel, Field, model_validator
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.pipeline.pipeline_run_result import PipelineRunResult
from crewai.routers.pipeline_router import PipelineRouter
Trace = Union[Union[str, Dict[str, Any]], List[Union[str, Dict[str, Any]]]]
"""
Pipeline Terminology:
Pipeline: The overall structure that defines a sequence of operations.
@@ -44,7 +46,7 @@ Multiple runs can be processed concurrently, each following the defined pipeline
class Pipeline(BaseModel):
stages: List[Union[Crew, List[Crew]]] = Field(
stages: List[Union[Crew, "Pipeline", "PipelineRouter"]] = Field(
..., description="List of crews representing stages to be executed in sequence"
)
@@ -97,9 +99,26 @@ class Pipeline(BaseModel):
all_stage_outputs: List[List[CrewOutput]] = []
traces: List[List[Union[str, Dict[str, Any]]]] = [[initial_input]]
for stage in self.stages:
stage_index = 0
while stage_index < len(self.stages):
stage = self.stages[stage_index]
stage_input = copy.deepcopy(current_input)
stage_outputs, stage_trace = await self._process_stage(stage, stage_input)
if isinstance(stage, PipelineRouter):
next_stage = stage.route(stage_input)
traces.append([f"Routed to {next_stage.__class__.__name__}"])
stage = next_stage
if isinstance(stage, Crew):
stage_outputs, stage_trace = await self._process_crew(
stage, stage_input
)
elif isinstance(stage, Pipeline):
stage_outputs, stage_trace = await self._process_pipeline(
stage, stage_input
)
else:
raise ValueError(f"Unsupported stage type: {type(stage)}")
self._update_metrics_and_input(
usage_metrics, current_input, stage, stage_outputs
@@ -107,10 +126,28 @@ class Pipeline(BaseModel):
traces.append(stage_trace)
all_stage_outputs.append(stage_outputs)
stage_index += 1
return self._build_pipeline_run_results(
all_stage_outputs, traces, usage_metrics
)
async def _process_crew(
self, crew: Crew, current_input: Dict[str, Any]
) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]:
output = await crew.kickoff_async(inputs=current_input)
return [output], [crew.name or str(crew.id)]
async def _process_pipeline(
self, pipeline: "Pipeline", current_input: Dict[str, Any]
) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]:
results = await pipeline.process_single_run(current_input)
outputs = [result.crews_outputs[-1] for result in results]
traces: List[Union[str, Dict[str, Any]]] = [
f"Nested Pipeline: {pipeline.__class__.__name__}"
]
return outputs, traces
async def _process_stage(
self, stage: Union[Crew, List[Crew]], current_input: Dict[str, Any]
) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]:
@@ -137,11 +174,12 @@ class Pipeline(BaseModel):
self,
usage_metrics: Dict[str, Any],
current_input: Dict[str, Any],
stage: Union[Crew, List[Crew]],
stage: Union[Crew, "Pipeline"],
outputs: List[CrewOutput],
) -> None:
for crew, output in zip([stage] if isinstance(stage, Crew) else stage, outputs):
usage_metrics[crew.name or str(crew.id)] = output.token_usage
for output in outputs:
if isinstance(stage, Crew):
usage_metrics[stage.name or str(stage.id)] = output.token_usage
current_input.update(output.to_dict())
def _build_pipeline_run_results(
@@ -198,12 +236,7 @@ class Pipeline(BaseModel):
return [crew_outputs + [output] for output in all_stage_outputs[-1]]
def __rshift__(self, other: Any) -> "Pipeline":
"""
Implements the >> operator to append another stage (Crew, Pipeline, or PipelineRouter) to this Pipeline.
"""
if isinstance(other, Crew):
return type(self)(stages=self.stages + [other])
elif isinstance(other, list) and all(isinstance(crew, Crew) for crew in other):
if isinstance(other, (Crew, Pipeline, PipelineRouter)):
return type(self)(stages=self.stages + [other])
else:
raise TypeError(
@@ -211,8 +244,7 @@ class Pipeline(BaseModel):
)
# Helper function to run the pipeline
async def run_pipeline(
pipeline: Pipeline, inputs: List[Dict[str, Any]]
) -> List[PipelineRunResult]:
return await pipeline.process_runs(inputs)
# TODO: CHECK IF NECESSARY
from crewai.routers.pipeline_router import PipelineRouter
Pipeline.model_rebuild()
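
The deferred import plus Pipeline.model_rebuild() at the bottom of the file is the usual pydantic v2 idiom for breaking an import cycle: keep the cross-module type as a string forward reference, import the real class only after everything else, then rebuild the model so the reference resolves. A minimal, self-contained sketch of that idiom, using illustrative Router/Flow names rather than the crewai classes:

from typing import Optional

from pydantic import BaseModel


class Router(BaseModel):
    # "Flow" is only a string here; Router stays incomplete until the
    # forward reference can be resolved.
    default: Optional["Flow"] = None


class Flow(BaseModel):
    name: str


# Now that Flow exists, rebuild Router so pydantic resolves the reference.
Router.model_rebuild()

print(Router(default=Flow(name="demo")).default.name)  # demo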

View File

@@ -0,0 +1,3 @@
from crewai.routers.pipeline_router import PipelineRouter
__all__ = ["PipelineRouter"]

View File

@@ -0,0 +1,76 @@
from __future__ import annotations
from typing import Any, Callable, Dict, List, Tuple, Union
from pydantic import BaseModel
from crewai.crew import Crew
class PipelineRouter(BaseModel):
conditions: List[
Tuple[Callable[[Dict[str, Any]], bool], Union[Crew, "Pipeline"]]
] = []
default: Union[Crew, "Pipeline", None] = None
def add_condition(
self,
condition: Callable[[Dict[str, Any]], bool],
next_stage: Union[Crew, "Pipeline"],
):
"""
Add a condition and its corresponding next stage to the router.
Args:
condition: A function that takes the input dictionary and returns a boolean.
next_stage: The Crew or Pipeline to execute if the condition is met.
"""
self.conditions.append((condition, next_stage))
def set_default(self, default_stage: Union[Crew, "Pipeline"]):
"""Set the default stage to be executed if no conditions are met."""
self.default = default_stage
def route(self, input_dict: Dict[str, Any]) -> Union[Crew, "Pipeline"]:
"""
Evaluate the input against the conditions and return the appropriate next stage.
Args:
input_dict: The input dictionary to be evaluated.
Returns:
The next Crew or Pipeline to be executed.
Raises:
ValueError: If no conditions are met and no default stage was set.
"""
for condition, next_stage in self.conditions:
if condition(input_dict):
self._update_trace(input_dict, next_stage)
return next_stage
if self.default is not None:
self._update_trace(input_dict, self.default)
return self.default
raise ValueError("No conditions were met and no default stage was set.")
def _update_trace(
self, input_dict: Dict[str, Any], next_stage: Union[Crew, "Pipeline"]
):
"""Update the trace to show that the input went through the router."""
if "trace" not in input_dict:
input_dict["trace"] = []
input_dict["trace"].append(
{
"router": self.__class__.__name__,
"next_stage": next_stage.__class__.__name__,
}
)
# TODO: See if this is necessary
from crewai.pipeline.pipeline import Pipeline
# This line should be at the end of the file
PipelineRouter.model_rebuild()
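
Read together, add_condition, set_default, and route give predicate-ordered dispatch with an optional fallback, and _update_trace leaves a breadcrumb on the input. A rough usage sketch, assuming the circular-import workaround above settles so the module imports cleanly; the crews are mock stand-ins, since constructing real Crew instances is beside the point here:

from unittest.mock import MagicMock

from crewai import Crew
from crewai.routers.pipeline_router import PipelineRouter

# Mock stand-ins: add_condition appends and set_default assigns without
# pydantic re-validation, so real crews are not needed to exercise routing.
triage_crew = MagicMock(spec=Crew)
escalation_crew = MagicMock(spec=Crew)

router = PipelineRouter()
router.add_condition(lambda inputs: inputs.get("score", 0) > 80, escalation_crew)
router.set_default(triage_crew)

inputs = {"score": 95}
assert router.route(inputs) is escalation_crew     # first matching condition wins
assert router.route({"score": 10}) is triage_crew  # no match, fall back to default

# route() also records the hop on the input dict itself.
print(inputs["trace"])  # [{'router': 'PipelineRouter', 'next_stage': 'MagicMock'}]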

View File

View File

@@ -8,6 +8,7 @@ from crewai.crews.crew_output import CrewOutput
from crewai.pipeline.pipeline import Pipeline
from crewai.pipeline.pipeline_run_result import PipelineRunResult
from crewai.process import Process
from crewai.routers.pipeline_router import PipelineRouter
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
from pydantic import BaseModel, ValidationError
@@ -65,6 +66,11 @@ def mock_crew_factory():
return _create_mock_crew
@pytest.fixture
def pipeline_router_factory():
    def _create_pipeline_router():
        return PipelineRouter()

    return _create_pipeline_router
def test_pipeline_initialization(mock_crew_factory):
"""
Test that a Pipeline is correctly initialized with the given stages.
@@ -443,6 +449,7 @@ Options:
- Should the final output include the accumulation of previous stages' outputs?
"""
@pytest.mark.asyncio
async def test_pipeline_data_accumulation(mock_crew_factory):
crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"key1": "value1"})
@@ -472,3 +479,11 @@ async def test_pipeline_data_accumulation(mock_crew_factory):
assert len(final_result.crews_outputs) == 2
assert final_result.crews_outputs[0].json_dict == {"key1": "value1"}
assert final_result.crews_outputs[1].json_dict == {"key2": "value2"}
def test_add_condition(pipeline_router_factory, mock_crew_factory):
pipeline_router = pipeline_router_factory()
crew = mock_crew_factory(name="Test Crew")
pipeline_router.add_condition(lambda x: x.get("score", 0) > 80, crew)
assert len(pipeline_router.conditions) == 1
assert pipeline_router.conditions[0][1] == crew
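
test_add_condition only checks registration; the fallback and error paths of route() are still uncovered. A sketch of companion tests in the same style, offered as a suggestion rather than part of this commit, reusing the existing fixtures:

def test_route_prefers_matching_condition(pipeline_router_factory, mock_crew_factory):
    pipeline_router = pipeline_router_factory()
    high_score_crew = mock_crew_factory(name="High Score Crew")
    default_crew = mock_crew_factory(name="Default Crew")

    pipeline_router.add_condition(lambda x: x.get("score", 0) > 80, high_score_crew)
    pipeline_router.set_default(default_crew)

    assert pipeline_router.route({"score": 95}) == high_score_crew
    assert pipeline_router.route({"score": 10}) == default_crew


def test_route_without_default_raises(pipeline_router_factory):
    pipeline_router = pipeline_router_factory()

    # No conditions registered and no default set: route() should raise.
    with pytest.raises(ValueError):
        pipeline_router.route({"score": 10})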