Working on creating Crew templates and pipeline templates

This commit is contained in:
Brandon Hancock
2024-08-01 11:43:39 -04:00
parent c5e8302a7d
commit 9996cf206e
29 changed files with 502 additions and 66 deletions

View File

@@ -224,8 +224,8 @@ urgent_crew = Crew(agents=[urgent_handler], tasks=[urgent_task])
normal_crew = Crew(agents=[normal_handler], tasks=[normal_task])
# Create pipelines for different urgency levels
urgent_pipeline = Pipeline(stages=[classification_crew, urgent_crew])
normal_pipeline = Pipeline(stages=[classification_crew, normal_crew])
urgent_pipeline = Pipeline(stages=[urgent_crew])
normal_pipeline = Pipeline(stages=[normal_crew])
# Create a router
email_router = Router(
@@ -243,7 +243,7 @@ email_router = Router(
)
# Use the router in a main pipeline
main_pipeline = Pipeline(stages=[email_router])
main_pipeline = Pipeline(stages=[classification_crew, email_router])
inputs = [{"email": "..."}, {"email": "..."}] # List of email data

View File

@@ -1,11 +1,12 @@
import click
import pkg_resources
from crewai.cli.create_crew import create_crew
from crewai.cli.create_pipeline import create_pipeline
from crewai.memory.storage.kickoff_task_outputs_storage import (
KickoffTaskOutputsSQLiteStorage,
)
from .create_crew import create_crew
from .evaluate_crew import evaluate_crew
from .replay_from_task import replay_task_command
from .reset_memories_command import reset_memories_command
@@ -18,10 +19,23 @@ def crewai():
@crewai.command()
@click.argument("project_name")
def create(project_name):
"""Create a new crew."""
create_crew(project_name)
@click.argument("type", type=click.Choice(["crew", "pipeline"]))
@click.argument("name")
@click.argument("crew_names", nargs=-1)
def create(type, name, crew_names):
"""Create a new crew or pipeline."""
if type == "crew":
create_crew(name)
elif type == "pipeline":
if not crew_names:
click.secho(
"Error: At least one crew name must be provided for a pipeline.",
fg="red",
)
return
create_pipeline(name, crew_names)
else:
click.secho("Error: Invalid type. Must be 'crew' or 'pipeline'.", fg="red")
@crewai.command()

View File

@@ -1,25 +1,35 @@
import os
from pathlib import Path
import click
from crewai.cli.utils import copy_template
def create_crew(name):
def create_crew(name, parent_folder=None):
"""Create a new crew."""
folder_name = name.replace(" ", "_").replace("-", "_").lower()
class_name = name.replace("_", " ").replace("-", " ").title().replace(" ", "")
click.secho(f"Creating folder {folder_name}...", fg="green", bold=True)
if parent_folder:
folder_path = Path(parent_folder) / folder_name
else:
folder_path = Path(folder_name)
if not os.path.exists(folder_name):
os.mkdir(folder_name)
os.mkdir(folder_name + "/tests")
os.mkdir(folder_name + "/src")
os.mkdir(folder_name + f"/src/{folder_name}")
os.mkdir(folder_name + f"/src/{folder_name}/tools")
os.mkdir(folder_name + f"/src/{folder_name}/config")
with open(folder_name + "/.env", "w") as file:
file.write("OPENAI_API_KEY=YOUR_API_KEY")
click.secho(
f"Creating {'crew' if parent_folder else 'folder'} {folder_name}...",
fg="green",
bold=True,
)
if not folder_path.exists():
folder_path.mkdir(parents=True)
(folder_path / "tests").mkdir(exist_ok=True)
if not parent_folder:
(folder_path / "src" / folder_name).mkdir(parents=True)
(folder_path / "src" / folder_name / "tools").mkdir(parents=True)
(folder_path / "src" / folder_name / "config").mkdir(parents=True)
with open(folder_path / ".env", "w") as file:
file.write("OPENAI_API_KEY=YOUR_API_KEY")
else:
click.secho(
f"\tFolder {folder_name} already exists. Please choose a different name.",
@@ -28,53 +38,34 @@ def create_crew(name):
return
package_dir = Path(__file__).parent
templates_dir = package_dir / "templates"
templates_dir = package_dir / "templates" / "crew"
# List of template files to copy
root_template_files = [
".gitignore",
"pyproject.toml",
"README.md",
]
root_template_files = (
[".gitignore", "pyproject.toml", "README.md"] if not parent_folder else []
)
tools_template_files = ["tools/custom_tool.py", "tools/__init__.py"]
config_template_files = ["config/agents.yaml", "config/tasks.yaml"]
src_template_files = ["__init__.py", "main.py", "crew.py"]
src_template_files = (
["__init__.py", "main.py", "crew.py"] if not parent_folder else ["crew.py"]
)
for file_name in root_template_files:
src_file = templates_dir / file_name
dst_file = Path(folder_name) / file_name
dst_file = folder_path / file_name
copy_template(src_file, dst_file, name, class_name, folder_name)
src_folder = folder_path / "src" / folder_name if not parent_folder else folder_path
for file_name in src_template_files:
src_file = templates_dir / file_name
dst_file = Path(folder_name) / "src" / folder_name / file_name
dst_file = src_folder / file_name
copy_template(src_file, dst_file, name, class_name, folder_name)
for file_name in tools_template_files:
src_file = templates_dir / file_name
dst_file = Path(folder_name) / "src" / folder_name / file_name
copy_template(src_file, dst_file, name, class_name, folder_name)
for file_name in config_template_files:
src_file = templates_dir / file_name
dst_file = Path(folder_name) / "src" / folder_name / file_name
copy_template(src_file, dst_file, name, class_name, folder_name)
if not parent_folder:
for file_name in tools_template_files + config_template_files:
src_file = templates_dir / file_name
dst_file = src_folder / file_name
copy_template(src_file, dst_file, name, class_name, folder_name)
click.secho(f"Crew {name} created successfully!", fg="green", bold=True)
def copy_template(src, dst, name, class_name, folder_name):
"""Copy a file from src to dst."""
with open(src, "r") as file:
content = file.read()
# Interpolate the content
content = content.replace("{{name}}", name)
content = content.replace("{{crew_name}}", class_name)
content = content.replace("{{folder_name}}", folder_name)
# Write the interpolated content to the new file
with open(dst, "w") as file:
file.write(content)
click.secho(f" - Created {dst}", fg="green")

View File

@@ -0,0 +1,119 @@
import os
from pathlib import Path
import click
from crewai.cli.create_crew import create_crew
from crewai.cli.utils import copy_template
def create_pipeline(pipeline_name, crew_names):
"""Create a new pipeline with multiple crews."""
folder_name = pipeline_name.replace(" ", "_").replace("-", "_").lower()
class_name = (
pipeline_name.replace("_", " ").replace("-", " ").title().replace(" ", "")
)
click.secho(f"Creating pipeline {folder_name}...", fg="green", bold=True)
# Create the main project structure
project_root = Path(folder_name)
project_root.mkdir(exist_ok=True)
(project_root / "src" / folder_name).mkdir(parents=True, exist_ok=True)
(project_root / "src" / folder_name / "crews").mkdir(parents=True, exist_ok=True)
(project_root / "src" / folder_name / "config").mkdir(parents=True, exist_ok=True)
(project_root / "src" / folder_name / "tools").mkdir(parents=True, exist_ok=True)
(project_root / "tests").mkdir(exist_ok=True)
# Create .env file
with open(project_root / ".env", "w") as file:
file.write("OPENAI_API_KEY=YOUR_API_KEY")
package_dir = Path(__file__).parent
templates_dir = package_dir / "templates" / "pipeline"
# Process main.py template
with open(templates_dir / "main.py", "r") as f:
main_content = f.read()
# Replace variables in main.py
main_content = main_content.replace("{{folder_name}}", folder_name)
main_content = main_content.replace("{{pipeline_name}}", class_name)
# Write updated main.py
with open(project_root / "src" / folder_name / "main.py", "w") as f:
f.write(main_content)
# Process pipeline.py template
with open(templates_dir / "pipeline.py", "r") as f:
pipeline_content = f.read()
# Replace pipeline name
pipeline_content = pipeline_content.replace("{{pipeline_name}}", class_name)
# Generate crew initialization lines
crew_init_lines = []
crew_stage_lines = []
for crew_name in crew_names:
crew_class_name = (
crew_name.replace("_", " ").replace("-", " ").title().replace(" ", "")
)
crew_init_lines.append(
f" self.{crew_name.lower()}_crew = {crew_class_name}Crew().crew()"
)
crew_stage_lines.append(f" self.{crew_name.lower()}_crew,")
# Replace crew initialization placeholder
pipeline_content = pipeline_content.replace(
" {% for crew_name in crew_names %}\n"
" self.{{crew_name.lower()}}_crew = {{crew_name}}Crew().crew()\n"
" {% endfor %}",
"\n".join(crew_init_lines),
)
# Replace crew stages placeholder
pipeline_content = pipeline_content.replace(
" {% for crew_name in crew_names %}\n"
" self.{{crew_name.lower()}}_crew,\n"
" {% endfor %}",
"\n".join(crew_stage_lines),
)
# Update imports with correct package structure
crew_imports = [
f"from {folder_name}.src.{folder_name}.crews.{name.lower()}.crew import {name.replace('_', ' ').replace('-', ' ').title().replace(' ', '')}Crew"
for name in crew_names
]
pipeline_content = pipeline_content.replace(
"from crews.crew import *", "\n".join(crew_imports)
)
with open(project_root / "src" / folder_name / "pipeline.py", "w") as f:
f.write(pipeline_content)
# Copy and process other template files
template_files = [
(".gitignore", project_root),
("pyproject.toml", project_root),
("README.md", project_root),
("__init__.py", project_root / "src" / folder_name),
("tools/custom_tool.py", project_root / "src" / folder_name / "tools"),
("tools/__init__.py", project_root / "src" / folder_name / "tools"),
("config/agents.yaml", project_root / "src" / folder_name / "config"),
("config/tasks.yaml", project_root / "src" / folder_name / "config"),
]
for template_file, destination in template_files:
src_file = templates_dir / template_file
dst_file = destination / os.path.basename(template_file)
copy_template(src_file, dst_file, pipeline_name, class_name, folder_name)
# Create crew files
for crew_name in crew_names:
create_crew(crew_name, project_root / "src" / folder_name / "crews")
click.secho(
f"Pipeline {pipeline_name} created successfully with crews: {', '.join(crew_names)}!",
fg="green",
bold=True,
)

View File

@@ -0,0 +1,2 @@
.env
__pycache__/

View File

@@ -0,0 +1,57 @@
# {{crew_name}} Crew
Welcome to the {{crew_name}} Crew project, powered by [crewAI](https://crewai.com). This template is designed to help you set up a multi-agent AI system with ease, leveraging the powerful and flexible framework provided by crewAI. Our goal is to enable your agents to collaborate effectively on complex tasks, maximizing their collective intelligence and capabilities.
## Installation
Ensure you have Python >=3.10 <=3.13 installed on your system. This project uses [Poetry](https://python-poetry.org/) for dependency management and package handling, offering a seamless setup and execution experience.
First, if you haven't already, install Poetry:
```bash
pip install poetry
```
Next, navigate to your project directory and install the dependencies:
1. First lock the dependencies and then install them:
```bash
poetry lock
```
```bash
poetry install
```
### Customizing
**Add your `OPENAI_API_KEY` into the `.env` file**
- Modify `src/{{folder_name}}/config/agents.yaml` to define your agents
- Modify `src/{{folder_name}}/config/tasks.yaml` to define your tasks
- Modify `src/{{folder_name}}/crew.py` to add your own logic, tools and specific args
- Modify `src/{{folder_name}}/main.py` to add custom inputs for your agents and tasks
## Running the Project
To kickstart your crew of AI agents and begin task execution, run this from the root folder of your project:
```bash
poetry run {{folder_name}}
```
This command initializes the {{name}} Crew, assembling the agents and assigning them tasks as defined in your configuration.
This example, unmodified, will run the create a `report.md` file with the output of a research on LLMs in the root folder.
## Understanding Your Crew
The {{name}} Crew is composed of multiple AI agents, each with unique roles, goals, and tools. These agents collaborate on a series of tasks, defined in `config/tasks.yaml`, leveraging their collective skills to achieve complex objectives. The `config/agents.yaml` file outlines the capabilities and configurations of each agent in your crew.
## Support
For support, questions, or feedback regarding the {{crew_name}} Crew or crewAI.
- Visit our [documentation](https://docs.crewai.com)
- Reach out to us through our [GitHub repository](https://github.com/joaomdmoura/crewai)
- [Join our Discord](https://discord.com/invite/X4JWnZnxPb)
- [Chat with our docs](https://chatg.pt/DWjSBZn)
Let's create wonders together with the power and simplicity of crewAI.

View File

@@ -0,0 +1,19 @@
researcher:
role: >
{topic} Senior Data Researcher
goal: >
Uncover cutting-edge developments in {topic}
backstory: >
You're a seasoned researcher with a knack for uncovering the latest
developments in {topic}. Known for your ability to find the most relevant
information and present it in a clear and concise manner.
reporting_analyst:
role: >
{topic} Reporting Analyst
goal: >
Create detailed reports based on {topic} data analysis and research findings
backstory: >
You're a meticulous analyst with a keen eye for detail. You're known for
your ability to turn complex data into clear and concise reports, making
it easy for others to understand and act on the information you provide.

View File

@@ -0,0 +1,17 @@
research_task:
description: >
Conduct a thorough research about {topic}
Make sure you find any interesting and relevant information given
the current year is 2024.
expected_output: >
A list with 10 bullet points of the most relevant information about {topic}
agent: researcher
reporting_task:
description: >
Review the context you got and expand each topic into a full section for a report.
Make sure the report is detailed and contains any and all relevant information.
expected_output: >
A fully fledge reports with the mains topics, each with a full section of information.
Formatted as markdown without '```'
agent: reporting_analyst

View File

@@ -0,0 +1,53 @@
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
# Uncomment the following line to use an example of a custom tool
# from {{folder_name}}.tools.custom_tool import MyCustomTool
# Check our tools documentations for more information on how to use them
# from crewai_tools import SerperDevTool
@CrewBase
class {{crew_name}}Crew():
"""{{crew_name}} crew"""
agents_config = '../config/agents.yaml'
tasks_config = '../config/tasks.yaml'
@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config['researcher'],
# tools=[MyCustomTool()], # Example of custom tool, loaded on the beginning of file
verbose=True
)
@agent
def reporting_analyst(self) -> Agent:
return Agent(
config=self.agents_config['reporting_analyst'],
verbose=True
)
@task
def research_task(self) -> Task:
return Task(
config=self.tasks_config['research_task'],
)
@task
def reporting_task(self) -> Task:
return Task(
config=self.tasks_config['reporting_task'],
output_file='report.md'
)
@crew
def crew(self) -> Crew:
"""Creates the {{crew_name}} crew"""
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=2,
# process=Process.hierarchical, # In case you wanna use that instead https://docs.crewai.com/how-to/Hierarchical/
)

View File

@@ -0,0 +1,17 @@
#!/usr/bin/env python
import asyncio
from {{folder_name}}.src.{{folder_name}}.pipeline import {{pipeline_name}}Pipeline
async def run():
"""
Run the pipeline.
"""
inputs = [
{"topic": "AI LLMs"},
]
await {{pipeline_name}}Pipeline().pipeline().kickoff(inputs=inputs)
if __name__ == "__main__":
asyncio.run(run())

View File

@@ -0,0 +1,26 @@
from crewai import Pipeline
from crewai.project import PipelineBase
from crews.crew import *
@PipelineBase
class {{pipeline_name}}Pipeline:
def __init__(self):
# Initialize crews
{% for crew_name in crew_names %}
self.{{crew_name.lower()}}_crew = {{crew_name}}Crew().crew()
{% endfor %}
@pipeline
def create_pipeline(self):
return Pipeline(
stages=[
{% for crew_name in crew_names %}
self.{{crew_name.lower()}}_crew,
{% endfor %}
]
)
async def run(self, inputs):
pipeline = self.create_pipeline()
results = await pipeline.kickoff(inputs)
return results

View File

@@ -0,0 +1,19 @@
[tool.poetry]
name = "{{folder_name}}"
version = "0.1.0"
description = "{{name}} using crewAI"
authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = ">=3.10,<=3.13"
crewai = { extras = ["tools"], version = "^0.46.0" }
[tool.poetry.scripts]
{{folder_name}} = "{{folder_name}}.main:run"
train = "{{folder_name}}.main:train"
replay = "{{folder_name}}.main:replay"
test = "{{folder_name}}.main:test"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@@ -0,0 +1,12 @@
from crewai_tools import BaseTool
class MyCustomTool(BaseTool):
name: str = "Name of my tool"
description: str = (
"Clear description for what this tool is useful for, you agent will need this information to use it."
)
def _run(self, argument: str) -> str:
# Implementation goes here
return "this is an example of a tool output, ignore it and move along."

20
src/crewai/cli/utils.py Normal file
View File

@@ -0,0 +1,20 @@
import click
def copy_template(src, dst, name, class_name, folder_name):
print(f"Copying {src} to {dst}")
print(f"Interpolating {name}, {class_name}, {folder_name}")
"""Copy a file from src to dst."""
with open(src, "r") as file:
content = file.read()
# Interpolate the content
content = content.replace("{{name}}", name)
content = content.replace("{{crew_name}}", class_name)
content = content.replace("{{folder_name}}", folder_name)
# Write the interpolated content to the new file
with open(dst, "w") as file:
file.write(content)
click.secho(f" - Created {dst}", fg="green")

View File

@@ -1,14 +1,4 @@
def memoize(func):
cache = {}
def memoized_func(*args, **kwargs):
key = (args, tuple(kwargs.items()))
if key not in cache:
cache[key] = func(*args, **kwargs)
return cache[key]
memoized_func.__dict__.update(func.__dict__)
return memoized_func
from crewai.project.utils import memoize
def task(func):
@@ -61,6 +51,16 @@ def cache_handler(func):
return memoize(func)
def stage(func):
func.is_stage = True
return memoize(func)
def router(func):
func.is_router = True
return memoize(func)
def crew(func):
def wrapper(self, *args, **kwargs):
instantiated_tasks = []

View File

@@ -0,0 +1,59 @@
from typing import Callable, Dict
from pydantic import ConfigDict
from crewai.crew import Crew
from crewai.pipeline.pipeline import Pipeline
from crewai.routers.router import Router
def PipelineBase(cls):
class WrappedClass(cls):
model_config = ConfigDict(arbitrary_types_allowed=True)
is_pipeline_class: bool = True
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.stages = []
self._map_pipeline_components()
def _get_all_functions(self):
return {
name: getattr(self, name)
for name in dir(self)
if callable(getattr(self, name))
}
def _filter_functions(
self, functions: Dict[str, Callable], attribute: str
) -> Dict[str, Callable]:
return {
name: func
for name, func in functions.items()
if hasattr(func, attribute)
}
def _map_pipeline_components(self):
all_functions = self._get_all_functions()
crew_functions = self._filter_functions(all_functions, "is_crew")
router_functions = self._filter_functions(all_functions, "is_router")
for stage_attr in dir(self):
stage = getattr(self, stage_attr)
if isinstance(stage, (Crew, Router)):
self.stages.append(stage)
elif callable(stage) and hasattr(stage, "is_crew"):
self.stages.append(crew_functions[stage_attr]())
elif callable(stage) and hasattr(stage, "is_router"):
self.stages.append(router_functions[stage_attr]())
elif isinstance(stage, list) and all(
isinstance(item, Crew) for item in stage
):
self.stages.append(
[crew_functions[item.__name__]() for item in stage]
)
def build_pipeline(self) -> Pipeline:
return Pipeline(stages=self.stages)
return WrappedClass

View File

@@ -0,0 +1,11 @@
def memoize(func):
cache = {}
def memoized_func(*args, **kwargs):
key = (args, tuple(kwargs.items()))
if key not in cache:
cache[key] = func(*args, **kwargs)
return cache[key]
memoized_func.__dict__.update(func.__dict__)
return memoized_func