This commit is contained in:
Brandon Hancock
2024-07-30 09:36:04 -04:00
parent d72f5a601e
commit 46fdada5e4
6 changed files with 32 additions and 18 deletions

View File

@@ -1,3 +1,5 @@
from crewai.pipeline.pipeline import Pipeline from crewai.pipeline.pipeline import Pipeline
from crewai.pipeline.pipeline_output import PipelineOutput
from crewai.pipeline.pipeline_run_result import PipelineRunResult
__all__ = ["Pipeline"] __all__ = ["Pipeline", "PipelineOutput", "PipelineRunResult"]

View File

@@ -2,19 +2,17 @@ from __future__ import annotations
import asyncio import asyncio
import copy import copy
from typing import TYPE_CHECKING, Any, Dict, List, Tuple, Union from typing import Any, Dict, List, Tuple, Union
from pydantic import BaseModel, Field, model_validator from pydantic import BaseModel, Field, model_validator
from crewai.crew import Crew from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput from crewai.crews.crew_output import CrewOutput
from crewai.pipeline.pipeline_run_result import PipelineRunResult from crewai.pipeline.pipeline_run_result import PipelineRunResult
from crewai.routers.pipeline_router import PipelineRouter
from crewai.types.pipeline_stage import PipelineStage from crewai.types.pipeline_stage import PipelineStage
from crewai.types.usage_metrics import UsageMetrics from crewai.types.usage_metrics import UsageMetrics
if TYPE_CHECKING:
from crewai.routers.pipeline_router import PipelineRouter
Trace = Union[Union[str, Dict[str, Any]], List[Union[str, Dict[str, Any]]]] Trace = Union[Union[str, Dict[str, Any]], List[Union[str, Dict[str, Any]]]]
@@ -54,7 +52,7 @@ Handling Traces and Crew Outputs:
Pipeline Terminology: Pipeline Terminology:
- Pipeline: The overall structure that defines a sequence of operations. - Pipeline: The overall structure that defines a sequence of operations.
- Stage: A distinct part of the pipeline, which can be either sequential or parallel. - Stage: A distinct part of the pipeline, which can be either sequential or parallel.
- Run: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline. - Kickoff: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline.
- Branch: Parallel executions within a stage (e.g., concurrent crew operations). - Branch: Parallel executions within a stage (e.g., concurrent crew operations).
- Trace: The journey of an individual input through the entire pipeline. - Trace: The journey of an individual input through the entire pipeline.
@@ -408,3 +406,6 @@ class Pipeline(BaseModel):
raise TypeError( raise TypeError(
f"Unsupported operand type for >>: '{type(self).__name__}' and '{type(other).__name__}'" f"Unsupported operand type for >>: '{type(self).__name__}' and '{type(other).__name__}'"
) )
Pipeline.model_rebuild()

View File

@@ -8,7 +8,7 @@ from crewai.crews.crew_output import CrewOutput
from crewai.types.usage_metrics import UsageMetrics from crewai.types.usage_metrics import UsageMetrics
class PipelineRunResult(BaseModel): class PipelineKickoffResult(BaseModel):
"""Class that represents the result of a pipeline run.""" """Class that represents the result of a pipeline run."""
id: UUID4 = Field( id: UUID4 = Field(

View File

@@ -3,20 +3,21 @@ from typing import Any, Callable, Dict, Tuple, Union
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from crewai.pipeline.pipeline import Pipeline from crewai.pipeline.pipeline import Pipeline
from crewai.types.route import Route
RouteType = Tuple[Callable[[Dict[str, Any]], bool], Pipeline]
class PipelineRouter(BaseModel): class PipelineRouter(BaseModel):
routes: Dict[str, RouteType] = Field( routes: Dict[str, Route] = Field(
default_factory=dict, default_factory=dict,
description="Dictionary of route names to (condition, pipeline) tuples", description="Dictionary of route names to (condition, pipeline) tuples",
) )
default: Pipeline = Field( default: "Pipeline" = Field(
..., description="Default pipeline if no conditions are met" ..., description="Default pipeline if no conditions are met"
) )
def __init__(self, *routes: Union[Tuple[str, RouteType], Pipeline], **data): def __init__(self, *routes: Union[Tuple[str, Route], "Pipeline"], **data):
from crewai.pipeline.pipeline import Pipeline
routes_dict = {} routes_dict = {}
default_pipeline = None default_pipeline = None
@@ -41,7 +42,10 @@ class PipelineRouter(BaseModel):
super().__init__(routes=routes_dict, default=default_pipeline, **data) super().__init__(routes=routes_dict, default=default_pipeline, **data)
def add_route( def add_route(
self, name: str, condition: Callable[[Dict[str, Any]], bool], pipeline: Pipeline self,
name: str,
condition: Callable[[Dict[str, Any]], bool],
pipeline: "Pipeline",
) -> "PipelineRouter": ) -> "PipelineRouter":
""" """
Add a named route with its condition and corresponding pipeline to the router. Add a named route with its condition and corresponding pipeline to the router.
@@ -57,7 +61,7 @@ class PipelineRouter(BaseModel):
self.routes[name] = (condition, pipeline) self.routes[name] = (condition, pipeline)
return self return self
def route(self, input_dict: Dict[str, Any]) -> Tuple[Pipeline, str]: def route(self, input_dict: Dict[str, Any]) -> Tuple["Pipeline", str]:
""" """
Evaluate the input against the conditions and return the appropriate pipeline. Evaluate the input against the conditions and return the appropriate pipeline.
@@ -72,3 +76,6 @@ class PipelineRouter(BaseModel):
return pipeline, name return pipeline, name
return self.default, "default" return self.default, "default"
PipelineRouter.model_rebuild()

View File

@@ -0,0 +1,5 @@
from typing import Any, Callable, Dict, Tuple
from crewai.pipeline.pipeline import Pipeline
Route = Tuple[Callable[[Dict[str, Any]], bool], Pipeline]

View File

@@ -8,7 +8,6 @@ from crewai.crews.crew_output import CrewOutput
from crewai.pipeline.pipeline import Pipeline from crewai.pipeline.pipeline import Pipeline
from crewai.pipeline.pipeline_run_result import PipelineRunResult from crewai.pipeline.pipeline_run_result import PipelineRunResult
from crewai.process import Process from crewai.process import Process
from crewai.routers.pipeline_router import PipelineRouter
from crewai.task import Task from crewai.task import Task
from crewai.tasks.task_output import TaskOutput from crewai.tasks.task_output import TaskOutput
from crewai.types.usage_metrics import UsageMetrics from crewai.types.usage_metrics import UsageMetrics
@@ -65,9 +64,9 @@ def mock_crew_factory():
return _create_mock_crew return _create_mock_crew
@pytest.fixture # @pytest.fixture
def pipeline_router_factory(): # def pipeline_router_factory():
return PipelineRouter() # return PipelineRouter()
def test_pipeline_initialization(mock_crew_factory): def test_pipeline_initialization(mock_crew_factory):