Update Pipeline docs

Brandon Hancock
2024-07-23 11:24:57 -04:00
parent 602ade4cc4
commit 6b4ebe16d0
3 changed files with 60 additions and 63 deletions


@@ -78,64 +78,6 @@ my_pipeline = Pipeline(
## Pipeline Output
The output of a pipeline is encapsulated within the `PipelineRunResult` class, which provides structured access to the results of the pipeline's execution.
### Pipeline Run Result Attributes
| Attribute | Type | Description |
| :---------------- | :------------------------- | :------------------------------------------------------------------------------------------------------------------------- |
| **Raw** | `str` | The raw output of the final stage in the pipeline. |
| **Pydantic** | `Optional[BaseModel]` | A Pydantic model object representing the structured output of the final stage. |
| **JSON Dict** | `Optional[Dict[str, Any]]` | A dictionary representing the JSON output of the final stage. |
| **Token Usage** | `Dict[str, Any]` | A summary of token usage across all stages of the pipeline. |
| **Trace** | `List[Any]` | A trace of the journey of inputs through the pipeline run, showing how data flowed and was transformed through each stage. |
| **Crews Outputs** | `List[CrewOutput]` | A list of `CrewOutput` objects, representing the outputs from each crew in the pipeline. |
## Using Pipelines
Pipelines are particularly useful for complex workflows that involve multiple stages of processing, analysis, or content generation. They allow you to:
1. **Sequence Operations**: Execute crews in a specific order, ensuring that the output of one crew is available as input to the next.
2. **Parallel Processing**: Run multiple crews concurrently within a stage for increased efficiency.
3. **Manage Complex Workflows**: Break down large tasks into smaller, manageable steps executed by specialized crews.
### Example: Running a Pipeline
```python
# Define input data for the pipeline
input_data = [{"initial_query": "Latest advancements in AI"}]
# Execute the pipeline, initiating a run for each input
results = await my_pipeline.process_runs(input_data)
# Access the results
for result in results:
    print(f"Final Output: {result.raw}")
    print(f"Token Usage: {result.token_usage}")
    print(f"Trace: {result.trace}")  # Shows the path of the input through all stages
```
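Beyond `raw`, `token_usage`, and `trace`, the remaining attributes from the table above can be read the same way. A minimal sketch, assuming the same `results` list as above and that the final task was configured to produce structured output (otherwise `pydantic` and `json_dict` are `None`):
```python
# Minimal sketch: reading the structured attributes of a PipelineRunResult.
# Assumes `results` comes from `my_pipeline.process_runs(...)` as above.
result = results[0]

if result.json_dict is not None:  # Populated when the final stage emits JSON
    print(f"JSON Output: {result.json_dict}")

if result.pydantic is not None:  # Populated when the final task declares a Pydantic model
    print(f"Structured Output: {result.pydantic}")

for crew_output in result.crews_outputs:  # One CrewOutput per crew in the pipeline
    print(f"Crew Output: {crew_output.raw}")
```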
## Advanced Features
### Parallel Execution within Stages
You can define parallel execution within a stage by providing a list of crews, creating multiple branches:
```python
parallel_analysis_crew = Crew(agents=[financial_analyst], tasks=[financial_analysis_task])
market_analysis_crew = Crew(agents=[market_analyst], tasks=[market_analysis_task])
my_pipeline = Pipeline(
    stages=[
        research_crew,
        [parallel_analysis_crew, market_analysis_crew],  # Parallel execution (branching)
        writing_crew
    ]
)
```
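In this example, `research_crew` runs first; the two analysis crews then execute concurrently as separate branches of the second stage; and `writing_crew` runs last, once both branches have completed.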
## Pipeline Output
!!! note "Understanding Pipeline Outputs"
    The output of a pipeline in the crewAI framework is encapsulated within two main classes: `PipelineOutput` and `PipelineRunResult`. These classes provide a structured way to access the results of the pipeline's execution, including various formats such as raw strings, JSON, and Pydantic models.
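A minimal sketch of how the two classes relate, assuming a `pipeline_output` object obtained from a pipeline run (the fuller example later in this document shows the same iteration pattern):
```python
# Minimal sketch: a PipelineOutput aggregates one PipelineRunResult per input.
for run_result in pipeline_output.run_results:
    print(run_result.raw)          # Raw output of the final stage for this input
    print(run_result.token_usage)  # Token usage across all stages of this run
```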
@@ -203,4 +145,58 @@ for run_result in pipeline_output.run_results:
This example demonstrates how to access and work with the pipeline output, including individual run results and their associated data.
[... rest of the document remains unchanged ...]
## Using Pipelines
Pipelines are particularly useful for complex workflows that involve multiple stages of processing, analysis, or content generation. They allow you to:
1. **Sequence Operations**: Execute crews in a specific order, ensuring that the output of one crew is available as input to the next.
2. **Parallel Processing**: Run multiple crews concurrently within a stage for increased efficiency.
3. **Manage Complex Workflows**: Break down large tasks into smaller, manageable steps executed by specialized crews.
### Example: Running a Pipeline
```python
# Define input data for the pipeline
input_data = [{"initial_query": "Latest advancements in AI"}]
# Execute the pipeline, initiating a run for each input
results = await my_pipeline.process_runs(input_data)
# Access the results
for result in results:
    print(f"Final Output: {result.raw}")
    print(f"Token Usage: {result.token_usage}")
    print(f"Trace: {result.trace}")  # Shows the path of the input through all stages
```
## Advanced Features
### Parallel Execution within Stages
You can define parallel execution within a stage by providing a list of crews, creating multiple branches:
```python
parallel_analysis_crew = Crew(agents=[financial_analyst], tasks=[financial_analysis_task])
market_analysis_crew = Crew(agents=[market_analyst], tasks=[market_analysis_task])
my_pipeline = Pipeline(
    stages=[
        research_crew,
        [parallel_analysis_crew, market_analysis_crew],  # Parallel execution (branching)
        writing_crew
    ]
)
```
### Error Handling and Validation
The Pipeline class includes validation mechanisms to ensure the robustness of the pipeline structure:
- Validates that stages contain only Crew instances or lists of Crew instances.
- Prevents double nesting of stages to maintain a clear structure (illustrated in the sketch below).
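As an illustration of these rules, a doubly nested stage list is rejected when the pipeline is constructed. A hedged sketch, reusing the crews defined above; the exact exception type and message are assumptions:
```python
# Hypothetical sketch of the validation described above; the exact
# exception raised by Pipeline is an assumption.
try:
    bad_pipeline = Pipeline(
        stages=[
            research_crew,
            [[parallel_analysis_crew], market_analysis_crew],  # Double nesting: invalid
        ]
    )
except ValueError as exc:
    print(f"Invalid pipeline structure: {exc}")
```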
## Best Practices for Pipelines
1. **Clear Stage Definition**: Define each stage with a clear purpose and expected output.
2. **Effective Data Flow**: Ensure that the output of each stage is in a format suitable for the input of the next stage.
3. **Parallel Processing**: Run independent crews in parallel within a stage to improve overall efficiency.


@@ -33,6 +33,11 @@ Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By
    Crews
  </a>
</li>
<li>
  <a href="./core-concepts/Pipeline">
    Pipeline
  </a>
</li>
<li>
  <a href="./core-concepts/Training-Crew">
    Training


@@ -441,12 +441,8 @@ TODO: Figure out what is the proper output for a pipeline with multiple stages
Options:
- Should the final output only include the last stage's output?
- Should the final output include the accumulation of previous stages' outputs?
"""
# TODO: GET HELP FROM TEAM ON THIS ONE
@pytest.mark.asyncio
async def test_pipeline_data_accumulation(mock_crew_factory):
    crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"key1": "value1"})