Clean up pipeline (#1187)

* Clean up pipeline

* Make versioning dynamic in templates

* fix .env issues when openai is trying to use invalid keys

* Fix type checker issue in pipeline

* Fix tests.
This commit is contained in:
Brandon Hancock (bhancock_ai)
2024-08-16 14:47:28 -04:00
committed by GitHub
parent dbf2570353
commit 3451b6fc7a
10 changed files with 210 additions and 108 deletions

View File

@@ -0,0 +1,136 @@
# Creating a CrewAI Pipeline Project
Welcome to the comprehensive guide for creating a new CrewAI pipeline project. This document will walk you through the steps to create, customize, and run your CrewAI pipeline project, ensuring you have everything you need to get started.
To learn more about CrewAI pipelines, visit the [CrewAI documentation](https://docs.crewai.com/core-concepts/Pipeline/).
## Prerequisites
Before getting started with CrewAI pipelines, make sure that you have installed CrewAI via pip:
```shell
$ pip install crewai crewai-tools
```
The same prerequisites for virtual environments and Code IDEs apply as in regular CrewAI projects.
## Creating a New Pipeline Project
To create a new CrewAI pipeline project, you have two options:
1. For a basic pipeline template:
```shell
$ crewai create pipeline <project_name>
```
2. For a pipeline example that includes a router:
```shell
$ crewai create pipeline --router <project_name>
```
These commands will create a new project folder with the following structure:
```
<project_name>/
├── README.md
├── poetry.lock
├── pyproject.toml
├── src/
│ └── <project_name>/
│ ├── __init__.py
│ ├── main.py
│ ├── crews/
│ │ ├── crew1/
│ │ │ ├── crew1.py
│ │ │ └── config/
│ │ │ ├── agents.yaml
│ │ │ └── tasks.yaml
│ │ ├── crew2/
│ │ │ ├── crew2.py
│ │ │ └── config/
│ │ │ ├── agents.yaml
│ │ │ └── tasks.yaml
│ ├── pipelines/
│ │ ├── __init__.py
│ │ ├── pipeline1.py
│ │ └── pipeline2.py
│ └── tools/
│ ├── __init__.py
│ └── custom_tool.py
└── tests/
```
## Customizing Your Pipeline Project
To customize your pipeline project, you can:
1. Modify the crew files in `src/<project_name>/crews/` to define your agents and tasks for each crew.
2. Modify the pipeline files in `src/<project_name>/pipelines/` to define your pipeline structure.
3. Modify `src/<project_name>/main.py` to set up and run your pipelines.
4. Add your environment variables into the `.env` file.
### Example: Defining a Pipeline
Here's an example of how to define a pipeline in `src/<project_name>/pipelines/normal_pipeline.py`:
```python
from crewai import Pipeline
from crewai.project import PipelineBase
from ..crews.normal_crew import NormalCrew
@PipelineBase
class NormalPipeline:
def __init__(self):
# Initialize crews
self.normal_crew = NormalCrew().crew()
def create_pipeline(self):
return Pipeline(
stages=[
self.normal_crew
]
)
async def kickoff(self, inputs):
pipeline = self.create_pipeline()
results = await pipeline.kickoff(inputs)
return results
```
### Annotations
The main annotation you'll use for pipelines is `@PipelineBase`. This annotation is used to decorate your pipeline classes, similar to how `@CrewBase` is used for crews.
## Installing Dependencies
To install the dependencies for your project, use Poetry:
```shell
$ cd <project_name>
$ poetry lock
$ poetry install
```
## Running Your Pipeline Project
To run your pipeline project, use the following command:
```shell
$ crewai run
```
or
```shell
$ poetry run <project_name>
```
This will initialize your pipeline and begin task execution as defined in your `main.py` file.
## Deploying Your Pipeline Project
Pipelines can be deployed in the same way as regular CrewAI projects. The easiest way is through [CrewAI+](https://www.crewai.com/crewaiplus), where you can deploy your pipeline in a few clicks.
Remember, when working with pipelines, you're orchestrating multiple crews to work together in a sequence or parallel fashion. This allows for more complex workflows and information processing tasks.

View File

@@ -8,14 +8,21 @@ Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By
<div style="width:25%"> <div style="width:25%">
<h2>Getting Started</h2> <h2>Getting Started</h2>
<ul> <ul>
<li><a href='./getting-started/Installing-CrewAI'> <li>
<a href='./getting-started/Installing-CrewAI'>
Installing CrewAI Installing CrewAI
</a> </a>
</li> </li>
<li><a href='./getting-started/Start-a-New-CrewAI-Project-Template-Method'> <li>
<a href='./getting-started/Start-a-New-CrewAI-Project-Template-Method'>
Start a New CrewAI Project: Template Method Start a New CrewAI Project: Template Method
</a> </a>
</li> </li>
<li>
<a href='./getting-started/Create-a-New-CrewAI-Pipeline-Template-Method'>
Create a New CrewAI Pipeline: Template Method
</a>
</li>
</ul> </ul>
</div> </div>
<div style="width:25%"> <div style="width:25%">

View File

@@ -1,33 +1,29 @@
import threading import threading
import time import time
from typing import Any, Dict, Iterator, List, Literal, Optional, Tuple, Union from typing import Any, Dict, Iterator, List, Literal, Optional, Tuple, Union
import click import click
from langchain.agents import AgentExecutor from langchain.agents import AgentExecutor
from langchain.agents.agent import ExceptionTool from langchain.agents.agent import ExceptionTool
from langchain.callbacks.manager import CallbackManagerForChainRun from langchain.callbacks.manager import CallbackManagerForChainRun
from langchain.chains.summarize import load_summarize_chain
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.agents import AgentAction, AgentFinish, AgentStep from langchain_core.agents import AgentAction, AgentFinish, AgentStep
from langchain_core.exceptions import OutputParserException from langchain_core.exceptions import OutputParserException
from langchain_core.tools import BaseTool from langchain_core.tools import BaseTool
from langchain_core.utils.input import get_color_mapping from langchain_core.utils.input import get_color_mapping
from pydantic import InstanceOf from pydantic import InstanceOf
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains.summarize import load_summarize_chain
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
from crewai.agents.tools_handler import ToolsHandler from crewai.agents.tools_handler import ToolsHandler
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.utilities import I18N from crewai.utilities import I18N
from crewai.utilities.constants import TRAINING_DATA_FILE from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.exceptions.context_window_exceeding_exception import ( from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException, LLMContextLengthExceededException,
) )
from crewai.utilities.training_handler import CrewTrainingHandler
from crewai.utilities.logger import Logger from crewai.utilities.logger import Logger
from crewai.utilities.training_handler import CrewTrainingHandler
class CrewAgentExecutor(AgentExecutor, CrewAgentExecutorMixin): class CrewAgentExecutor(AgentExecutor, CrewAgentExecutorMixin):
@@ -213,11 +209,7 @@ class CrewAgentExecutor(AgentExecutor, CrewAgentExecutorMixin):
yield step yield step
return return
yield AgentStep( raise e
action=AgentAction("_Exception", str(e), str(e)),
observation=str(e),
)
return
# If the tool chosen is the finishing tool, then we end and return. # If the tool chosen is the finishing tool, then we end and return.
if isinstance(output, AgentFinish): if isinstance(output, AgentFinish):

View File

@@ -6,7 +6,8 @@ authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = ">=3.10,<=3.13" python = ">=3.10,<=3.13"
crewai = { extras = ["tools"], version = "^0.51.0" } crewai = { extras = ["tools"], version = ">=0.51.0,<1.0.0" }
[tool.poetry.scripts] [tool.poetry.scripts]
{{folder_name}} = "{{folder_name}}.main:run" {{folder_name}} = "{{folder_name}}.main:run"

View File

@@ -15,12 +15,15 @@ pip install poetry
Next, navigate to your project directory and install the dependencies: Next, navigate to your project directory and install the dependencies:
1. First lock the dependencies and then install them: 1. First lock the dependencies and then install them:
```bash ```bash
poetry lock poetry lock
``` ```
```bash ```bash
poetry install poetry install
``` ```
### Customizing ### Customizing
**Add your `OPENAI_API_KEY` into the `.env` file** **Add your `OPENAI_API_KEY` into the `.env` file**
@@ -49,6 +52,7 @@ The {{name}} Crew is composed of multiple AI agents, each with unique roles, goa
## Support ## Support
For support, questions, or feedback regarding the {{crew_name}} Crew or crewAI. For support, questions, or feedback regarding the {{crew_name}} Crew or crewAI.
- Visit our [documentation](https://docs.crewai.com) - Visit our [documentation](https://docs.crewai.com)
- Reach out to us through our [GitHub repository](https://github.com/joaomdmoura/crewai) - Reach out to us through our [GitHub repository](https://github.com/joaomdmoura/crewai)
- [Join our Discord](https://discord.com/invite/X4JWnZnxPb) - [Join our Discord](https://discord.com/invite/X4JWnZnxPb)

View File

@@ -6,7 +6,7 @@ authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = ">=3.10,<=3.13" python = ">=3.10,<=3.13"
crewai = { extras = ["tools"], version = "^0.51.0" } crewai = { extras = ["tools"], version = ">=0.51.0,<1.0.0" }
asyncio = "*" asyncio = "*"
[tool.poetry.scripts] [tool.poetry.scripts]

View File

@@ -6,7 +6,8 @@ authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = ">=3.10,<=3.13" python = ">=3.10,<=3.13"
crewai = { extras = ["tools"], version = "^0.51.0" } crewai = { extras = ["tools"], version = ">=0.51.0,<1.0.0" }
[tool.poetry.scripts] [tool.poetry.scripts]
{{folder_name}} = "{{folder_name}}.main:main" {{folder_name}} = "{{folder_name}}.main:main"

View File

@@ -1,5 +1,4 @@
import inspect import inspect
import os
from pathlib import Path from pathlib import Path
from typing import Any, Callable, Dict from typing import Any, Callable, Dict
@@ -15,42 +14,34 @@ def CrewBase(cls):
model_config = ConfigDict(arbitrary_types_allowed=True) model_config = ConfigDict(arbitrary_types_allowed=True)
is_crew_class: bool = True # type: ignore is_crew_class: bool = True # type: ignore
base_directory = None # Get the directory of the class being decorated
for frame_info in inspect.stack(): base_directory = Path(inspect.getfile(cls)).parent
if "site-packages" not in frame_info.filename:
base_directory = Path(frame_info.filename).parent.resolve()
break
original_agents_config_path = getattr( original_agents_config_path = getattr(
cls, "agents_config", "config/agents.yaml" cls, "agents_config", "config/agents.yaml"
) )
original_tasks_config_path = getattr(cls, "tasks_config", "config/tasks.yaml") original_tasks_config_path = getattr(cls, "tasks_config", "config/tasks.yaml")
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
if self.base_directory is None: agents_config_path = self.base_directory / self.original_agents_config_path
raise Exception( tasks_config_path = self.base_directory / self.original_tasks_config_path
"Unable to dynamically determine the project's base directory, you must run it from the project's root directory."
)
self.agents_config = self.load_yaml( self.agents_config = self.load_yaml(agents_config_path)
os.path.join(self.base_directory, self.original_agents_config_path) self.tasks_config = self.load_yaml(tasks_config_path)
)
self.tasks_config = self.load_yaml(
os.path.join(self.base_directory, self.original_tasks_config_path)
)
self.map_all_agent_variables() self.map_all_agent_variables()
self.map_all_task_variables() self.map_all_task_variables()
@staticmethod @staticmethod
def load_yaml(config_path: str): def load_yaml(config_path: Path):
try:
with open(config_path, "r") as file: with open(config_path, "r") as file:
# parsedContent = YamlParser.parse(file) # type: ignore # Argument 1 to "parse" has incompatible type "TextIOWrapper"; expected "YamlParser"
return yaml.safe_load(file) return yaml.safe_load(file)
except FileNotFoundError:
print(f"File not found: {config_path}")
raise
def _get_all_functions(self): def _get_all_functions(self):
return { return {

View File

@@ -11,7 +11,7 @@ from crewai.routers.router import Router
def PipelineBase(cls): def PipelineBase(cls):
class WrappedClass(cls): class WrappedClass(cls):
model_config = ConfigDict(arbitrary_types_allowed=True) model_config = ConfigDict(arbitrary_types_allowed=True)
is_pipeline_class: bool = True is_pipeline_class: bool = True # type: ignore
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)

View File

@@ -25,14 +25,20 @@ def mock_crew_factory():
MockCrewClass = type("MockCrew", (MagicMock, Crew), {}) MockCrewClass = type("MockCrew", (MagicMock, Crew), {})
class MockCrew(MockCrewClass): class MockCrew(MockCrewClass):
def __deepcopy__(self, memo): def __deepcopy__(self):
result = MockCrewClass() result = MockCrewClass()
result.kickoff_async = self.kickoff_async result.kickoff_async = self.kickoff_async
result.name = self.name result.name = self.name
return result return result
def copy(
self,
):
return self
crew = MockCrew() crew = MockCrew()
crew.name = name crew.name = name
task_output = TaskOutput( task_output = TaskOutput(
description="Test task", raw="Task output", agent="Test Agent" description="Test task", raw="Task output", agent="Test Agent"
) )
@@ -44,9 +50,15 @@ def mock_crew_factory():
pydantic=pydantic_output, pydantic=pydantic_output,
) )
async def async_kickoff(inputs=None): async def kickoff_async(inputs=None):
return crew_output return crew_output
# Create an AsyncMock for kickoff_async
crew.kickoff_async = AsyncMock(side_effect=kickoff_async)
# Mock the synchronous kickoff method
crew.kickoff = MagicMock(return_value=crew_output)
# Add more attributes that Procedure might be expecting # Add more attributes that Procedure might be expecting
crew.verbose = False crew.verbose = False
crew.output_log_file = None crew.output_log_file = None
@@ -56,30 +68,16 @@ def mock_crew_factory():
crew.config = None crew.config = None
crew.cache = True crew.cache = True
# # Create a valid Agent instance # Add non-empty agents and tasks
mock_agent = Agent( mock_agent = MagicMock(spec=Agent)
name="Mock Agent", mock_task = MagicMock(spec=Task)
role="Mock Role", mock_task.agent = mock_agent
goal="Mock Goal", mock_task.async_execution = False
backstory="Mock Backstory", mock_task.context = None
allow_delegation=False,
verbose=False,
)
# Create a valid Task instance
mock_task = Task(
description="Return: Test output",
expected_output="Test output",
agent=mock_agent,
async_execution=False,
context=None,
)
crew.agents = [mock_agent] crew.agents = [mock_agent]
crew.tasks = [mock_task] crew.tasks = [mock_task]
crew.kickoff_async = AsyncMock(side_effect=async_kickoff)
return crew return crew
return _create_mock_crew return _create_mock_crew
@@ -115,9 +113,7 @@ def mock_router_factory(mock_crew_factory):
( (
"route1" "route1"
if x.get("score", 0) > 80 if x.get("score", 0) > 80
else "route2" else "route2" if x.get("score", 0) > 50 else "default"
if x.get("score", 0) > 50
else "default"
), ),
) )
) )
@@ -477,31 +473,17 @@ async def test_pipeline_with_parallel_stages_end_in_single_stage(mock_crew_facto
""" """
Test that Pipeline correctly handles parallel stages. Test that Pipeline correctly handles parallel stages.
""" """
crew1 = Crew(name="Crew 1", tasks=[task], agents=[agent]) crew1 = mock_crew_factory(name="Crew 1")
crew2 = Crew(name="Crew 2", tasks=[task], agents=[agent]) crew2 = mock_crew_factory(name="Crew 2")
crew3 = Crew(name="Crew 3", tasks=[task], agents=[agent]) crew3 = mock_crew_factory(name="Crew 3")
crew4 = Crew(name="Crew 4", tasks=[task], agents=[agent]) crew4 = mock_crew_factory(name="Crew 4")
pipeline = Pipeline(stages=[crew1, [crew2, crew3], crew4]) pipeline = Pipeline(stages=[crew1, [crew2, crew3], crew4])
input_data = [{"initial": "data"}] input_data = [{"initial": "data"}]
pipeline_result = await pipeline.kickoff(input_data) pipeline_result = await pipeline.kickoff(input_data)
with patch.object(Crew, "kickoff_async") as mock_kickoff: crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
mock_kickoff.return_value = CrewOutput(
raw="Test output",
tasks_output=[
TaskOutput(
description="Test task", raw="Task output", agent="Test Agent"
)
],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict=None,
pydantic=None,
)
pipeline_result = await pipeline.kickoff(input_data)
mock_kickoff.assert_called_with(inputs={"initial": "data"})
assert len(pipeline_result) == 1 assert len(pipeline_result) == 1
pipeline_result_1 = pipeline_result[0] pipeline_result_1 = pipeline_result[0]
@@ -649,33 +631,21 @@ Options:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_pipeline_data_accumulation(): async def test_pipeline_data_accumulation(mock_crew_factory):
crew1 = Crew(name="Crew 1", tasks=[task], agents=[agent]) crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"key1": "value1"})
crew2 = Crew(name="Crew 2", tasks=[task], agents=[agent]) crew2 = mock_crew_factory(name="Crew 2", output_json_dict={"key2": "value2"})
pipeline = Pipeline(stages=[crew1, crew2]) pipeline = Pipeline(stages=[crew1, crew2])
input_data = [{"initial": "data"}] input_data = [{"initial": "data"}]
results = await pipeline.kickoff(input_data) results = await pipeline.kickoff(input_data)
with patch.object(Crew, "kickoff_async") as mock_kickoff: # Check that crew1 was called with only the initial input
mock_kickoff.side_effect = [ crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
CrewOutput(
raw="Test output from Crew 1",
tasks_output=[],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict={"key1": "value1"},
pydantic=None,
),
CrewOutput(
raw="Test output from Crew 2",
tasks_output=[],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict={"key2": "value2"},
pydantic=None,
),
]
results = await pipeline.kickoff(input_data) # Check that crew2 was called with the combined input from the initial data and crew1's output
crew2.kickoff_async.assert_called_once_with(
inputs={"initial": "data", "key1": "value1"}
)
# Check the final output # Check the final output
assert len(results) == 1 assert len(results) == 1