mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 16:48:30 +00:00
wrapped up router
This commit is contained in:
@@ -35,8 +35,26 @@ def create_pipeline(name, router=False):
|
||||
root_template_files = [".gitignore", "pyproject.toml", "README.md"]
|
||||
src_template_files = ["__init__.py", "main.py"]
|
||||
tools_template_files = ["tools/__init__.py", "tools/custom_tool.py"]
|
||||
crew_folders = ["research_crew", "write_x_crew", "write_linkedin_crew"]
|
||||
pipelines_folders = ["pipelines/__init__.py", "pipelines/pipeline.py"]
|
||||
|
||||
if router:
|
||||
crew_folders = [
|
||||
"classifier_crew",
|
||||
"normal_crew",
|
||||
"urgent_crew",
|
||||
]
|
||||
pipelines_folders = [
|
||||
"pipelines/__init__.py",
|
||||
"pipelines/pipeline_classifier.py",
|
||||
"pipelines/pipeline_normal.py",
|
||||
"pipelines/pipeline_urgent.py",
|
||||
]
|
||||
else:
|
||||
crew_folders = [
|
||||
"classifier_crew",
|
||||
"normal_crew",
|
||||
"urgent_crew",
|
||||
]
|
||||
pipelines_folders = ["pipelines/__init__.py", "pipelines/pipeline.py"]
|
||||
|
||||
def process_file(src_file, dst_file):
|
||||
with open(src_file, "r") as file:
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
from crewai import Agent, Crew, Process, Task
|
||||
from crewai.project import CrewBase, agent, crew, task
|
||||
from pydantic import BaseModel
|
||||
|
||||
# Uncomment the following line to use an example of a custom tool
|
||||
# from demo_pipeline.tools.custom_tool import MyCustomTool
|
||||
|
||||
# Check our tools documentations for more information on how to use them
|
||||
# from crewai_tools import SerperDevTool
|
||||
|
||||
class UrgencyScore(BaseModel):
|
||||
urgency_score: int
|
||||
|
||||
@CrewBase
|
||||
class ClassifierCrew:
|
||||
"""Email Classifier Crew"""
|
||||
|
||||
agents_config = "config/agents.yaml"
|
||||
tasks_config = "config/tasks.yaml"
|
||||
|
||||
@agent
|
||||
def classifier(self) -> Agent:
|
||||
return Agent(config=self.agents_config["classifier"], verbose=True)
|
||||
|
||||
@task
|
||||
def urgent_task(self) -> Task:
|
||||
return Task(
|
||||
config=self.tasks_config["classify_email"],
|
||||
output_pydantic=UrgencyScore,
|
||||
)
|
||||
|
||||
@crew
|
||||
def crew(self) -> Crew:
|
||||
"""Creates the Email Classifier Crew"""
|
||||
return Crew(
|
||||
agents=self.agents, # Automatically created by the @agent decorator
|
||||
tasks=self.tasks, # Automatically created by the @task decorator
|
||||
process=Process.sequential,
|
||||
verbose=2,
|
||||
)
|
||||
@@ -0,0 +1,7 @@
|
||||
classifier:
|
||||
role: >
|
||||
Email Classifier
|
||||
goal: >
|
||||
Classify the email: {email} as urgent or normal from a score of 1 to 10, where 1 is not urgent and 10 is urgent. Return the urgency score only.`
|
||||
backstory: >
|
||||
You are a highly efficient and experienced email classifier, trained to quickly assess and classify emails. Your ability to remain calm under pressure and provide concise, actionable responses has made you an invaluable asset in managing normal situations and maintaining smooth operations.
|
||||
@@ -0,0 +1,7 @@
|
||||
classify_email:
|
||||
description: >
|
||||
Classify the email: {email}
|
||||
as urgent or normal.
|
||||
expected_output: >
|
||||
Classify the email from a scale of 1 to 10, where 1 is not urgent and 10 is urgent. Return the urgency score only.
|
||||
agent: classifier
|
||||
@@ -1,53 +0,0 @@
|
||||
from crewai import Agent, Crew, Process, Task
|
||||
from crewai.project import CrewBase, agent, crew, task
|
||||
|
||||
# Uncomment the following line to use an example of a custom tool
|
||||
# from {{folder_name}}.tools.custom_tool import MyCustomTool
|
||||
|
||||
# Check our tools documentations for more information on how to use them
|
||||
# from crewai_tools import SerperDevTool
|
||||
|
||||
@CrewBase
|
||||
class {{crew_name}}Crew():
|
||||
"""{{crew_name}} crew"""
|
||||
agents_config = '../config/agents.yaml'
|
||||
tasks_config = '../config/tasks.yaml'
|
||||
|
||||
@agent
|
||||
def researcher(self) -> Agent:
|
||||
return Agent(
|
||||
config=self.agents_config['researcher'],
|
||||
# tools=[MyCustomTool()], # Example of custom tool, loaded on the beginning of file
|
||||
verbose=True
|
||||
)
|
||||
|
||||
@agent
|
||||
def reporting_analyst(self) -> Agent:
|
||||
return Agent(
|
||||
config=self.agents_config['reporting_analyst'],
|
||||
verbose=True
|
||||
)
|
||||
|
||||
@task
|
||||
def research_task(self) -> Task:
|
||||
return Task(
|
||||
config=self.tasks_config['research_task'],
|
||||
)
|
||||
|
||||
@task
|
||||
def reporting_task(self) -> Task:
|
||||
return Task(
|
||||
config=self.tasks_config['reporting_task'],
|
||||
output_file='report.md'
|
||||
)
|
||||
|
||||
@crew
|
||||
def crew(self) -> Crew:
|
||||
"""Creates the {{crew_name}} crew"""
|
||||
return Crew(
|
||||
agents=self.agents, # Automatically created by the @agent decorator
|
||||
tasks=self.tasks, # Automatically created by the @task decorator
|
||||
process=Process.sequential,
|
||||
verbose=2,
|
||||
# process=Process.hierarchical, # In case you wanna use that instead https://docs.crewai.com/how-to/Hierarchical/
|
||||
)
|
||||
@@ -0,0 +1,7 @@
|
||||
normal_handler:
|
||||
role: >
|
||||
Normal Email Processor
|
||||
goal: >
|
||||
Process normal emails and create an email to respond to the sender.
|
||||
backstory: >
|
||||
You are a highly efficient and experienced normal email handler, trained to quickly assess and respond to normal communications. Your ability to remain calm under pressure and provide concise, actionable responses has made you an invaluable asset in managing normal situations and maintaining smooth operations.
|
||||
@@ -0,0 +1,6 @@
|
||||
normal_task:
|
||||
description: >
|
||||
Process and respond to normal email quickly.
|
||||
expected_output: >
|
||||
An email response to the normal email.
|
||||
agent: normal_handler
|
||||
@@ -0,0 +1,36 @@
|
||||
from crewai import Agent, Crew, Process, Task
|
||||
from crewai.project import CrewBase, agent, crew, task
|
||||
|
||||
# Uncomment the following line to use an example of a custom tool
|
||||
# from demo_pipeline.tools.custom_tool import MyCustomTool
|
||||
|
||||
# Check our tools documentations for more information on how to use them
|
||||
# from crewai_tools import SerperDevTool
|
||||
|
||||
|
||||
@CrewBase
|
||||
class NormalCrew:
|
||||
"""Normal Email Crew"""
|
||||
|
||||
agents_config = "config/agents.yaml"
|
||||
tasks_config = "config/tasks.yaml"
|
||||
|
||||
@agent
|
||||
def normal_handler(self) -> Agent:
|
||||
return Agent(config=self.agents_config["normal_handler"], verbose=True)
|
||||
|
||||
@task
|
||||
def urgent_task(self) -> Task:
|
||||
return Task(
|
||||
config=self.tasks_config["normal_task"],
|
||||
)
|
||||
|
||||
@crew
|
||||
def crew(self) -> Crew:
|
||||
"""Creates the Normal Email Crew"""
|
||||
return Crew(
|
||||
agents=self.agents, # Automatically created by the @agent decorator
|
||||
tasks=self.tasks, # Automatically created by the @task decorator
|
||||
process=Process.sequential,
|
||||
verbose=2,
|
||||
)
|
||||
@@ -0,0 +1,7 @@
|
||||
urgent_handler:
|
||||
role: >
|
||||
Urgent Email Processor
|
||||
goal: >
|
||||
Process urgent emails and create an email to respond to the sender.
|
||||
backstory: >
|
||||
You are a highly efficient and experienced urgent email handler, trained to quickly assess and respond to time-sensitive communications. Your ability to remain calm under pressure and provide concise, actionable responses has made you an invaluable asset in managing critical situations and maintaining smooth operations.
|
||||
@@ -0,0 +1,6 @@
|
||||
urgent_task:
|
||||
description: >
|
||||
Process and respond to urgent email quickly.
|
||||
expected_output: >
|
||||
An email response to the urgent email.
|
||||
agent: urgent_handler
|
||||
@@ -0,0 +1,36 @@
|
||||
from crewai import Agent, Crew, Process, Task
|
||||
from crewai.project import CrewBase, agent, crew, task
|
||||
|
||||
# Uncomment the following line to use an example of a custom tool
|
||||
# from demo_pipeline.tools.custom_tool import MyCustomTool
|
||||
|
||||
# Check our tools documentations for more information on how to use them
|
||||
# from crewai_tools import SerperDevTool
|
||||
|
||||
|
||||
@CrewBase
|
||||
class UrgentCrew:
|
||||
"""Urgent Email Crew"""
|
||||
|
||||
agents_config = "config/agents.yaml"
|
||||
tasks_config = "config/tasks.yaml"
|
||||
|
||||
@agent
|
||||
def urgent_handler(self) -> Agent:
|
||||
return Agent(config=self.agents_config["urgent_handler"], verbose=True)
|
||||
|
||||
@task
|
||||
def urgent_task(self) -> Task:
|
||||
return Task(
|
||||
config=self.tasks_config["urgent_task"],
|
||||
)
|
||||
|
||||
@crew
|
||||
def crew(self) -> Crew:
|
||||
"""Creates the Urgent Email Crew"""
|
||||
return Crew(
|
||||
agents=self.agents, # Automatically created by the @agent decorator
|
||||
tasks=self.tasks, # Automatically created by the @task decorator
|
||||
process=Process.sequential,
|
||||
verbose=2,
|
||||
)
|
||||
@@ -1,37 +1,63 @@
|
||||
#!/usr/bin/env python
|
||||
import asyncio
|
||||
from {
|
||||
from crewai.routers.router import Route
|
||||
from crewai.routers.router import Router
|
||||
|
||||
from crewai.routers.router import Route{folder_name}}.pipeline import {{pipeline_name}}Pipeline
|
||||
from {{folder_name}}.pipelines.pipeline_classifier import EmailClassifierPipeline
|
||||
from {{folder_name}}.pipelines.pipeline_normal import NormalPipeline
|
||||
from {{folder_name}}.pipelines.pipeline_urgent import UrgentPipeline
|
||||
|
||||
async def run():
|
||||
"""
|
||||
Run the pipeline.
|
||||
"""
|
||||
inputs = [
|
||||
{"topic": "AI wearables"},
|
||||
{
|
||||
"email": """
|
||||
Subject: URGENT: Marketing Campaign Launch - Immediate Action Required
|
||||
Dear Team,
|
||||
I'm reaching out regarding our upcoming marketing campaign that requires your immediate attention and swift action. We're facing a critical deadline, and our success hinges on our ability to mobilize quickly.
|
||||
Key points:
|
||||
|
||||
Campaign launch: 48 hours from now
|
||||
Target audience: 250,000 potential customers
|
||||
Expected ROI: 35% increase in Q3 sales
|
||||
|
||||
What we need from you NOW:
|
||||
|
||||
Final approval on creative assets (due in 3 hours)
|
||||
Confirmation of media placements (due by end of day)
|
||||
Last-minute budget allocation for paid social media push
|
||||
|
||||
Our competitors are poised to launch similar campaigns, and we must act fast to maintain our market advantage. Delays could result in significant lost opportunities and potential revenue.
|
||||
Please prioritize this campaign above all other tasks. I'll be available for the next 24 hours to address any concerns or roadblocks.
|
||||
Let's make this happen!
|
||||
[Your Name]
|
||||
Marketing Director
|
||||
P.S. I'll be scheduling an emergency team meeting in 1 hour to discuss our action plan. Attendance is mandatory.
|
||||
"""
|
||||
}
|
||||
]
|
||||
|
||||
# TODO: Pull all of these out dynamically from the /pipelines folder
|
||||
pipeline_categorzie = {{pipeline_name}}Pipeline()
|
||||
pipeline_high_priority = {{pipeline_name}}Pipeline()
|
||||
pipeline_low_priority = {{pipeline_name}}Pipeline()
|
||||
pipeline_classifier = EmailClassifierPipeline().create_pipeline()
|
||||
pipeline_urgent = UrgentPipeline().create_pipeline()
|
||||
pipeline_normal = NormalPipeline().create_pipeline()
|
||||
|
||||
router = Router(
|
||||
routes={
|
||||
"high_urgency": Route(
|
||||
condition=lambda x: x.get("urgency_score", 0) > 7,
|
||||
pipeline=pipeline_high_priority
|
||||
pipeline=pipeline_urgent
|
||||
),
|
||||
"low_urgency": Route(
|
||||
condition=lambda x: x.get("urgency_score", 0) <= 7,
|
||||
pipeline=pipeline_low_priority
|
||||
pipeline=pipeline_normal
|
||||
)
|
||||
},
|
||||
default=Pipeline(stages=[pipeline_low_priority])
|
||||
default=pipeline_normal
|
||||
)
|
||||
|
||||
pipeline = pipeline_categorzie >> router
|
||||
pipeline = pipeline_classifier >> router
|
||||
|
||||
results = await pipeline.kickoff(inputs)
|
||||
|
||||
|
||||
@@ -1,26 +0,0 @@
|
||||
from crewai import Pipeline
|
||||
from crewai.project import PipelineBase
|
||||
from crews.crew import *
|
||||
|
||||
@PipelineBase
|
||||
class {{pipeline_name}}Pipeline:
|
||||
def __init__(self):
|
||||
# Initialize crews
|
||||
{% for crew_name in crew_names %}
|
||||
self.{{crew_name.lower()}}_crew = {{crew_name}}Crew().crew()
|
||||
{% endfor %}
|
||||
|
||||
@pipeline
|
||||
def create_pipeline(self):
|
||||
return Pipeline(
|
||||
stages=[
|
||||
{% for crew_name in crew_names %}
|
||||
self.{{crew_name.lower()}}_crew,
|
||||
{% endfor %}
|
||||
]
|
||||
)
|
||||
|
||||
async def run(self, inputs):
|
||||
pipeline = self.create_pipeline()
|
||||
results = await pipeline.kickoff(inputs)
|
||||
return results
|
||||
@@ -0,0 +1,24 @@
|
||||
from crewai import Pipeline
|
||||
from crewai.project import PipelineBase
|
||||
from ..crews.classifier_crew.classifier_crew import ClassifierCrew
|
||||
|
||||
|
||||
@PipelineBase
|
||||
class EmailClassifierPipeline:
|
||||
def __init__(self):
|
||||
# Initialize crews
|
||||
self.classifier_crew = ClassifierCrew().crew()
|
||||
|
||||
def create_pipeline(self):
|
||||
return Pipeline(
|
||||
stages=[
|
||||
self.classifier_crew
|
||||
]
|
||||
)
|
||||
|
||||
async def kickoff(self, inputs):
|
||||
pipeline = self.create_pipeline()
|
||||
results = await pipeline.kickoff(inputs)
|
||||
return results
|
||||
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
from crewai import Pipeline
|
||||
from crewai.project import PipelineBase
|
||||
from ..crews.normal_crew.normal_crew import NormalCrew
|
||||
|
||||
|
||||
@PipelineBase
|
||||
class NormalPipeline:
|
||||
def __init__(self):
|
||||
# Initialize crews
|
||||
self.normal_crew = NormalCrew().crew()
|
||||
|
||||
def create_pipeline(self):
|
||||
return Pipeline(
|
||||
stages=[
|
||||
self.normal_crew
|
||||
]
|
||||
)
|
||||
|
||||
async def kickoff(self, inputs):
|
||||
pipeline = self.create_pipeline()
|
||||
results = await pipeline.kickoff(inputs)
|
||||
return results
|
||||
|
||||
|
||||
@@ -0,0 +1,23 @@
|
||||
from crewai import Pipeline
|
||||
from crewai.project import PipelineBase
|
||||
from ..crews.urgent_crew.urgent_crew import UrgentCrew
|
||||
|
||||
@PipelineBase
|
||||
class UrgentPipeline:
|
||||
def __init__(self):
|
||||
# Initialize crews
|
||||
self.urgent_crew = UrgentCrew().crew()
|
||||
|
||||
def create_pipeline(self):
|
||||
return Pipeline(
|
||||
stages=[
|
||||
self.urgent_crew
|
||||
]
|
||||
)
|
||||
|
||||
async def kickoff(self, inputs):
|
||||
pipeline = self.create_pipeline()
|
||||
results = await pipeline.kickoff(inputs)
|
||||
return results
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ python = ">=3.10,<=3.13"
|
||||
crewai = { extras = ["tools"], version = "^0.46.0" }
|
||||
|
||||
[tool.poetry.scripts]
|
||||
{{folder_name}} = "{{folder_name}}.main:run"
|
||||
{{folder_name}} = "{{folder_name}}.main:main"
|
||||
train = "{{folder_name}}.main:train"
|
||||
replay = "{{folder_name}}.main:replay"
|
||||
test = "{{folder_name}}.main:test"
|
||||
|
||||
Reference in New Issue
Block a user