Files
crewAI/src/crewai/cli/templates/pipeline_router/main.py
Lorenze Jay 62f5b2fb2e Brandon/cre 130 pipeline project structure (#1066)
* WIP. Procedure appears to be working well. Working on mocking properly for tests

* All tests are passing now

* rshift working

* Add back in Gui's tool_usage fix

* WIP

* Going to start refactoring for pipeline_output

* Update terminology

* new pipeline flow with traces and usage metrics working. need to add more tests and make sure PipelineOutput behaves likew CrewOutput

* Fix pipelineoutput to look more like crewoutput and taskoutput

* Implemented additional tests for pipeline. One test is failing. Need team support

* Update docs for pipeline

* Update pipeline to properly process input and ouput dictionary

* Update Pipeline docs

* Add back in commentary at top of pipeline file

* Starting to work on router

* Drop router for now. will add in separately

* In the middle of fixing router. A ton of circular dependencies. Moving over to a new design.

* WIP.

* Fix circular dependencies and updated PipelineRouter

* Add in Eduardo feedback. Still need to add in more commentary describing the design decisions for pipeline

* Add developer notes to explain what is going on in pipelines.

* Add doc strings

* Fix missing rag datatype

* WIP. Converting usage metrics from a dict to an object

* Fix tests that were checking usage metrics

* Drop todo

* Fix 1 type error in pipeline

* Update pipeline to use UsageMetric

* Add missing doc string

* WIP.

* Change names

* Rename variables based on joaos feedback

* Fix critical circular dependency issues. Now needing to fix trace issue.

* Tests working now!

* Add more tests which showed underlying issue with traces

* Fix tests

* Remove overly complicated test

* Add router example to docs

* Clean up end of docs

* Clean up docs

* Working on creating Crew templates and pipeline templates

* WIP.

* WIP

* Fix poetry install from templates

* WIP

* Restructure

* changes for lorenze

* more todos

* WIP: create pipelines cli working

* wrapped up router

* ignore mypy src on templates

* ignored signature of copy

* fix all verbose

* rm print statements

* brought back correct folders

* fixes missing folders and then rm print statements

* fixed tests

* fixed broken test

* fixed type checker

* fixed type ignore

* ignore types for templates

* needed

* revert

* exclude only required

* rm type errors on templates

* rm excluding type checks for template files on github action

* fixed missing quotes

---------

Co-authored-by: Brandon Hancock <brandon@brandonhancock.io>
2024-08-09 14:13:29 -07:00

75 lines
2.7 KiB
Python

#!/usr/bin/env python
import asyncio
from crewai.routers.router import Route
from crewai.routers.router import Router
from {{folder_name}}.pipelines.pipeline_classifier import EmailClassifierPipeline
from {{folder_name}}.pipelines.pipeline_normal import NormalPipeline
from {{folder_name}}.pipelines.pipeline_urgent import UrgentPipeline
async def run():
"""
Run the pipeline.
"""
inputs = [
{
"email": """
Subject: URGENT: Marketing Campaign Launch - Immediate Action Required
Dear Team,
I'm reaching out regarding our upcoming marketing campaign that requires your immediate attention and swift action. We're facing a critical deadline, and our success hinges on our ability to mobilize quickly.
Key points:
Campaign launch: 48 hours from now
Target audience: 250,000 potential customers
Expected ROI: 35% increase in Q3 sales
What we need from you NOW:
Final approval on creative assets (due in 3 hours)
Confirmation of media placements (due by end of day)
Last-minute budget allocation for paid social media push
Our competitors are poised to launch similar campaigns, and we must act fast to maintain our market advantage. Delays could result in significant lost opportunities and potential revenue.
Please prioritize this campaign above all other tasks. I'll be available for the next 24 hours to address any concerns or roadblocks.
Let's make this happen!
[Your Name]
Marketing Director
P.S. I'll be scheduling an emergency team meeting in 1 hour to discuss our action plan. Attendance is mandatory.
"""
}
]
pipeline_classifier = EmailClassifierPipeline().create_pipeline()
pipeline_urgent = UrgentPipeline().create_pipeline()
pipeline_normal = NormalPipeline().create_pipeline()
router = Router(
routes={
"high_urgency": Route(
condition=lambda x: x.get("urgency_score", 0) > 7,
pipeline=pipeline_urgent
),
"low_urgency": Route(
condition=lambda x: x.get("urgency_score", 0) <= 7,
pipeline=pipeline_normal
)
},
default=pipeline_normal
)
pipeline = pipeline_classifier >> router
results = await pipeline.kickoff(inputs)
# Process and print results
for result in results:
print(f"Raw output: {result.raw}")
if result.json_dict:
print(f"JSON output: {result.json_dict}")
print("\n")
def main():
asyncio.run(run())
if __name__ == "__main__":
main()