remove all references to pipeline and pipeline router (#1661)

* remove all references to pipeline and router

* fix linting

* drop poetry.lock
This commit is contained in:
Brandon Hancock (bhancock_ai)
2024-12-04 12:39:34 -05:00
committed by GitHub
parent 066ad73423
commit bbea797b0c
57 changed files with 10 additions and 10079 deletions

View File

@@ -8,12 +8,10 @@ from .annotations import (
llm,
output_json,
output_pydantic,
pipeline,
task,
tool,
)
from .crew_base import CrewBase
from .pipeline_base import PipelineBase
__all__ = [
"agent",
@@ -24,10 +22,8 @@ __all__ = [
"tool",
"callback",
"CrewBase",
"PipelineBase",
"llm",
"cache_handler",
"pipeline",
"before_kickoff",
"after_kickoff",
]

View File

@@ -65,21 +65,6 @@ def cache_handler(func):
return memoize(func)
def stage(func):
func.is_stage = True
return memoize(func)
def router(func):
func.is_router = True
return memoize(func)
def pipeline(func):
func.is_pipeline = True
return memoize(func)
def crew(func) -> Callable[..., Crew]:
def wrapper(self, *args, **kwargs) -> Crew:
instantiated_tasks = []

View File

@@ -1,58 +0,0 @@
from typing import Any, Callable, Dict, List, Type, Union
from crewai.crew import Crew
from crewai.pipeline.pipeline import Pipeline
from crewai.routers.router import Router
PipelineStage = Union[Crew, List[Crew], Router]
# TODO: Could potentially remove. Need to check with @joao and @gui if this is needed for CrewAI+
def PipelineBase(cls: Type[Any]) -> Type[Any]:
class WrappedClass(cls):
is_pipeline_class: bool = True # type: ignore
stages: List[PipelineStage]
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.stages = []
self._map_pipeline_components()
def _get_all_functions(self) -> Dict[str, Callable[..., Any]]:
return {
name: getattr(self, name)
for name in dir(self)
if callable(getattr(self, name))
}
def _filter_functions(
self, functions: Dict[str, Callable[..., Any]], attribute: str
) -> Dict[str, Callable[..., Any]]:
return {
name: func
for name, func in functions.items()
if hasattr(func, attribute)
}
def _map_pipeline_components(self) -> None:
all_functions = self._get_all_functions()
crew_functions = self._filter_functions(all_functions, "is_crew")
router_functions = self._filter_functions(all_functions, "is_router")
for stage_attr in dir(self):
stage = getattr(self, stage_attr)
if isinstance(stage, (Crew, Router)):
self.stages.append(stage)
elif callable(stage) and hasattr(stage, "is_crew"):
self.stages.append(crew_functions[stage_attr]())
elif callable(stage) and hasattr(stage, "is_router"):
self.stages.append(router_functions[stage_attr]())
elif isinstance(stage, list) and all(
isinstance(item, Crew) for item in stage
):
self.stages.append(stage)
def build_pipeline(self) -> Pipeline:
return Pipeline(stages=self.stages)
return WrappedClass