diff --git a/src/crewai/cli/create_pipeline.py b/src/crewai/cli/create_pipeline.py index 7b555ca20..59936fbc7 100644 --- a/src/crewai/cli/create_pipeline.py +++ b/src/crewai/cli/create_pipeline.py @@ -21,7 +21,6 @@ def create_pipeline(name, router=False): (project_root / "src" / folder_name).mkdir(parents=True) (project_root / "src" / folder_name / "crews").mkdir(parents=True) (project_root / "src" / folder_name / "tools").mkdir(parents=True) - (project_root / "src" / folder_name / "config").mkdir(parents=True) (project_root / "tests").mkdir(exist_ok=True) # Create .env file @@ -35,12 +34,8 @@ def create_pipeline(name, router=False): # List of template files to copy root_template_files = [".gitignore", "pyproject.toml", "README.md"] src_template_files = ["__init__.py", "main.py", "pipeline.py"] - tools_template_files = ["tools/custom_tool.py", "tools/__init__.py"] - config_template_files = ["config/agents.yaml", "config/tasks.yaml"] - crew_template_files = ["crews/research_crew.py", "crews/write_x_crew.py"] - - if router: - crew_template_files.append("crews/write_linkedin_crew.py") + tools_template_files = ["tools/__init__.py", "tools/custom_tool.py"] + crew_folders = ["research_crew", "write_x_crew", "write_linkedin_crew"] def process_file(src_file, dst_file): with open(src_file, "r") as file: @@ -66,16 +61,22 @@ def create_pipeline(name, router=False): dst_file = project_root / "src" / folder_name / file_name process_file(src_file, dst_file) - # Copy tools and config files - for file_name in tools_template_files + config_template_files: + # Copy tools files + for file_name in tools_template_files: src_file = templates_dir / file_name dst_file = project_root / "src" / folder_name / file_name shutil.copy(src_file, dst_file) - # Copy and process crew files - for file_name in crew_template_files: - src_file = templates_dir / file_name - dst_file = project_root / "src" / folder_name / file_name - process_file(src_file, dst_file) + # Copy crew folders + for crew_folder in crew_folders: + src_crew_folder = templates_dir / "crews" / crew_folder + dst_crew_folder = project_root / "src" / folder_name / "crews" / crew_folder + if src_crew_folder.exists(): + shutil.copytree(src_crew_folder, dst_crew_folder) + else: + click.secho( + f"Warning: Crew folder {crew_folder} not found in template.", + fg="yellow", + ) click.secho(f"Pipeline {name} created successfully!", fg="green", bold=True) diff --git a/src/crewai/cli/templates/pipeline/crews/research_crew/config/agents.yaml b/src/crewai/cli/templates/pipeline/crews/research_crew/config/agents.yaml new file mode 100644 index 000000000..f8cf1f5c1 --- /dev/null +++ b/src/crewai/cli/templates/pipeline/crews/research_crew/config/agents.yaml @@ -0,0 +1,19 @@ +researcher: + role: > + {topic} Senior Data Researcher + goal: > + Uncover cutting-edge developments in {topic} + backstory: > + You're a seasoned researcher with a knack for uncovering the latest + developments in {topic}. Known for your ability to find the most relevant + information and present it in a clear and concise manner. + +reporting_analyst: + role: > + {topic} Reporting Analyst + goal: > + Create detailed reports based on {topic} data analysis and research findings + backstory: > + You're a meticulous analyst with a keen eye for detail. You're known for + your ability to turn complex data into clear and concise reports, making + it easy for others to understand and act on the information you provide. diff --git a/src/crewai/cli/templates/pipeline/crews/research_crew/config/tasks.yaml b/src/crewai/cli/templates/pipeline/crews/research_crew/config/tasks.yaml new file mode 100644 index 000000000..e78091842 --- /dev/null +++ b/src/crewai/cli/templates/pipeline/crews/research_crew/config/tasks.yaml @@ -0,0 +1,16 @@ +research_task: + description: > + Conduct a thorough research about {topic} + Make sure you find any interesting and relevant information given + the current year is 2024. + expected_output: > + A list with 10 bullet points of the most relevant information about {topic} + agent: researcher + +reporting_task: + description: > + Review the context you got and expand each topic into a full section for a report. + Make sure the report is detailed and contains any and all relevant information. + expected_output: > + A fully fledge reports with a title, mains topics, each with a full section of information. + agent: reporting_analyst diff --git a/src/crewai/cli/templates/pipeline/crews/research_crew.py b/src/crewai/cli/templates/pipeline/crews/research_crew/research_crew.py similarity index 95% rename from src/crewai/cli/templates/pipeline/crews/research_crew.py rename to src/crewai/cli/templates/pipeline/crews/research_crew/research_crew.py index b241fc269..e20082188 100644 --- a/src/crewai/cli/templates/pipeline/crews/research_crew.py +++ b/src/crewai/cli/templates/pipeline/crews/research_crew/research_crew.py @@ -3,7 +3,7 @@ from crewai import Agent, Crew, Process, Task from crewai.project import CrewBase, agent, crew, task # Uncomment the following line to use an example of a custom tool -# from {{folder_name}}.tools.custom_tool import MyCustomTool +# from demo_pipeline.tools.custom_tool import MyCustomTool # Check our tools documentations for more information on how to use them # from crewai_tools import SerperDevTool diff --git a/src/crewai/cli/templates/pipeline/crews/write_linkedin_crew/config/agents.yaml b/src/crewai/cli/templates/pipeline/crews/write_linkedin_crew/config/agents.yaml new file mode 100644 index 000000000..e69de29bb diff --git a/src/crewai/cli/templates/pipeline/crews/write_linkedin_crew/config/tasks.yaml b/src/crewai/cli/templates/pipeline/crews/write_linkedin_crew/config/tasks.yaml new file mode 100644 index 000000000..e69de29bb diff --git a/src/crewai/cli/templates/pipeline/crews/write_linkedin_crew.py b/src/crewai/cli/templates/pipeline/crews/write_linkedin_crew/write_linkedin_crew.py similarity index 100% rename from src/crewai/cli/templates/pipeline/crews/write_linkedin_crew.py rename to src/crewai/cli/templates/pipeline/crews/write_linkedin_crew/write_linkedin_crew.py diff --git a/src/crewai/cli/templates/pipeline/crews/write_x_crew.py b/src/crewai/cli/templates/pipeline/crews/write_x_crew.py deleted file mode 100644 index 933417c66..000000000 --- a/src/crewai/cli/templates/pipeline/crews/write_x_crew.py +++ /dev/null @@ -1,37 +0,0 @@ -from crewai import Agent, Crew, Process, Task -from crewai.project import CrewBase, agent, crew, task - -# Uncomment the following line to use an example of a custom tool -# from {{folder_name}}.tools.custom_tool import MyCustomTool - -# Check our tools documentations for more information on how to use them -# from crewai_tools import SerperDevTool - -@CrewBase -class WriteXCrew(): - """Research Crew""" - agents_config = 'config/agents.yaml' - tasks_config = 'config/tasks.yaml' - - @agent - def x_writer_agent(self) -> Agent: - return Agent( - config=self.agents_config['x_writer_agent'], - verbose=True - ) - - @task - def write_x_task(self) -> Task: - return Task( - config=self.tasks_config['write_x_task'], - ) - - @crew - def crew(self) -> Crew: - """Creates the Write X Crew""" - return Crew( - agents=self.agents, # Automatically created by the @agent decorator - tasks=self.tasks, # Automatically created by the @task decorator - process=Process.sequential, - verbose=2, - ) \ No newline at end of file diff --git a/src/crewai/cli/templates/pipeline/config/agents.yaml b/src/crewai/cli/templates/pipeline/crews/write_x_crew/config/agents.yaml similarity index 54% rename from src/crewai/cli/templates/pipeline/config/agents.yaml rename to src/crewai/cli/templates/pipeline/crews/write_x_crew/config/agents.yaml index f5ef4d818..1401dcbe0 100644 --- a/src/crewai/cli/templates/pipeline/config/agents.yaml +++ b/src/crewai/cli/templates/pipeline/crews/write_x_crew/config/agents.yaml @@ -1,23 +1,3 @@ -researcher: - role: > - {topic} Senior Data Researcher - goal: > - Uncover cutting-edge developments in {topic} - backstory: > - You're a seasoned researcher with a knack for uncovering the latest - developments in {topic}. Known for your ability to find the most relevant - information and present it in a clear and concise manner. - -reporting_analyst: - role: > - {topic} Reporting Analyst - goal: > - Create detailed reports based on {topic} data analysis and research findings - backstory: > - You're a meticulous analyst with a keen eye for detail. You're known for - your ability to turn complex data into clear and concise reports, making - it easy for others to understand and act on the information you provide. - x_writer_agent: role: > Expert Social Media Content Creator specializing in short form written content diff --git a/src/crewai/cli/templates/pipeline/config/tasks.yaml b/src/crewai/cli/templates/pipeline/crews/write_x_crew/config/tasks.yaml similarity index 58% rename from src/crewai/cli/templates/pipeline/config/tasks.yaml rename to src/crewai/cli/templates/pipeline/crews/write_x_crew/config/tasks.yaml index c81d41db5..1ffbc207a 100644 --- a/src/crewai/cli/templates/pipeline/config/tasks.yaml +++ b/src/crewai/cli/templates/pipeline/crews/write_x_crew/config/tasks.yaml @@ -1,20 +1,3 @@ -research_task: - description: > - Conduct a thorough research about {topic} - Make sure you find any interesting and relevant information given - the current year is 2024. - expected_output: > - A list with 10 bullet points of the most relevant information about {topic} - agent: researcher - -reporting_task: - description: > - Review the context you got and expand each topic into a full section for a report. - Make sure the report is detailed and contains any and all relevant information. - expected_output: > - A fully fledge reports with a title, mains topics, each with a full section of information. - agent: reporting_analyst - write_x_task: description: > Using the research report provided, create an engaging short form post about {topic}. @@ -31,7 +14,7 @@ write_x_task: Title: {title} Research: - {research} + {body} expected_output: > A compelling X post under 280 characters that effectively summarizes the key findings diff --git a/src/crewai/cli/templates/pipeline/crews/write_x_crew/write_x_crew.py b/src/crewai/cli/templates/pipeline/crews/write_x_crew/write_x_crew.py new file mode 100644 index 000000000..7fc13d0e5 --- /dev/null +++ b/src/crewai/cli/templates/pipeline/crews/write_x_crew/write_x_crew.py @@ -0,0 +1,36 @@ +from crewai import Agent, Crew, Process, Task +from crewai.project import CrewBase, agent, crew, task + +# Uncomment the following line to use an example of a custom tool +# from demo_pipeline.tools.custom_tool import MyCustomTool + +# Check our tools documentations for more information on how to use them +# from crewai_tools import SerperDevTool + + +@CrewBase +class WriteXCrew: + """Research Crew""" + + agents_config = "config/agents.yaml" + tasks_config = "config/tasks.yaml" + + @agent + def x_writer_agent(self) -> Agent: + return Agent(config=self.agents_config["x_writer_agent"], verbose=True) + + @task + def write_x_task(self) -> Task: + return Task( + config=self.tasks_config["write_x_task"], + ) + + @crew + def crew(self) -> Crew: + """Creates the Write X Crew""" + return Crew( + agents=self.agents, # Automatically created by the @agent decorator + tasks=self.tasks, # Automatically created by the @task decorator + process=Process.sequential, + verbose=2, + ) diff --git a/src/crewai/cli/templates/pipeline/main.py b/src/crewai/cli/templates/pipeline/main.py index 829008a0b..9a4505619 100644 --- a/src/crewai/cli/templates/pipeline/main.py +++ b/src/crewai/cli/templates/pipeline/main.py @@ -9,7 +9,7 @@ async def run(): inputs = [ {"topic": "AI wearables"}, ] - pipeline = {{pipeline_name}}Pipeline().pipeline() + pipeline = {{pipeline_name}}Pipeline() results = await pipeline.kickoff(inputs) # Process and print results diff --git a/src/crewai/cli/templates/pipeline/pipeline.py b/src/crewai/cli/templates/pipeline/pipeline.py index 1fc4a7938..42b9ead00 100644 --- a/src/crewai/cli/templates/pipeline/pipeline.py +++ b/src/crewai/cli/templates/pipeline/pipeline.py @@ -11,7 +11,7 @@ Key features: - The ResearchCrew's final task uses output_json to store all research findings in a JSON object. - This JSON object is then passed to the WriteXCrew, where tasks can access the research findings. -Example 2: Three-Stage Pipeline with Parallel Execution +Example 2: Two-Stage Pipeline with Parallel Execution ------------------------------------------------------- This pipeline consists of three crews: 1. ResearchCrew: Performs research on a given topic. @@ -28,28 +28,24 @@ Usage: # Common imports for both examples from crewai import Pipeline -from crewai.project.pipeline_base import PipelineBase -from crewai.project.annotations import pipeline # Uncomment the crews you need for your chosen example -from .crews.research_crew import ResearchCrew -from .crews.write_x_crew import WriteXCrew +from .crews.research_crew.research_crew import ResearchCrew +from .crews.write_x_crew.write_x_crew import WriteXCrew # from .crews.write_linkedin_crew import WriteLinkedInCrew # Uncomment for Example 2 # EXAMPLE 1: Two-Stage Pipeline # ----------------------------- # Uncomment the following code block to use Example 1 -@PipelineBase class {{pipeline_name}}Pipeline: def __init__(self): # Initialize crews self.research_crew = ResearchCrew().crew() self.write_x_crew = WriteXCrew().crew() - @pipeline def create_pipeline(self): return Pipeline( stages=[ @@ -58,7 +54,7 @@ class {{pipeline_name}}Pipeline: ] ) - async def run(self, inputs): + async def kickoff(self, inputs): pipeline = self.create_pipeline() results = await pipeline.kickoff(inputs) return results diff --git a/src/crewai/cli/utils.py b/src/crewai/cli/utils.py index 3077cf6ec..2cb181fc4 100644 --- a/src/crewai/cli/utils.py +++ b/src/crewai/cli/utils.py @@ -2,8 +2,6 @@ import click def copy_template(src, dst, name, class_name, folder_name): - print(f"Copying {src} to {dst}") - print(f"Interpolating {name}, {class_name}, {folder_name}") """Copy a file from src to dst.""" with open(src, "r") as file: content = file.read() diff --git a/src/crewai/pipeline/pipeline.py b/src/crewai/pipeline/pipeline.py index 2801a1834..11dc6a62d 100644 --- a/src/crewai/pipeline/pipeline.py +++ b/src/crewai/pipeline/pipeline.py @@ -142,7 +142,7 @@ class Pipeline(BaseModel): """ initial_input = copy.deepcopy(kickoff_input) current_input = copy.deepcopy(kickoff_input) - stages = copy.deepcopy(self.stages) + stages = self._copy_stages() pipeline_usage_metrics: Dict[str, UsageMetrics] = {} all_stage_outputs: List[List[CrewOutput]] = [] traces: List[List[Union[str, Dict[str, Any]]]] = [[initial_input]] @@ -151,6 +151,7 @@ class Pipeline(BaseModel): while stage_index < len(stages): stage = stages[stage_index] stage_input = copy.deepcopy(current_input) + print("stage_input", stage_input) if isinstance(stage, Router): next_pipeline, route_taken = stage.route(stage_input) @@ -164,6 +165,7 @@ class Pipeline(BaseModel): continue stage_outputs, stage_trace = await self._process_stage(stage, stage_input) + print("stage_outputs", stage_outputs) self._update_metrics_and_input( pipeline_usage_metrics, current_input, stage, stage_outputs @@ -210,6 +212,8 @@ class Pipeline(BaseModel): Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]: The output and trace of the crew. """ output = await crew.kickoff_async(inputs=current_input) + print("output from crew kickoff", output) + print("output from crew kickoff dict", output.to_dict()) return [output], [crew.name or str(crew.id)] async def _process_parallel_crews( @@ -367,6 +371,24 @@ class Pipeline(BaseModel): ] return [crew_outputs + [output] for output in all_stage_outputs[-1]] + def _copy_stages(self): + """Create a deep copy of the Pipeline's stages.""" + new_stages = [] + for stage in self.stages: + if isinstance(stage, list): + new_stages.append( + [ + crew.copy() if hasattr(crew, "copy") else copy.deepcopy(crew) + for crew in stage + ] + ) + elif hasattr(stage, "copy"): + new_stages.append(stage.copy()) + else: + new_stages.append(copy.deepcopy(stage)) + + return new_stages + def __rshift__(self, other: PipelineStage) -> "Pipeline": """ Implements the >> operator to add another Stage (Crew or List[Crew]) to an existing Pipeline. diff --git a/src/crewai/project/crew_base.py b/src/crewai/project/crew_base.py index 0085e2eb4..a2c104fe4 100644 --- a/src/crewai/project/crew_base.py +++ b/src/crewai/project/crew_base.py @@ -24,7 +24,9 @@ def CrewBase(cls): original_agents_config_path = getattr( cls, "agents_config", "config/agents.yaml" ) + print("Original agents config path: ", original_agents_config_path) original_tasks_config_path = getattr(cls, "tasks_config", "config/tasks.yaml") + print("Original tasks config path: ", original_tasks_config_path) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -39,9 +41,11 @@ def CrewBase(cls): self.agents_config = self.load_yaml( os.path.join(self.base_directory, self.original_agents_config_path) ) + print("Agents config: ", self.agents_config) self.tasks_config = self.load_yaml( os.path.join(self.base_directory, self.original_tasks_config_path) ) + print("Task config: ", self.tasks_config) self.map_all_agent_variables() self.map_all_task_variables() diff --git a/src/crewai/project/pipeline_base.py b/src/crewai/project/pipeline_base.py index 39dc1936d..fd109be3b 100644 --- a/src/crewai/project/pipeline_base.py +++ b/src/crewai/project/pipeline_base.py @@ -7,6 +7,7 @@ from crewai.pipeline.pipeline import Pipeline from crewai.routers.router import Router +# TODO: Could potentially remove. Need to check with @joao and @gui if this is needed for CrewAI+ def PipelineBase(cls): class WrappedClass(cls): model_config = ConfigDict(arbitrary_types_allowed=True) @@ -49,9 +50,7 @@ def PipelineBase(cls): elif isinstance(stage, list) and all( isinstance(item, Crew) for item in stage ): - self.stages.append( - [crew_functions[item.__name__]() for item in stage] - ) + self.stages.append(stage) def build_pipeline(self) -> Pipeline: return Pipeline(stages=self.stages) diff --git a/src/crewai/routers/router.py b/src/crewai/routers/router.py index e11c816f2..c3715fdd0 100644 --- a/src/crewai/routers/router.py +++ b/src/crewai/routers/router.py @@ -1,17 +1,20 @@ -from dataclasses import dataclass +from copy import deepcopy from typing import Any, Callable, Dict, Generic, Tuple, TypeVar -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, PrivateAttr T = TypeVar("T", bound=Dict[str, Any]) U = TypeVar("U") -@dataclass class Route(Generic[T, U]): condition: Callable[[T], bool] pipeline: U + def __init__(self, condition: Callable[[T], bool], pipeline: U): + self.condition = condition + self.pipeline = pipeline + class Router(BaseModel, Generic[T, U]): routes: Dict[str, Route[T, U]] = Field( @@ -19,9 +22,21 @@ class Router(BaseModel, Generic[T, U]): description="Dictionary of route names to (condition, pipeline) tuples", ) default: U = Field(..., description="Default pipeline if no conditions are met") + _route_types: Dict[str, type] = PrivateAttr(default_factory=dict) + + model_config = {"arbitrary_types_allowed": True} def __init__(self, routes: Dict[str, Route[T, U]], default: U, **data): super().__init__(routes=routes, default=default, **data) + self._check_copyable(default) + for name, route in routes.items(): + self._check_copyable(route.pipeline) + self._route_types[name] = type(route.pipeline) + + @staticmethod + def _check_copyable(obj): + if not hasattr(obj, "copy") or not callable(getattr(obj, "copy")): + raise ValueError(f"Object of type {type(obj)} must have a 'copy' method") def add_route( self, @@ -40,7 +55,9 @@ class Router(BaseModel, Generic[T, U]): Returns: The Router instance for method chaining """ + self._check_copyable(pipeline) self.routes[name] = Route(condition=condition, pipeline=pipeline) + self._route_types[name] = type(pipeline) return self def route(self, input_data: T) -> Tuple[U, str]: @@ -58,3 +75,16 @@ class Router(BaseModel, Generic[T, U]): return route.pipeline, name return self.default, "default" + + def copy(self) -> "Router[T, U]": + """Create a deep copy of the Router.""" + new_routes = { + name: Route( + condition=deepcopy(route.condition), + pipeline=route.pipeline.copy(), + ) + for name, route in self.routes.items() + } + new_default = self.default.copy() + + return Router(routes=new_routes, default=new_default)