Clean up pipeline (#1187)

* Clean up pipeline

* Make versioning dynamic in templates

* fix .env issues when openai is trying to use invalid keys

* Fix type checker issue in pipeline

* Fix tests.
This commit is contained in:
Brandon Hancock (bhancock_ai)
2024-08-16 14:47:28 -04:00
committed by GitHub
parent 8fdf741b73
commit c511f4d0b5
10 changed files with 210 additions and 108 deletions

View File

@@ -0,0 +1,136 @@
# Creating a CrewAI Pipeline Project
Welcome to the comprehensive guide for creating a new CrewAI pipeline project. This document will walk you through the steps to create, customize, and run your CrewAI pipeline project, ensuring you have everything you need to get started.
To learn more about CrewAI pipelines, visit the [CrewAI documentation](https://docs.crewai.com/core-concepts/Pipeline/).
## Prerequisites
Before getting started with CrewAI pipelines, make sure that you have installed CrewAI via pip:
```shell
$ pip install crewai crewai-tools
```
The same prerequisites for virtual environments and Code IDEs apply as in regular CrewAI projects.
## Creating a New Pipeline Project
To create a new CrewAI pipeline project, you have two options:
1. For a basic pipeline template:
```shell
$ crewai create pipeline <project_name>
```
2. For a pipeline example that includes a router:
```shell
$ crewai create pipeline --router <project_name>
```
These commands will create a new project folder with the following structure:
```
<project_name>/
├── README.md
├── poetry.lock
├── pyproject.toml
├── src/
│ └── <project_name>/
│ ├── __init__.py
│ ├── main.py
│ ├── crews/
│ │ ├── crew1/
│ │ │ ├── crew1.py
│ │ │ └── config/
│ │ │ ├── agents.yaml
│ │ │ └── tasks.yaml
│ │ ├── crew2/
│ │ │ ├── crew2.py
│ │ │ └── config/
│ │ │ ├── agents.yaml
│ │ │ └── tasks.yaml
│ ├── pipelines/
│ │ ├── __init__.py
│ │ ├── pipeline1.py
│ │ └── pipeline2.py
│ └── tools/
│ ├── __init__.py
│ └── custom_tool.py
└── tests/
```
## Customizing Your Pipeline Project
To customize your pipeline project, you can:
1. Modify the crew files in `src/<project_name>/crews/` to define your agents and tasks for each crew.
2. Modify the pipeline files in `src/<project_name>/pipelines/` to define your pipeline structure.
3. Modify `src/<project_name>/main.py` to set up and run your pipelines.
4. Add your environment variables into the `.env` file.
### Example: Defining a Pipeline
Here's an example of how to define a pipeline in `src/<project_name>/pipelines/normal_pipeline.py`:
```python
from crewai import Pipeline
from crewai.project import PipelineBase
from ..crews.normal_crew import NormalCrew
@PipelineBase
class NormalPipeline:
def __init__(self):
# Initialize crews
self.normal_crew = NormalCrew().crew()
def create_pipeline(self):
return Pipeline(
stages=[
self.normal_crew
]
)
async def kickoff(self, inputs):
pipeline = self.create_pipeline()
results = await pipeline.kickoff(inputs)
return results
```
### Annotations
The main annotation you'll use for pipelines is `@PipelineBase`. This annotation is used to decorate your pipeline classes, similar to how `@CrewBase` is used for crews.
## Installing Dependencies
To install the dependencies for your project, use Poetry:
```shell
$ cd <project_name>
$ poetry lock
$ poetry install
```
## Running Your Pipeline Project
To run your pipeline project, use the following command:
```shell
$ crewai run
```
or
```shell
$ poetry run <project_name>
```
This will initialize your pipeline and begin task execution as defined in your `main.py` file.
## Deploying Your Pipeline Project
Pipelines can be deployed in the same way as regular CrewAI projects. The easiest way is through [CrewAI+](https://www.crewai.com/crewaiplus), where you can deploy your pipeline in a few clicks.
Remember, when working with pipelines, you're orchestrating multiple crews to work together in a sequence or parallel fashion. This allows for more complex workflows and information processing tasks.

View File

@@ -8,13 +8,20 @@ Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By
<div style="width:25%">
<h2>Getting Started</h2>
<ul>
<li><a href='./getting-started/Installing-CrewAI'>
<li>
<a href='./getting-started/Installing-CrewAI'>
Installing CrewAI
</a>
</a>
</li>
<li><a href='./getting-started/Start-a-New-CrewAI-Project-Template-Method'>
<li>
<a href='./getting-started/Start-a-New-CrewAI-Project-Template-Method'>
Start a New CrewAI Project: Template Method
</a>
</a>
</li>
<li>
<a href='./getting-started/Create-a-New-CrewAI-Pipeline-Template-Method'>
Create a New CrewAI Pipeline: Template Method
</a>
</li>
</ul>
</div>

View File

@@ -1,33 +1,29 @@
import threading
import time
from typing import Any, Dict, Iterator, List, Literal, Optional, Tuple, Union
import click
from langchain.agents import AgentExecutor
from langchain.agents.agent import ExceptionTool
from langchain.callbacks.manager import CallbackManagerForChainRun
from langchain.chains.summarize import load_summarize_chain
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.agents import AgentAction, AgentFinish, AgentStep
from langchain_core.exceptions import OutputParserException
from langchain_core.tools import BaseTool
from langchain_core.utils.input import get_color_mapping
from pydantic import InstanceOf
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains.summarize import load_summarize_chain
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
from crewai.agents.tools_handler import ToolsHandler
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.utilities import I18N
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
from crewai.utilities.training_handler import CrewTrainingHandler
from crewai.utilities.logger import Logger
from crewai.utilities.training_handler import CrewTrainingHandler
class CrewAgentExecutor(AgentExecutor, CrewAgentExecutorMixin):
@@ -213,11 +209,7 @@ class CrewAgentExecutor(AgentExecutor, CrewAgentExecutorMixin):
yield step
return
yield AgentStep(
action=AgentAction("_Exception", str(e), str(e)),
observation=str(e),
)
return
raise e
# If the tool chosen is the finishing tool, then we end and return.
if isinstance(output, AgentFinish):

View File

@@ -6,7 +6,8 @@ authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = ">=3.10,<=3.13"
crewai = { extras = ["tools"], version = "^0.51.0" }
crewai = { extras = ["tools"], version = ">=0.51.0,<1.0.0" }
[tool.poetry.scripts]
{{folder_name}} = "{{folder_name}}.main:run"

View File

@@ -15,12 +15,15 @@ pip install poetry
Next, navigate to your project directory and install the dependencies:
1. First lock the dependencies and then install them:
```bash
poetry lock
```
```bash
poetry install
```
### Customizing
**Add your `OPENAI_API_KEY` into the `.env` file**
@@ -49,6 +52,7 @@ The {{name}} Crew is composed of multiple AI agents, each with unique roles, goa
## Support
For support, questions, or feedback regarding the {{crew_name}} Crew or crewAI.
- Visit our [documentation](https://docs.crewai.com)
- Reach out to us through our [GitHub repository](https://github.com/joaomdmoura/crewai)
- [Join our Discord](https://discord.com/invite/X4JWnZnxPb)

View File

@@ -6,7 +6,7 @@ authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = ">=3.10,<=3.13"
crewai = { extras = ["tools"], version = "^0.51.0" }
crewai = { extras = ["tools"], version = ">=0.51.0,<1.0.0" }
asyncio = "*"
[tool.poetry.scripts]

View File

@@ -6,7 +6,8 @@ authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = ">=3.10,<=3.13"
crewai = { extras = ["tools"], version = "^0.51.0" }
crewai = { extras = ["tools"], version = ">=0.51.0,<1.0.0" }
[tool.poetry.scripts]
{{folder_name}} = "{{folder_name}}.main:main"

View File

@@ -1,5 +1,4 @@
import inspect
import os
from pathlib import Path
from typing import Any, Callable, Dict
@@ -15,42 +14,34 @@ def CrewBase(cls):
model_config = ConfigDict(arbitrary_types_allowed=True)
is_crew_class: bool = True # type: ignore
base_directory = None
for frame_info in inspect.stack():
if "site-packages" not in frame_info.filename:
base_directory = Path(frame_info.filename).parent.resolve()
break
# Get the directory of the class being decorated
base_directory = Path(inspect.getfile(cls)).parent
original_agents_config_path = getattr(
cls, "agents_config", "config/agents.yaml"
)
original_tasks_config_path = getattr(cls, "tasks_config", "config/tasks.yaml")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.base_directory is None:
raise Exception(
"Unable to dynamically determine the project's base directory, you must run it from the project's root directory."
)
agents_config_path = self.base_directory / self.original_agents_config_path
tasks_config_path = self.base_directory / self.original_tasks_config_path
self.agents_config = self.load_yaml(
os.path.join(self.base_directory, self.original_agents_config_path)
)
self.tasks_config = self.load_yaml(
os.path.join(self.base_directory, self.original_tasks_config_path)
)
self.agents_config = self.load_yaml(agents_config_path)
self.tasks_config = self.load_yaml(tasks_config_path)
self.map_all_agent_variables()
self.map_all_task_variables()
@staticmethod
def load_yaml(config_path: str):
with open(config_path, "r") as file:
# parsedContent = YamlParser.parse(file) # type: ignore # Argument 1 to "parse" has incompatible type "TextIOWrapper"; expected "YamlParser"
return yaml.safe_load(file)
def load_yaml(config_path: Path):
try:
with open(config_path, "r") as file:
return yaml.safe_load(file)
except FileNotFoundError:
print(f"File not found: {config_path}")
raise
def _get_all_functions(self):
return {

View File

@@ -11,7 +11,7 @@ from crewai.routers.router import Router
def PipelineBase(cls):
class WrappedClass(cls):
model_config = ConfigDict(arbitrary_types_allowed=True)
is_pipeline_class: bool = True
is_pipeline_class: bool = True # type: ignore
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

View File

@@ -25,14 +25,20 @@ def mock_crew_factory():
MockCrewClass = type("MockCrew", (MagicMock, Crew), {})
class MockCrew(MockCrewClass):
def __deepcopy__(self, memo):
def __deepcopy__(self):
result = MockCrewClass()
result.kickoff_async = self.kickoff_async
result.name = self.name
return result
def copy(
self,
):
return self
crew = MockCrew()
crew.name = name
task_output = TaskOutput(
description="Test task", raw="Task output", agent="Test Agent"
)
@@ -44,9 +50,15 @@ def mock_crew_factory():
pydantic=pydantic_output,
)
async def async_kickoff(inputs=None):
async def kickoff_async(inputs=None):
return crew_output
# Create an AsyncMock for kickoff_async
crew.kickoff_async = AsyncMock(side_effect=kickoff_async)
# Mock the synchronous kickoff method
crew.kickoff = MagicMock(return_value=crew_output)
# Add more attributes that Procedure might be expecting
crew.verbose = False
crew.output_log_file = None
@@ -56,30 +68,16 @@ def mock_crew_factory():
crew.config = None
crew.cache = True
# # Create a valid Agent instance
mock_agent = Agent(
name="Mock Agent",
role="Mock Role",
goal="Mock Goal",
backstory="Mock Backstory",
allow_delegation=False,
verbose=False,
)
# Create a valid Task instance
mock_task = Task(
description="Return: Test output",
expected_output="Test output",
agent=mock_agent,
async_execution=False,
context=None,
)
# Add non-empty agents and tasks
mock_agent = MagicMock(spec=Agent)
mock_task = MagicMock(spec=Task)
mock_task.agent = mock_agent
mock_task.async_execution = False
mock_task.context = None
crew.agents = [mock_agent]
crew.tasks = [mock_task]
crew.kickoff_async = AsyncMock(side_effect=async_kickoff)
return crew
return _create_mock_crew
@@ -115,9 +113,7 @@ def mock_router_factory(mock_crew_factory):
(
"route1"
if x.get("score", 0) > 80
else "route2"
if x.get("score", 0) > 50
else "default"
else "route2" if x.get("score", 0) > 50 else "default"
),
)
)
@@ -477,31 +473,17 @@ async def test_pipeline_with_parallel_stages_end_in_single_stage(mock_crew_facto
"""
Test that Pipeline correctly handles parallel stages.
"""
crew1 = Crew(name="Crew 1", tasks=[task], agents=[agent])
crew2 = Crew(name="Crew 2", tasks=[task], agents=[agent])
crew3 = Crew(name="Crew 3", tasks=[task], agents=[agent])
crew4 = Crew(name="Crew 4", tasks=[task], agents=[agent])
crew1 = mock_crew_factory(name="Crew 1")
crew2 = mock_crew_factory(name="Crew 2")
crew3 = mock_crew_factory(name="Crew 3")
crew4 = mock_crew_factory(name="Crew 4")
pipeline = Pipeline(stages=[crew1, [crew2, crew3], crew4])
input_data = [{"initial": "data"}]
pipeline_result = await pipeline.kickoff(input_data)
with patch.object(Crew, "kickoff_async") as mock_kickoff:
mock_kickoff.return_value = CrewOutput(
raw="Test output",
tasks_output=[
TaskOutput(
description="Test task", raw="Task output", agent="Test Agent"
)
],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict=None,
pydantic=None,
)
pipeline_result = await pipeline.kickoff(input_data)
mock_kickoff.assert_called_with(inputs={"initial": "data"})
crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
assert len(pipeline_result) == 1
pipeline_result_1 = pipeline_result[0]
@@ -649,33 +631,21 @@ Options:
@pytest.mark.asyncio
async def test_pipeline_data_accumulation():
crew1 = Crew(name="Crew 1", tasks=[task], agents=[agent])
crew2 = Crew(name="Crew 2", tasks=[task], agents=[agent])
async def test_pipeline_data_accumulation(mock_crew_factory):
crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"key1": "value1"})
crew2 = mock_crew_factory(name="Crew 2", output_json_dict={"key2": "value2"})
pipeline = Pipeline(stages=[crew1, crew2])
input_data = [{"initial": "data"}]
results = await pipeline.kickoff(input_data)
with patch.object(Crew, "kickoff_async") as mock_kickoff:
mock_kickoff.side_effect = [
CrewOutput(
raw="Test output from Crew 1",
tasks_output=[],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict={"key1": "value1"},
pydantic=None,
),
CrewOutput(
raw="Test output from Crew 2",
tasks_output=[],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict={"key2": "value2"},
pydantic=None,
),
]
# Check that crew1 was called with only the initial input
crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
results = await pipeline.kickoff(input_data)
# Check that crew2 was called with the combined input from the initial data and crew1's output
crew2.kickoff_async.assert_called_once_with(
inputs={"initial": "data", "key1": "value1"}
)
# Check the final output
assert len(results) == 1