From 46fdada5e4e772f14b54d1a1e1ab14dca0e596db Mon Sep 17 00:00:00 2001 From: Brandon Hancock Date: Tue, 30 Jul 2024 09:36:04 -0400 Subject: [PATCH] WIP. --- src/crewai/pipeline/__init__.py | 4 +++- src/crewai/pipeline/pipeline.py | 11 ++++++----- src/crewai/pipeline/pipeline_run_result.py | 2 +- src/crewai/routers/pipeline_router.py | 21 ++++++++++++++------- src/crewai/types/route.py | 5 +++++ tests/pipeline/test_pipeline.py | 7 +++---- 6 files changed, 32 insertions(+), 18 deletions(-) create mode 100644 src/crewai/types/route.py diff --git a/src/crewai/pipeline/__init__.py b/src/crewai/pipeline/__init__.py index 88933b0d9..d129ecee1 100644 --- a/src/crewai/pipeline/__init__.py +++ b/src/crewai/pipeline/__init__.py @@ -1,3 +1,5 @@ from crewai.pipeline.pipeline import Pipeline +from crewai.pipeline.pipeline_output import PipelineOutput +from crewai.pipeline.pipeline_run_result import PipelineRunResult -__all__ = ["Pipeline"] +__all__ = ["Pipeline", "PipelineOutput", "PipelineRunResult"] diff --git a/src/crewai/pipeline/pipeline.py b/src/crewai/pipeline/pipeline.py index 816db0e66..9819fd2e6 100644 --- a/src/crewai/pipeline/pipeline.py +++ b/src/crewai/pipeline/pipeline.py @@ -2,19 +2,17 @@ from __future__ import annotations import asyncio import copy -from typing import TYPE_CHECKING, Any, Dict, List, Tuple, Union +from typing import Any, Dict, List, Tuple, Union from pydantic import BaseModel, Field, model_validator from crewai.crew import Crew from crewai.crews.crew_output import CrewOutput from crewai.pipeline.pipeline_run_result import PipelineRunResult +from crewai.routers.pipeline_router import PipelineRouter from crewai.types.pipeline_stage import PipelineStage from crewai.types.usage_metrics import UsageMetrics -if TYPE_CHECKING: - from crewai.routers.pipeline_router import PipelineRouter - Trace = Union[Union[str, Dict[str, Any]], List[Union[str, Dict[str, Any]]]] @@ -54,7 +52,7 @@ Handling Traces and Crew Outputs: Pipeline Terminology: - Pipeline: The overall structure that defines a sequence of operations. - Stage: A distinct part of the pipeline, which can be either sequential or parallel. -- Run: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline. +- Kickoff: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline. - Branch: Parallel executions within a stage (e.g., concurrent crew operations). - Trace: The journey of an individual input through the entire pipeline. @@ -408,3 +406,6 @@ class Pipeline(BaseModel): raise TypeError( f"Unsupported operand type for >>: '{type(self).__name__}' and '{type(other).__name__}'" ) + + +Pipeline.model_rebuild() diff --git a/src/crewai/pipeline/pipeline_run_result.py b/src/crewai/pipeline/pipeline_run_result.py index 576cd9949..7bde238cd 100644 --- a/src/crewai/pipeline/pipeline_run_result.py +++ b/src/crewai/pipeline/pipeline_run_result.py @@ -8,7 +8,7 @@ from crewai.crews.crew_output import CrewOutput from crewai.types.usage_metrics import UsageMetrics -class PipelineRunResult(BaseModel): +class PipelineKickoffResult(BaseModel): """Class that represents the result of a pipeline run.""" id: UUID4 = Field( diff --git a/src/crewai/routers/pipeline_router.py b/src/crewai/routers/pipeline_router.py index d4b244dba..2dc82375a 100644 --- a/src/crewai/routers/pipeline_router.py +++ b/src/crewai/routers/pipeline_router.py @@ -3,20 +3,21 @@ from typing import Any, Callable, Dict, Tuple, Union from pydantic import BaseModel, Field from crewai.pipeline.pipeline import Pipeline - -RouteType = Tuple[Callable[[Dict[str, Any]], bool], Pipeline] +from crewai.types.route import Route class PipelineRouter(BaseModel): - routes: Dict[str, RouteType] = Field( + routes: Dict[str, Route] = Field( default_factory=dict, description="Dictionary of route names to (condition, pipeline) tuples", ) - default: Pipeline = Field( + default: "Pipeline" = Field( ..., description="Default pipeline if no conditions are met" ) - def __init__(self, *routes: Union[Tuple[str, RouteType], Pipeline], **data): + def __init__(self, *routes: Union[Tuple[str, Route], "Pipeline"], **data): + from crewai.pipeline.pipeline import Pipeline + routes_dict = {} default_pipeline = None @@ -41,7 +42,10 @@ class PipelineRouter(BaseModel): super().__init__(routes=routes_dict, default=default_pipeline, **data) def add_route( - self, name: str, condition: Callable[[Dict[str, Any]], bool], pipeline: Pipeline + self, + name: str, + condition: Callable[[Dict[str, Any]], bool], + pipeline: "Pipeline", ) -> "PipelineRouter": """ Add a named route with its condition and corresponding pipeline to the router. @@ -57,7 +61,7 @@ class PipelineRouter(BaseModel): self.routes[name] = (condition, pipeline) return self - def route(self, input_dict: Dict[str, Any]) -> Tuple[Pipeline, str]: + def route(self, input_dict: Dict[str, Any]) -> Tuple["Pipeline", str]: """ Evaluate the input against the conditions and return the appropriate pipeline. @@ -72,3 +76,6 @@ class PipelineRouter(BaseModel): return pipeline, name return self.default, "default" + + +PipelineRouter.model_rebuild() diff --git a/src/crewai/types/route.py b/src/crewai/types/route.py new file mode 100644 index 000000000..9a97acc52 --- /dev/null +++ b/src/crewai/types/route.py @@ -0,0 +1,5 @@ +from typing import Any, Callable, Dict, Tuple + +from crewai.pipeline.pipeline import Pipeline + +Route = Tuple[Callable[[Dict[str, Any]], bool], Pipeline] diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 9b6ed134f..34d1fdb90 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -8,7 +8,6 @@ from crewai.crews.crew_output import CrewOutput from crewai.pipeline.pipeline import Pipeline from crewai.pipeline.pipeline_run_result import PipelineRunResult from crewai.process import Process -from crewai.routers.pipeline_router import PipelineRouter from crewai.task import Task from crewai.tasks.task_output import TaskOutput from crewai.types.usage_metrics import UsageMetrics @@ -65,9 +64,9 @@ def mock_crew_factory(): return _create_mock_crew -@pytest.fixture -def pipeline_router_factory(): - return PipelineRouter() +# @pytest.fixture +# def pipeline_router_factory(): +# return PipelineRouter() def test_pipeline_initialization(mock_crew_factory):