From 6b4ebe16d0654233afb9fd4229a2504c6d50de9c Mon Sep 17 00:00:00 2001
From: Brandon Hancock
Date: Tue, 23 Jul 2024 11:24:57 -0400
Subject: [PATCH] Update Pipeline docs

---
 docs/core-concepts/Pipeline.md  | 114 +++++++++++++++-----------------
 docs/index.md                   |   5 ++
 tests/pipeline/test_pipeline.py |   4 --
 3 files changed, 60 insertions(+), 63 deletions(-)

diff --git a/docs/core-concepts/Pipeline.md b/docs/core-concepts/Pipeline.md
index 581c55c9c..28d399355 100644
--- a/docs/core-concepts/Pipeline.md
+++ b/docs/core-concepts/Pipeline.md
@@ -78,64 +78,6 @@ my_pipeline = Pipeline(
 
 ## Pipeline Output
 
-The output of a pipeline is encapsulated within the `PipelineRunResult` class, which provides structured access to the results of the pipeline's execution.
-
-### Pipeline Run Result Attributes
-
-| Attribute         | Type                       | Description                                                                                                                 |
-| :---------------- | :------------------------- | :-------------------------------------------------------------------------------------------------------------------------- |
-| **Raw**           | `str`                      | The raw output of the final stage in the pipeline.                                                                          |
-| **Pydantic**      | `Optional[BaseModel]`      | A Pydantic model object representing the structured output of the final stage.                                              |
-| **JSON Dict**     | `Optional[Dict[str, Any]]` | A dictionary representing the JSON output of the final stage.                                                               |
-| **Token Usage**   | `Dict[str, Any]`           | A summary of token usage across all stages of the pipeline.                                                                 |
-| **Trace**         | `List[Any]`                | A trace of the journey of inputs through the pipeline run, showing how data flowed and was transformed through each stage.  |
-| **Crews Outputs** | `List[CrewOutput]`         | A list of `CrewOutput` objects, representing the outputs from each crew in the pipeline.                                    |
-
-## Using Pipelines
-
-Pipelines are particularly useful for complex workflows that involve multiple stages of processing, analysis, or content generation. They allow you to:
-
-1. **Sequence Operations**: Execute crews in a specific order, ensuring that the output of one crew is available as input to the next.
-2. **Parallel Processing**: Run multiple crews concurrently within a stage for increased efficiency.
-3. **Manage Complex Workflows**: Break down large tasks into smaller, manageable steps executed by specialized crews.
-
-### Example: Running a Pipeline
-
-```python
-# Define input data for the pipeline
-input_data = [{"initial_query": "Latest advancements in AI"}]
-
-# Execute the pipeline, initiating a run for each input
-results = await my_pipeline.process_runs(input_data)
-
-# Access the results
-for result in results:
-    print(f"Final Output: {result.raw}")
-    print(f"Token Usage: {result.token_usage}")
-    print(f"Trace: {result.trace}")  # Shows the path of the input through all stages
-```
-
-## Advanced Features
-
-### Parallel Execution within Stages
-
-You can define parallel execution within a stage by providing a list of crews, creating multiple branches:
-
-```python
-parallel_analysis_crew = Crew(agents=[financial_analyst], tasks=[financial_analysis_task])
-market_analysis_crew = Crew(agents=[market_analyst], tasks=[market_analysis_task])
-
-my_pipeline = Pipeline(
-    stages=[
-        research_crew,
-        [parallel_analysis_crew, market_analysis_crew],  # Parallel execution (branching)
-        writing_crew
-    ]
-)
-```
-
-## Pipeline Output
-
 !!! note "Understanding Pipeline Outputs"
     The output of a pipeline in the crewAI framework is encapsulated within two main classes: `PipelineOutput` and `PipelineRunResult`.
    These classes provide a structured way to access the results of the pipeline's execution, including various formats such as raw strings, JSON, and Pydantic models.
@@ -203,4 +145,58 @@ for run_result in pipeline_output.run_results:
 
 This example demonstrates how to access and work with the pipeline output, including individual run results and their associated data.
 
-[... rest of the document remains unchanged ...]
+## Using Pipelines
+
+Pipelines are particularly useful for complex workflows that involve multiple stages of processing, analysis, or content generation. They allow you to:
+
+1. **Sequence Operations**: Execute crews in a specific order, ensuring that the output of one crew is available as input to the next.
+2. **Parallel Processing**: Run multiple crews concurrently within a stage for increased efficiency.
+3. **Manage Complex Workflows**: Break down large tasks into smaller, manageable steps executed by specialized crews.
+
+### Example: Running a Pipeline
+
+```python
+# Define input data for the pipeline
+input_data = [{"initial_query": "Latest advancements in AI"}]
+
+# Execute the pipeline, initiating a run for each input
+results = await my_pipeline.process_runs(input_data)
+
+# Access the results
+for result in results:
+    print(f"Final Output: {result.raw}")
+    print(f"Token Usage: {result.token_usage}")
+    print(f"Trace: {result.trace}")  # Shows the path of the input through all stages
+```
+
+## Advanced Features
+
+### Parallel Execution within Stages
+
+You can define parallel execution within a stage by providing a list of crews, creating multiple branches:
+
+```python
+parallel_analysis_crew = Crew(agents=[financial_analyst], tasks=[financial_analysis_task])
+market_analysis_crew = Crew(agents=[market_analyst], tasks=[market_analysis_task])
+
+my_pipeline = Pipeline(
+    stages=[
+        research_crew,
+        [parallel_analysis_crew, market_analysis_crew],  # Parallel execution (branching)
+        writing_crew
+    ]
+)
+```
+
+### Error Handling and Validation
+
+The Pipeline class includes validation mechanisms to ensure the robustness of the pipeline structure (see the sketch below):
+
+- Validates that stages contain only Crew instances or lists of Crew instances.
+- Prevents double nesting of stages to maintain a clear structure.
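+
+For example, here is a minimal sketch of how a structural mistake surfaces (a hypothetical misuse, reusing `research_crew` from the example above; the exact exception type may vary by version, so it is caught broadly here):
+
+```python
+# A stage must be a Crew or a list of Crews; anything else is rejected
+# when the Pipeline is constructed.
+try:
+    bad_pipeline = Pipeline(stages=[research_crew, "not a crew"])
+except Exception as error:
+    print(f"Invalid pipeline structure: {error}")
+```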
+
+## Best Practices for Pipelines
+
+1. **Clear Stage Definition**: Define each stage with a clear purpose and expected output.
+2. **Effective Data Flow**: Ensure that the output of each stage is in a format suitable for the input of the next stage (see the sketch after this list).
+3. **Parallel Processing**: Reserve parallel stages for crews that can work independently, so that branches do not depend on one another's output.
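+
+As a sketch of effective data flow (hypothetical tasks; the `researcher` and `writer` agents are assumed to be defined as in earlier examples, and a stage's structured output is assumed to be available to the next stage's input placeholders):
+
+```python
+# Stage 1 produces structured output containing a `summary` field...
+research_task = Task(
+    description="Research {initial_query} and summarize the findings.",
+    expected_output="A JSON object with a `summary` field.",
+    agent=researcher,
+)
+
+# ...which a stage-2 task consumes through input interpolation.
+writing_task = Task(
+    description="Write an article based on this summary: {summary}",
+    expected_output="A polished article.",
+    agent=writer,
+)
+```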
diff --git a/docs/index.md b/docs/index.md
index 77cdd9852..c02b1fb27 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -33,6 +33,11 @@ Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By
 
 Crews
 
+  •
+
+Pipeline
+
+
   •
 
 Training
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index 7993c0804..e7b7a745c 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -441,12 +441,8 @@ TODO: Figure out what is the proper output for a pipeline with multiple stages
 
 Options:
 - Should the final output only include the last stage's output?
 - Should the final output include the accumulation of previous stages' outputs?
-
-
 """
-
-# TODO: GET HELP FROM TEAM ON THIS ONE
 @pytest.mark.asyncio
 async def test_pipeline_data_accumulation(mock_crew_factory):
     crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"key1": "value1"})