Files
crewAI/src/crewai/cli/templates/pipeline/pipelines/pipeline.py
Lorenze Jay 62f5b2fb2e Brandon/cre 130 pipeline project structure (#1066)
* WIP. Procedure appears to be working well. Working on mocking properly for tests

* All tests are passing now

* rshift working

* Add back in Gui's tool_usage fix

* WIP

* Going to start refactoring for pipeline_output

* Update terminology

* new pipeline flow with traces and usage metrics working. need to add more tests and make sure PipelineOutput behaves likew CrewOutput

* Fix pipelineoutput to look more like crewoutput and taskoutput

* Implemented additional tests for pipeline. One test is failing. Need team support

* Update docs for pipeline

* Update pipeline to properly process input and ouput dictionary

* Update Pipeline docs

* Add back in commentary at top of pipeline file

* Starting to work on router

* Drop router for now. will add in separately

* In the middle of fixing router. A ton of circular dependencies. Moving over to a new design.

* WIP.

* Fix circular dependencies and updated PipelineRouter

* Add in Eduardo feedback. Still need to add in more commentary describing the design decisions for pipeline

* Add developer notes to explain what is going on in pipelines.

* Add doc strings

* Fix missing rag datatype

* WIP. Converting usage metrics from a dict to an object

* Fix tests that were checking usage metrics

* Drop todo

* Fix 1 type error in pipeline

* Update pipeline to use UsageMetric

* Add missing doc string

* WIP.

* Change names

* Rename variables based on joaos feedback

* Fix critical circular dependency issues. Now needing to fix trace issue.

* Tests working now!

* Add more tests which showed underlying issue with traces

* Fix tests

* Remove overly complicated test

* Add router example to docs

* Clean up end of docs

* Clean up docs

* Working on creating Crew templates and pipeline templates

* WIP.

* WIP

* Fix poetry install from templates

* WIP

* Restructure

* changes for lorenze

* more todos

* WIP: create pipelines cli working

* wrapped up router

* ignore mypy src on templates

* ignored signature of copy

* fix all verbose

* rm print statements

* brought back correct folders

* fixes missing folders and then rm print statements

* fixed tests

* fixed broken test

* fixed type checker

* fixed type ignore

* ignore types for templates

* needed

* revert

* exclude only required

* rm type errors on templates

* rm excluding type checks for template files on github action

* fixed missing quotes

---------

Co-authored-by: Brandon Hancock <brandon@brandonhancock.io>
2024-08-09 14:13:29 -07:00

87 lines
3.0 KiB
Python

"""
This pipeline file includes two different examples to demonstrate the flexibility of crewAI pipelines.
Example 1: Two-Stage Pipeline
-----------------------------
This pipeline consists of two crews:
1. ResearchCrew: Performs research on a given topic.
2. WriteXCrew: Generates an X (Twitter) post based on the research findings.
Key features:
- The ResearchCrew's final task uses output_json to store all research findings in a JSON object.
- This JSON object is then passed to the WriteXCrew, where tasks can access the research findings.
Example 2: Two-Stage Pipeline with Parallel Execution
-------------------------------------------------------
This pipeline consists of three crews:
1. ResearchCrew: Performs research on a given topic.
2. WriteXCrew and WriteLinkedInCrew: Run in parallel, using the research findings to generate posts for X and LinkedIn, respectively.
Key features:
- Demonstrates the ability to run multiple crews in parallel.
- Shows how to structure a pipeline with both sequential and parallel stages.
Usage:
- To switch between examples, comment/uncomment the respective code blocks below.
- Ensure that you have implemented all necessary crew classes (ResearchCrew, WriteXCrew, WriteLinkedInCrew) before running.
"""
# Common imports for both examples
from crewai import Pipeline
# Uncomment the crews you need for your chosen example
from ..crews.research_crew.research_crew import ResearchCrew
from ..crews.write_x_crew.write_x_crew import WriteXCrew
# from .crews.write_linkedin_crew.write_linkedin_crew import WriteLinkedInCrew # Uncomment for Example 2
# EXAMPLE 1: Two-Stage Pipeline
# -----------------------------
# Uncomment the following code block to use Example 1
class {{pipeline_name}}Pipeline:
def __init__(self):
# Initialize crews
self.research_crew = ResearchCrew().crew()
self.write_x_crew = WriteXCrew().crew()
def create_pipeline(self):
return Pipeline(
stages=[
self.research_crew,
self.write_x_crew
]
)
async def kickoff(self, inputs):
pipeline = self.create_pipeline()
results = await pipeline.kickoff(inputs)
return results
# EXAMPLE 2: Two-Stage Pipeline with Parallel Execution
# -------------------------------------------------------
# Uncomment the following code block to use Example 2
# @PipelineBase
# class {{pipeline_name}}Pipeline:
# def __init__(self):
# # Initialize crews
# self.research_crew = ResearchCrew().crew()
# self.write_x_crew = WriteXCrew().crew()
# self.write_linkedin_crew = WriteLinkedInCrew().crew()
# @pipeline
# def create_pipeline(self):
# return Pipeline(
# stages=[
# self.research_crew,
# [self.write_x_crew, self.write_linkedin_crew] # Parallel execution
# ]
# )
# async def run(self, inputs):
# pipeline = self.create_pipeline()
# results = await pipeline.kickoff(inputs)
# return results