mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-24 00:08:29 +00:00
Compare commits
21 Commits
bugfix/fix
...
brandon/cr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5472f1cd45 | ||
|
|
2119ba7c32 | ||
|
|
b00bc44921 | ||
|
|
6b4ebe16d0 | ||
|
|
602ade4cc4 | ||
|
|
471c5b970c | ||
|
|
33d9828edc | ||
|
|
e95ef6fca9 | ||
|
|
afd6bff159 | ||
|
|
392490c48b | ||
|
|
d094e178f1 | ||
|
|
834c62feca | ||
|
|
c0c329b6e0 | ||
|
|
f737b3b379 | ||
|
|
467536b96a | ||
|
|
1988a00c60 | ||
|
|
e2f4405291 | ||
|
|
040e5a78d2 | ||
|
|
c5002eedd9 | ||
|
|
f7680d6157 | ||
|
|
adf93c91f7 |
202
docs/core-concepts/Pipeline.md
Normal file
202
docs/core-concepts/Pipeline.md
Normal file
@@ -0,0 +1,202 @@
|
|||||||
|
---
|
||||||
|
title: crewAI Pipelines
|
||||||
|
description: Understanding and utilizing pipelines in the crewAI framework for efficient multi-stage task processing.
|
||||||
|
---
|
||||||
|
|
||||||
|
## What is a Pipeline?
|
||||||
|
|
||||||
|
A pipeline in crewAI represents a structured workflow that allows for the sequential or parallel execution of multiple crews. It provides a way to organize complex processes involving multiple stages, where the output of one stage can serve as input for subsequent stages.
|
||||||
|
|
||||||
|
## Key Terminology
|
||||||
|
|
||||||
|
Understanding the following terms is crucial for working effectively with pipelines:
|
||||||
|
|
||||||
|
- **Stage**: A distinct part of the pipeline, which can be either sequential (a single crew) or parallel (multiple crews executing concurrently).
|
||||||
|
- **Run**: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline.
|
||||||
|
- **Branch**: Parallel executions within a stage (e.g., concurrent crew operations).
|
||||||
|
- **Trace**: The journey of an individual input through the entire pipeline, capturing the path and transformations it undergoes.
|
||||||
|
|
||||||
|
Example pipeline structure:
|
||||||
|
|
||||||
|
```
|
||||||
|
crew1 >> [crew2, crew3] >> crew4
|
||||||
|
```
|
||||||
|
|
||||||
|
This represents a pipeline with three stages:
|
||||||
|
|
||||||
|
1. A sequential stage (crew1)
|
||||||
|
2. A parallel stage with two branches (crew2 and crew3 executing concurrently)
|
||||||
|
3. Another sequential stage (crew4)
|
||||||
|
|
||||||
|
Each input creates its own run, flowing through all stages of the pipeline. Multiple runs can be processed concurrently, each following the defined pipeline structure.
|
||||||
|
|
||||||
|
## Pipeline Attributes
|
||||||
|
|
||||||
|
| Attribute | Parameters | Description |
|
||||||
|
| :--------- | :--------- | :------------------------------------------------------------------------------------ |
|
||||||
|
| **Stages** | `stages` | A list of crews or lists of crews representing the stages to be executed in sequence. |
|
||||||
|
|
||||||
|
## Creating a Pipeline
|
||||||
|
|
||||||
|
When creating a pipeline, you define a series of stages, each consisting of either a single crew or a list of crews for parallel execution. The pipeline ensures that each stage is executed in order, with the output of one stage feeding into the next.
|
||||||
|
|
||||||
|
### Example: Assembling a Pipeline
|
||||||
|
|
||||||
|
```python
|
||||||
|
from crewai import Crew, Agent, Task, Pipeline
|
||||||
|
|
||||||
|
# Define your crews
|
||||||
|
research_crew = Crew(
|
||||||
|
agents=[researcher],
|
||||||
|
tasks=[research_task],
|
||||||
|
process=Process.sequential
|
||||||
|
)
|
||||||
|
|
||||||
|
analysis_crew = Crew(
|
||||||
|
agents=[analyst],
|
||||||
|
tasks=[analysis_task],
|
||||||
|
process=Process.sequential
|
||||||
|
)
|
||||||
|
|
||||||
|
writing_crew = Crew(
|
||||||
|
agents=[writer],
|
||||||
|
tasks=[writing_task],
|
||||||
|
process=Process.sequential
|
||||||
|
)
|
||||||
|
|
||||||
|
# Assemble the pipeline
|
||||||
|
my_pipeline = Pipeline(
|
||||||
|
stages=[research_crew, analysis_crew, writing_crew]
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
## Pipeline Methods
|
||||||
|
|
||||||
|
| Method | Description |
|
||||||
|
| :--------------- | :----------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||||
|
| **process_runs** | Executes the pipeline, processing all stages and returning the results. This method initiates one or more runs through the pipeline, handling the flow of data between stages. |
|
||||||
|
|
||||||
|
## Pipeline Output
|
||||||
|
|
||||||
|
!!! note "Understanding Pipeline Outputs"
|
||||||
|
The output of a pipeline in the crewAI framework is encapsulated within two main classes: `PipelineOutput` and `PipelineRunResult`. These classes provide a structured way to access the results of the pipeline's execution, including various formats such as raw strings, JSON, and Pydantic models.
|
||||||
|
|
||||||
|
### Pipeline Output Attributes
|
||||||
|
|
||||||
|
| Attribute | Parameters | Type | Description |
|
||||||
|
| :-------------- | :------------ | :------------------------ | :-------------------------------------------------------------------------------------------------------- |
|
||||||
|
| **ID** | `id` | `UUID4` | A unique identifier for the pipeline output. |
|
||||||
|
| **Run Results** | `run_results` | `List[PipelineRunResult]` | A list of `PipelineRunResult` objects, each representing the output of a single run through the pipeline. |
|
||||||
|
|
||||||
|
### Pipeline Output Methods
|
||||||
|
|
||||||
|
| Method/Property | Description |
|
||||||
|
| :----------------- | :----------------------------------------------------- |
|
||||||
|
| **add_run_result** | Adds a `PipelineRunResult` to the list of run results. |
|
||||||
|
|
||||||
|
### Pipeline Run Result Attributes
|
||||||
|
|
||||||
|
| Attribute | Parameters | Type | Description |
|
||||||
|
| :---------------- | :-------------- | :------------------------- | :-------------------------------------------------------------------------------------------- |
|
||||||
|
| **ID** | `id` | `UUID4` | A unique identifier for the run result. |
|
||||||
|
| **Raw** | `raw` | `str` | The raw output of the final stage in the pipeline run. |
|
||||||
|
| **Pydantic** | `pydantic` | `Optional[BaseModel]` | A Pydantic model object representing the structured output of the final stage, if applicable. |
|
||||||
|
| **JSON Dict** | `json_dict` | `Optional[Dict[str, Any]]` | A dictionary representing the JSON output of the final stage, if applicable. |
|
||||||
|
| **Token Usage** | `token_usage` | `Dict[str, Any]` | A summary of token usage across all stages of the pipeline run. |
|
||||||
|
| **Trace** | `trace` | `List[Any]` | A trace of the journey of inputs through the pipeline run. |
|
||||||
|
| **Crews Outputs** | `crews_outputs` | `List[CrewOutput]` | A list of `CrewOutput` objects, representing the outputs from each crew in the pipeline run. |
|
||||||
|
|
||||||
|
### Pipeline Run Result Methods and Properties
|
||||||
|
|
||||||
|
| Method/Property | Description |
|
||||||
|
| :-------------- | :------------------------------------------------------------------------------------------------------- |
|
||||||
|
| **json** | Returns the JSON string representation of the run result if the output format of the final task is JSON. |
|
||||||
|
| **to_dict** | Converts the JSON and Pydantic outputs to a dictionary. |
|
||||||
|
| \***\*str\*\*** | Returns the string representation of the run result, prioritizing Pydantic, then JSON, then raw. |
|
||||||
|
|
||||||
|
### Accessing Pipeline Outputs
|
||||||
|
|
||||||
|
Once a pipeline has been executed, its output can be accessed through the `PipelineOutput` object returned by the `process_runs` method. The `PipelineOutput` class provides access to individual `PipelineRunResult` objects, each representing a single run through the pipeline.
|
||||||
|
|
||||||
|
#### Example
|
||||||
|
|
||||||
|
```python
|
||||||
|
# Define input data for the pipeline
|
||||||
|
input_data = [{"initial_query": "Latest advancements in AI"}, {"initial_query": "Future of robotics"}]
|
||||||
|
|
||||||
|
# Execute the pipeline
|
||||||
|
pipeline_output = await my_pipeline.process_runs(input_data)
|
||||||
|
|
||||||
|
# Access the results
|
||||||
|
for run_result in pipeline_output.run_results:
|
||||||
|
print(f"Run ID: {run_result.id}")
|
||||||
|
print(f"Final Raw Output: {run_result.raw}")
|
||||||
|
if run_result.json_dict:
|
||||||
|
print(f"JSON Output: {json.dumps(run_result.json_dict, indent=2)}")
|
||||||
|
if run_result.pydantic:
|
||||||
|
print(f"Pydantic Output: {run_result.pydantic}")
|
||||||
|
print(f"Token Usage: {run_result.token_usage}")
|
||||||
|
print(f"Trace: {run_result.trace}")
|
||||||
|
print("Crew Outputs:")
|
||||||
|
for crew_output in run_result.crews_outputs:
|
||||||
|
print(f" Crew: {crew_output.raw}")
|
||||||
|
print("\n")
|
||||||
|
```
|
||||||
|
|
||||||
|
This example demonstrates how to access and work with the pipeline output, including individual run results and their associated data.
|
||||||
|
|
||||||
|
## Using Pipelines
|
||||||
|
|
||||||
|
Pipelines are particularly useful for complex workflows that involve multiple stages of processing, analysis, or content generation. They allow you to:
|
||||||
|
|
||||||
|
1. **Sequence Operations**: Execute crews in a specific order, ensuring that the output of one crew is available as input to the next.
|
||||||
|
2. **Parallel Processing**: Run multiple crews concurrently within a stage for increased efficiency.
|
||||||
|
3. **Manage Complex Workflows**: Break down large tasks into smaller, manageable steps executed by specialized crews.
|
||||||
|
|
||||||
|
### Example: Running a Pipeline
|
||||||
|
|
||||||
|
```python
|
||||||
|
# Define input data for the pipeline
|
||||||
|
input_data = [{"initial_query": "Latest advancements in AI"}]
|
||||||
|
|
||||||
|
# Execute the pipeline, initiating a run for each input
|
||||||
|
results = await my_pipeline.process_runs(input_data)
|
||||||
|
|
||||||
|
# Access the results
|
||||||
|
for result in results:
|
||||||
|
print(f"Final Output: {result.raw}")
|
||||||
|
print(f"Token Usage: {result.token_usage}")
|
||||||
|
print(f"Trace: {result.trace}") # Shows the path of the input through all stages
|
||||||
|
```
|
||||||
|
|
||||||
|
## Advanced Features
|
||||||
|
|
||||||
|
### Parallel Execution within Stages
|
||||||
|
|
||||||
|
You can define parallel execution within a stage by providing a list of crews, creating multiple branches:
|
||||||
|
|
||||||
|
```python
|
||||||
|
parallel_analysis_crew = Crew(agents=[financial_analyst], tasks=[financial_analysis_task])
|
||||||
|
market_analysis_crew = Crew(agents=[market_analyst], tasks=[market_analysis_task])
|
||||||
|
|
||||||
|
my_pipeline = Pipeline(
|
||||||
|
stages=[
|
||||||
|
research_crew,
|
||||||
|
[parallel_analysis_crew, market_analysis_crew], # Parallel execution (branching)
|
||||||
|
writing_crew
|
||||||
|
]
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Error Handling and Validation
|
||||||
|
|
||||||
|
The Pipeline class includes validation mechanisms to ensure the robustness of the pipeline structure:
|
||||||
|
|
||||||
|
- Validates that stages contain only Crew instances or lists of Crew instances.
|
||||||
|
- Prevents double nesting of stages to maintain a clear structure.
|
||||||
|
|
||||||
|
## Best Practices for Pipelines
|
||||||
|
|
||||||
|
1. **Clear Stage Definition**: Define each stage with a clear purpose and expected output.
|
||||||
|
2. **Effective Data Flow**: Ensure that the output of each stage is in a format suitable for the input of the next stage.
|
||||||
|
3. \*\*Parallel Proce
|
||||||
@@ -232,7 +232,7 @@ def callback_function(output: TaskOutput):
|
|||||||
print(f"""
|
print(f"""
|
||||||
Task completed!
|
Task completed!
|
||||||
Task: {output.description}
|
Task: {output.description}
|
||||||
Output: {output.raw_output}
|
Output: {output.raw}
|
||||||
""")
|
""")
|
||||||
|
|
||||||
research_task = Task(
|
research_task = Task(
|
||||||
|
|||||||
@@ -33,6 +33,11 @@ Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By
|
|||||||
Crews
|
Crews
|
||||||
</a>
|
</a>
|
||||||
</li>
|
</li>
|
||||||
|
<li>
|
||||||
|
<a href="./core-concepts/Pipeline">
|
||||||
|
Pipeline
|
||||||
|
</a>
|
||||||
|
</li>
|
||||||
<li>
|
<li>
|
||||||
<a href="./core-concepts/Training-Crew">
|
<a href="./core-concepts/Training-Crew">
|
||||||
Training
|
Training
|
||||||
|
|||||||
67
poetry.lock
generated
67
poetry.lock
generated
@@ -2,13 +2,13 @@
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "agentops"
|
name = "agentops"
|
||||||
version = "0.3.0"
|
version = "0.3.2"
|
||||||
description = "Python SDK for developing AI agent evals and observability"
|
description = "Python SDK for developing AI agent evals and observability"
|
||||||
optional = true
|
optional = true
|
||||||
python-versions = ">=3.7"
|
python-versions = ">=3.7"
|
||||||
files = [
|
files = [
|
||||||
{file = "agentops-0.3.0-py3-none-any.whl", hash = "sha256:22aeb3355e66b32a2b2a9f676048b81979b2488feddb088f9266034b3ed50539"},
|
{file = "agentops-0.3.2-py3-none-any.whl", hash = "sha256:b35988e04378624204572bb3d7a454094f879ea573f05b57d4e75ab0bfbb82af"},
|
||||||
{file = "agentops-0.3.0.tar.gz", hash = "sha256:6c0c08a57410fa5e826a7bafa1deeba9f7b3524709427d9e1abbd0964caaf76b"},
|
{file = "agentops-0.3.2.tar.gz", hash = "sha256:55559ac4a43634831dfa8937c2597c28e332809dc7c6bb3bc3c8b233442e224c"},
|
||||||
]
|
]
|
||||||
|
|
||||||
[package.dependencies]
|
[package.dependencies]
|
||||||
@@ -3337,13 +3337,13 @@ sympy = "*"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "openai"
|
name = "openai"
|
||||||
version = "1.36.0"
|
version = "1.36.1"
|
||||||
description = "The official Python library for the openai API"
|
description = "The official Python library for the openai API"
|
||||||
optional = false
|
optional = false
|
||||||
python-versions = ">=3.7.1"
|
python-versions = ">=3.7.1"
|
||||||
files = [
|
files = [
|
||||||
{file = "openai-1.36.0-py3-none-any.whl", hash = "sha256:82b74ded1fe2ea94abb19a007178bc143675f1b6903cebd63e2968d654bb0a6f"},
|
{file = "openai-1.36.1-py3-none-any.whl", hash = "sha256:d399b9d476dbbc167aceaac6bc6ed0b2e2bb6c9e189c7f7047f822743ae62e64"},
|
||||||
{file = "openai-1.36.0.tar.gz", hash = "sha256:a124baf0e1657d6156e12248642f88489cd030be8655b69bc1c13eb50e71a93d"},
|
{file = "openai-1.36.1.tar.gz", hash = "sha256:41be9e0302e95dba8a9374b885c5cb1cec2202816a70b98736fee25a2cadd1f2"},
|
||||||
]
|
]
|
||||||
|
|
||||||
[package.dependencies]
|
[package.dependencies]
|
||||||
@@ -4085,6 +4085,19 @@ files = [
|
|||||||
{file = "pyarrow-17.0.0-cp312-cp312-win_amd64.whl", hash = "sha256:392bc9feabc647338e6c89267635e111d71edad5fcffba204425a7c8d13610d7"},
|
{file = "pyarrow-17.0.0-cp312-cp312-win_amd64.whl", hash = "sha256:392bc9feabc647338e6c89267635e111d71edad5fcffba204425a7c8d13610d7"},
|
||||||
{file = "pyarrow-17.0.0-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:af5ff82a04b2171415f1410cff7ebb79861afc5dae50be73ce06d6e870615204"},
|
{file = "pyarrow-17.0.0-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:af5ff82a04b2171415f1410cff7ebb79861afc5dae50be73ce06d6e870615204"},
|
||||||
{file = "pyarrow-17.0.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:edca18eaca89cd6382dfbcff3dd2d87633433043650c07375d095cd3517561d8"},
|
{file = "pyarrow-17.0.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:edca18eaca89cd6382dfbcff3dd2d87633433043650c07375d095cd3517561d8"},
|
||||||
|
{file = "pyarrow-17.0.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7c7916bff914ac5d4a8fe25b7a25e432ff921e72f6f2b7547d1e325c1ad9d155"},
|
||||||
|
{file = "pyarrow-17.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f553ca691b9e94b202ff741bdd40f6ccb70cdd5fbf65c187af132f1317de6145"},
|
||||||
|
{file = "pyarrow-17.0.0-cp38-cp38-manylinux_2_28_aarch64.whl", hash = "sha256:0cdb0e627c86c373205a2f94a510ac4376fdc523f8bb36beab2e7f204416163c"},
|
||||||
|
{file = "pyarrow-17.0.0-cp38-cp38-manylinux_2_28_x86_64.whl", hash = "sha256:d7d192305d9d8bc9082d10f361fc70a73590a4c65cf31c3e6926cd72b76bc35c"},
|
||||||
|
{file = "pyarrow-17.0.0-cp38-cp38-win_amd64.whl", hash = "sha256:02dae06ce212d8b3244dd3e7d12d9c4d3046945a5933d28026598e9dbbda1fca"},
|
||||||
|
{file = "pyarrow-17.0.0-cp39-cp39-macosx_10_15_x86_64.whl", hash = "sha256:13d7a460b412f31e4c0efa1148e1d29bdf18ad1411eb6757d38f8fbdcc8645fb"},
|
||||||
|
{file = "pyarrow-17.0.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:9b564a51fbccfab5a04a80453e5ac6c9954a9c5ef2890d1bcf63741909c3f8df"},
|
||||||
|
{file = "pyarrow-17.0.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:32503827abbc5aadedfa235f5ece8c4f8f8b0a3cf01066bc8d29de7539532687"},
|
||||||
|
{file = "pyarrow-17.0.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a155acc7f154b9ffcc85497509bcd0d43efb80d6f733b0dc3bb14e281f131c8b"},
|
||||||
|
{file = "pyarrow-17.0.0-cp39-cp39-manylinux_2_28_aarch64.whl", hash = "sha256:dec8d129254d0188a49f8a1fc99e0560dc1b85f60af729f47de4046015f9b0a5"},
|
||||||
|
{file = "pyarrow-17.0.0-cp39-cp39-manylinux_2_28_x86_64.whl", hash = "sha256:a48ddf5c3c6a6c505904545c25a4ae13646ae1f8ba703c4df4a1bfe4f4006bda"},
|
||||||
|
{file = "pyarrow-17.0.0-cp39-cp39-win_amd64.whl", hash = "sha256:42bf93249a083aca230ba7e2786c5f673507fa97bbd9725a1e2754715151a204"},
|
||||||
|
{file = "pyarrow-17.0.0.tar.gz", hash = "sha256:4beca9521ed2c0921c1023e68d097d0299b62c362639ea315572a58f3f50fd28"},
|
||||||
]
|
]
|
||||||
|
|
||||||
[package.dependencies]
|
[package.dependencies]
|
||||||
@@ -4321,13 +4334,13 @@ extra = ["pygments (>=2.12)"]
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pypdf"
|
name = "pypdf"
|
||||||
version = "4.3.0"
|
version = "4.3.1"
|
||||||
description = "A pure-python PDF library capable of splitting, merging, cropping, and transforming PDF files"
|
description = "A pure-python PDF library capable of splitting, merging, cropping, and transforming PDF files"
|
||||||
optional = false
|
optional = false
|
||||||
python-versions = ">=3.6"
|
python-versions = ">=3.6"
|
||||||
files = [
|
files = [
|
||||||
{file = "pypdf-4.3.0-py3-none-any.whl", hash = "sha256:eeea4d019b57c099d02a0e1692eaaab23341ae3f255c1dafa3c8566b4636496d"},
|
{file = "pypdf-4.3.1-py3-none-any.whl", hash = "sha256:64b31da97eda0771ef22edb1bfecd5deee4b72c3d1736b7df2689805076d6418"},
|
||||||
{file = "pypdf-4.3.0.tar.gz", hash = "sha256:0d7a4c67fd03782f5a09d3f48c11c7a31e0bb9af78861a25229bb49259ed0504"},
|
{file = "pypdf-4.3.1.tar.gz", hash = "sha256:b2f37fe9a3030aa97ca86067a56ba3f9d3565f9a791b305c7355d8392c30d91b"},
|
||||||
]
|
]
|
||||||
|
|
||||||
[package.dependencies]
|
[package.dependencies]
|
||||||
@@ -4414,13 +4427,13 @@ files = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pytest"
|
name = "pytest"
|
||||||
version = "8.2.2"
|
version = "8.3.1"
|
||||||
description = "pytest: simple powerful testing with Python"
|
description = "pytest: simple powerful testing with Python"
|
||||||
optional = false
|
optional = false
|
||||||
python-versions = ">=3.8"
|
python-versions = ">=3.8"
|
||||||
files = [
|
files = [
|
||||||
{file = "pytest-8.2.2-py3-none-any.whl", hash = "sha256:c434598117762e2bd304e526244f67bf66bbd7b5d6cf22138be51ff661980343"},
|
{file = "pytest-8.3.1-py3-none-any.whl", hash = "sha256:e9600ccf4f563976e2c99fa02c7624ab938296551f280835ee6516df8bc4ae8c"},
|
||||||
{file = "pytest-8.2.2.tar.gz", hash = "sha256:de4bb8104e201939ccdc688b27a89a7be2079b22e2bd2b07f806b6ba71117977"},
|
{file = "pytest-8.3.1.tar.gz", hash = "sha256:7e8e5c5abd6e93cb1cc151f23e57adc31fcf8cfd2a3ff2da63e23f732de35db6"},
|
||||||
]
|
]
|
||||||
|
|
||||||
[package.dependencies]
|
[package.dependencies]
|
||||||
@@ -4428,12 +4441,30 @@ colorama = {version = "*", markers = "sys_platform == \"win32\""}
|
|||||||
exceptiongroup = {version = ">=1.0.0rc8", markers = "python_version < \"3.11\""}
|
exceptiongroup = {version = ">=1.0.0rc8", markers = "python_version < \"3.11\""}
|
||||||
iniconfig = "*"
|
iniconfig = "*"
|
||||||
packaging = "*"
|
packaging = "*"
|
||||||
pluggy = ">=1.5,<2.0"
|
pluggy = ">=1.5,<2"
|
||||||
tomli = {version = ">=1", markers = "python_version < \"3.11\""}
|
tomli = {version = ">=1", markers = "python_version < \"3.11\""}
|
||||||
|
|
||||||
[package.extras]
|
[package.extras]
|
||||||
dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"]
|
dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pytest-asyncio"
|
||||||
|
version = "0.23.8"
|
||||||
|
description = "Pytest support for asyncio"
|
||||||
|
optional = false
|
||||||
|
python-versions = ">=3.8"
|
||||||
|
files = [
|
||||||
|
{file = "pytest_asyncio-0.23.8-py3-none-any.whl", hash = "sha256:50265d892689a5faefb84df80819d1ecef566eb3549cf915dfb33569359d1ce2"},
|
||||||
|
{file = "pytest_asyncio-0.23.8.tar.gz", hash = "sha256:759b10b33a6dc61cce40a8bd5205e302978bbbcc00e279a8b61d9a6a3c82e4d3"},
|
||||||
|
]
|
||||||
|
|
||||||
|
[package.dependencies]
|
||||||
|
pytest = ">=7.0.0,<9"
|
||||||
|
|
||||||
|
[package.extras]
|
||||||
|
docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1.0)"]
|
||||||
|
testing = ["coverage (>=6.2)", "hypothesis (>=5.7.1)"]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pytest-vcr"
|
name = "pytest-vcr"
|
||||||
version = "1.0.2"
|
version = "1.0.2"
|
||||||
@@ -4899,19 +4930,19 @@ files = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "setuptools"
|
name = "setuptools"
|
||||||
version = "71.0.4"
|
version = "71.1.0"
|
||||||
description = "Easily download, build, install, upgrade, and uninstall Python packages"
|
description = "Easily download, build, install, upgrade, and uninstall Python packages"
|
||||||
optional = false
|
optional = false
|
||||||
python-versions = ">=3.8"
|
python-versions = ">=3.8"
|
||||||
files = [
|
files = [
|
||||||
{file = "setuptools-71.0.4-py3-none-any.whl", hash = "sha256:ed2feca703be3bdbd94e6bb17365d91c6935c6b2a8d0bb09b66a2c435ba0b1a5"},
|
{file = "setuptools-71.1.0-py3-none-any.whl", hash = "sha256:33874fdc59b3188304b2e7c80d9029097ea31627180896fb549c578ceb8a0855"},
|
||||||
{file = "setuptools-71.0.4.tar.gz", hash = "sha256:48297e5d393a62b7cb2a10b8f76c63a73af933bd809c9e0d0d6352a1a0135dd8"},
|
{file = "setuptools-71.1.0.tar.gz", hash = "sha256:032d42ee9fb536e33087fb66cac5f840eb9391ed05637b3f2a76a7c8fb477936"},
|
||||||
]
|
]
|
||||||
|
|
||||||
[package.extras]
|
[package.extras]
|
||||||
core = ["importlib-metadata (>=6)", "importlib-resources (>=5.10.2)", "jaraco.text (>=3.7)", "more-itertools (>=8.8)", "ordered-set (>=3.1.1)", "packaging (>=24)", "platformdirs (>=2.6.2)", "tomli (>=2.0.1)", "wheel (>=0.43.0)"]
|
core = ["importlib-metadata (>=6)", "importlib-resources (>=5.10.2)", "jaraco.text (>=3.7)", "more-itertools (>=8.8)", "ordered-set (>=3.1.1)", "packaging (>=24)", "platformdirs (>=2.6.2)", "tomli (>=2.0.1)", "wheel (>=0.43.0)"]
|
||||||
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"]
|
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"]
|
||||||
test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "jaraco.test", "mypy (==1.10.0)", "packaging (>=23.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf", "pytest-ruff (<0.4)", "pytest-ruff (>=0.2.1)", "pytest-ruff (>=0.3.2)", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
|
test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "jaraco.test", "mypy (==1.11.*)", "packaging (>=23.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf", "pytest-ruff (<0.4)", "pytest-ruff (>=0.2.1)", "pytest-ruff (>=0.3.2)", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "shapely"
|
name = "shapely"
|
||||||
@@ -6130,4 +6161,4 @@ tools = ["crewai-tools"]
|
|||||||
[metadata]
|
[metadata]
|
||||||
lock-version = "2.0"
|
lock-version = "2.0"
|
||||||
python-versions = ">=3.10,<=3.13"
|
python-versions = ">=3.10,<=3.13"
|
||||||
content-hash = "f5ad9babb3c57c405e39232020e8cbfaaeb5c315c2e7c5bb8fdf66792f260343"
|
content-hash = "8df022f5ec0997c0a0f5710476139d9117c1057889c158e958f2c8efd22a4756"
|
||||||
|
|||||||
@@ -52,6 +52,7 @@ crewai-tools = "^0.4.26"
|
|||||||
pytest = "^8.0.0"
|
pytest = "^8.0.0"
|
||||||
pytest-vcr = "^1.0.2"
|
pytest-vcr = "^1.0.2"
|
||||||
python-dotenv = "1.0.0"
|
python-dotenv = "1.0.0"
|
||||||
|
pytest-asyncio = "^0.23.7"
|
||||||
|
|
||||||
[tool.poetry.scripts]
|
[tool.poetry.scripts]
|
||||||
crewai = "crewai.cli.cli:crewai"
|
crewai = "crewai.cli.cli:crewai"
|
||||||
|
|||||||
@@ -199,7 +199,9 @@ class Agent(BaseAgent):
|
|||||||
"tools": self.agent_executor.tools_description,
|
"tools": self.agent_executor.tools_description,
|
||||||
}
|
}
|
||||||
)["output"]
|
)["output"]
|
||||||
|
print("Result when things went well:", result)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
print("FAILED TO EXECUTE TASK", e)
|
||||||
self._times_executed += 1
|
self._times_executed += 1
|
||||||
if self._times_executed > self.max_retry_limit:
|
if self._times_executed > self.max_retry_limit:
|
||||||
raise e
|
raise e
|
||||||
@@ -215,6 +217,7 @@ class Agent(BaseAgent):
|
|||||||
if tool_result.get("result_as_answer", False):
|
if tool_result.get("result_as_answer", False):
|
||||||
result = tool_result["result"]
|
result = tool_result["result"]
|
||||||
|
|
||||||
|
print("RESULT TO RETURN", result)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def format_log_to_str(
|
def format_log_to_str(
|
||||||
|
|||||||
@@ -56,7 +56,7 @@ class CrewAgentExecutor(AgentExecutor, CrewAgentExecutorMixin):
|
|||||||
)
|
)
|
||||||
intermediate_steps: List[Tuple[AgentAction, str]] = []
|
intermediate_steps: List[Tuple[AgentAction, str]] = []
|
||||||
# Allowing human input given task setting
|
# Allowing human input given task setting
|
||||||
if self.task.human_input:
|
if self.task and self.task.human_input:
|
||||||
self.should_ask_for_human_input = True
|
self.should_ask_for_human_input = True
|
||||||
|
|
||||||
# Let's start tracking the number of iterations and time elapsed
|
# Let's start tracking the number of iterations and time elapsed
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ import json
|
|||||||
import uuid
|
import uuid
|
||||||
from concurrent.futures import Future
|
from concurrent.futures import Future
|
||||||
from hashlib import md5
|
from hashlib import md5
|
||||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
|
||||||
|
|
||||||
from langchain_core.callbacks import BaseCallbackHandler
|
from langchain_core.callbacks import BaseCallbackHandler
|
||||||
from pydantic import (
|
from pydantic import (
|
||||||
@@ -33,10 +33,7 @@ from crewai.tasks.task_output import TaskOutput
|
|||||||
from crewai.telemetry import Telemetry
|
from crewai.telemetry import Telemetry
|
||||||
from crewai.tools.agent_tools import AgentTools
|
from crewai.tools.agent_tools import AgentTools
|
||||||
from crewai.utilities import I18N, FileHandler, Logger, RPMController
|
from crewai.utilities import I18N, FileHandler, Logger, RPMController
|
||||||
from crewai.utilities.constants import (
|
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
|
||||||
TRAINED_AGENTS_DATA_FILE,
|
|
||||||
TRAINING_DATA_FILE,
|
|
||||||
)
|
|
||||||
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
|
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
|
||||||
from crewai.utilities.formatter import (
|
from crewai.utilities.formatter import (
|
||||||
aggregate_raw_outputs_from_task_outputs,
|
aggregate_raw_outputs_from_task_outputs,
|
||||||
@@ -51,6 +48,9 @@ try:
|
|||||||
except ImportError:
|
except ImportError:
|
||||||
agentops = None
|
agentops = None
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from crewai.pipeline.pipeline import Pipeline
|
||||||
|
|
||||||
|
|
||||||
class Crew(BaseModel):
|
class Crew(BaseModel):
|
||||||
"""
|
"""
|
||||||
@@ -96,12 +96,13 @@ class Crew(BaseModel):
|
|||||||
default_factory=TaskOutputStorageHandler
|
default_factory=TaskOutputStorageHandler
|
||||||
)
|
)
|
||||||
|
|
||||||
|
name: Optional[str] = Field(default="")
|
||||||
cache: bool = Field(default=True)
|
cache: bool = Field(default=True)
|
||||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||||
tasks: List[Task] = Field(default_factory=list)
|
tasks: List[Task] = Field(default_factory=list)
|
||||||
agents: List[BaseAgent] = Field(default_factory=list)
|
agents: List[BaseAgent] = Field(default_factory=list)
|
||||||
process: Process = Field(default=Process.sequential)
|
process: Process = Field(default=Process.sequential)
|
||||||
verbose: Union[int, bool] = Field(default=0)
|
verbose: int = Field(default=0)
|
||||||
memory: bool = Field(
|
memory: bool = Field(
|
||||||
default=False,
|
default=False,
|
||||||
description="Whether the crew should use memory to store memories of it's execution",
|
description="Whether the crew should use memory to store memories of it's execution",
|
||||||
@@ -146,8 +147,8 @@ class Crew(BaseModel):
|
|||||||
default=None,
|
default=None,
|
||||||
description="Path to the prompt json file to be used for the crew.",
|
description="Path to the prompt json file to be used for the crew.",
|
||||||
)
|
)
|
||||||
output_log_file: Optional[Union[bool, str]] = Field(
|
output_log_file: Optional[str] = Field(
|
||||||
default=False,
|
default="",
|
||||||
description="output_log_file",
|
description="output_log_file",
|
||||||
)
|
)
|
||||||
planning: Optional[bool] = Field(
|
planning: Optional[bool] = Field(
|
||||||
@@ -966,5 +967,17 @@ class Crew(BaseModel):
|
|||||||
|
|
||||||
return total_usage_metrics
|
return total_usage_metrics
|
||||||
|
|
||||||
|
def __rshift__(self, other: "Crew") -> "Pipeline":
|
||||||
|
"""
|
||||||
|
Implements the >> operator to add another Crew to an existing Pipeline.
|
||||||
|
"""
|
||||||
|
from crewai.pipeline.pipeline import Pipeline
|
||||||
|
|
||||||
|
if not isinstance(other, Crew):
|
||||||
|
raise TypeError(
|
||||||
|
f"Unsupported operand type for >>: '{type(self).__name__}' and '{type(other).__name__}'"
|
||||||
|
)
|
||||||
|
return Pipeline(stages=[self, other])
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return f"Crew(id={self.id}, process={self.process}, number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)})"
|
return f"Crew(id={self.id}, process={self.process}, number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)})"
|
||||||
|
|||||||
3
src/crewai/pipeline/__init__.py
Normal file
3
src/crewai/pipeline/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
from crewai.pipeline.pipeline import Pipeline
|
||||||
|
|
||||||
|
__all__ = ["Pipeline"]
|
||||||
218
src/crewai/pipeline/pipeline.py
Normal file
218
src/crewai/pipeline/pipeline.py
Normal file
@@ -0,0 +1,218 @@
|
|||||||
|
import asyncio
|
||||||
|
import copy
|
||||||
|
from typing import Any, Dict, List, Tuple, Union
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field, model_validator
|
||||||
|
|
||||||
|
from crewai.crew import Crew
|
||||||
|
from crewai.crews.crew_output import CrewOutput
|
||||||
|
from crewai.pipeline.pipeline_run_result import PipelineRunResult
|
||||||
|
|
||||||
|
Trace = Union[Union[str, Dict[str, Any]], List[Union[str, Dict[str, Any]]]]
|
||||||
|
|
||||||
|
|
||||||
|
"""
|
||||||
|
Pipeline Terminology:
|
||||||
|
Pipeline: The overall structure that defines a sequence of operations.
|
||||||
|
Stage: A distinct part of the pipeline, which can be either sequential or parallel.
|
||||||
|
Run: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline.
|
||||||
|
Branch: Parallel executions within a stage (e.g., concurrent crew operations).
|
||||||
|
Trace: The journey of an individual input through the entire pipeline.
|
||||||
|
|
||||||
|
Example pipeline structure:
|
||||||
|
crew1 >> crew2 >> crew3
|
||||||
|
|
||||||
|
This represents a pipeline with three sequential stages:
|
||||||
|
1. crew1 is the first stage, which processes the input and passes its output to crew2.
|
||||||
|
2. crew2 is the second stage, which takes the output from crew1 as its input, processes it, and passes its output to crew3.
|
||||||
|
3. crew3 is the final stage, which takes the output from crew2 as its input and produces the final output of the pipeline.
|
||||||
|
|
||||||
|
Each input creates its own run, flowing through all stages of the pipeline.
|
||||||
|
Multiple runs can be processed concurrently, each following the defined pipeline structure.
|
||||||
|
|
||||||
|
Another example pipeline structure:
|
||||||
|
crew1 >> [crew2, crew3] >> crew4
|
||||||
|
|
||||||
|
This represents a pipeline with three stages:
|
||||||
|
1. A sequential stage (crew1)
|
||||||
|
2. A parallel stage with two branches (crew2 and crew3 executing concurrently)
|
||||||
|
3. Another sequential stage (crew4)
|
||||||
|
|
||||||
|
Each input creates its own run, flowing through all stages of the pipeline.
|
||||||
|
Multiple runs can be processed concurrently, each following the defined pipeline structure.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class Pipeline(BaseModel):
|
||||||
|
stages: List[Union[Crew, List[Crew]]] = Field(
|
||||||
|
..., description="List of crews representing stages to be executed in sequence"
|
||||||
|
)
|
||||||
|
|
||||||
|
@model_validator(mode="before")
|
||||||
|
@classmethod
|
||||||
|
def validate_stages(cls, values):
|
||||||
|
stages = values.get("stages", [])
|
||||||
|
|
||||||
|
def check_nesting_and_type(item, depth=0):
|
||||||
|
if depth > 1:
|
||||||
|
raise ValueError("Double nesting is not allowed in pipeline stages")
|
||||||
|
if isinstance(item, list):
|
||||||
|
for sub_item in item:
|
||||||
|
check_nesting_and_type(sub_item, depth + 1)
|
||||||
|
elif not isinstance(item, Crew):
|
||||||
|
raise ValueError(
|
||||||
|
f"Expected Crew instance or list of Crews, got {type(item)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
for stage in stages:
|
||||||
|
check_nesting_and_type(stage)
|
||||||
|
return values
|
||||||
|
|
||||||
|
async def process_runs(
|
||||||
|
self, run_inputs: List[Dict[str, Any]]
|
||||||
|
) -> List[PipelineRunResult]:
|
||||||
|
"""
|
||||||
|
Process multiple runs in parallel, with each run going through all stages.
|
||||||
|
"""
|
||||||
|
pipeline_results = []
|
||||||
|
|
||||||
|
# Process all runs in parallel
|
||||||
|
all_run_results = await asyncio.gather(
|
||||||
|
*(self.process_single_run(input_data) for input_data in run_inputs)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Flatten the list of lists into a single list of results
|
||||||
|
pipeline_results.extend(
|
||||||
|
result for run_result in all_run_results for result in run_result
|
||||||
|
)
|
||||||
|
|
||||||
|
return pipeline_results
|
||||||
|
|
||||||
|
async def process_single_run(
|
||||||
|
self, run_input: Dict[str, Any]
|
||||||
|
) -> List[PipelineRunResult]:
|
||||||
|
initial_input = copy.deepcopy(run_input)
|
||||||
|
current_input = copy.deepcopy(run_input)
|
||||||
|
usage_metrics = {}
|
||||||
|
all_stage_outputs: List[List[CrewOutput]] = []
|
||||||
|
traces: List[List[Union[str, Dict[str, Any]]]] = [[initial_input]]
|
||||||
|
|
||||||
|
for stage in self.stages:
|
||||||
|
stage_input = copy.deepcopy(current_input)
|
||||||
|
stage_outputs, stage_trace = await self._process_stage(stage, stage_input)
|
||||||
|
|
||||||
|
self._update_metrics_and_input(
|
||||||
|
usage_metrics, current_input, stage, stage_outputs
|
||||||
|
)
|
||||||
|
traces.append(stage_trace)
|
||||||
|
all_stage_outputs.append(stage_outputs)
|
||||||
|
|
||||||
|
return self._build_pipeline_run_results(
|
||||||
|
all_stage_outputs, traces, usage_metrics
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _process_stage(
|
||||||
|
self, stage: Union[Crew, List[Crew]], current_input: Dict[str, Any]
|
||||||
|
) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]:
|
||||||
|
if isinstance(stage, Crew):
|
||||||
|
return await self._process_single_crew(stage, current_input)
|
||||||
|
else:
|
||||||
|
return await self._process_parallel_crews(stage, current_input)
|
||||||
|
|
||||||
|
async def _process_single_crew(
|
||||||
|
self, crew: Crew, current_input: Dict[str, Any]
|
||||||
|
) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]:
|
||||||
|
output = await crew.kickoff_async(inputs=current_input)
|
||||||
|
return [output], [crew.name or str(crew.id)]
|
||||||
|
|
||||||
|
async def _process_parallel_crews(
|
||||||
|
self, crews: List[Crew], current_input: Dict[str, Any]
|
||||||
|
) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]:
|
||||||
|
parallel_outputs = await asyncio.gather(
|
||||||
|
*[crew.kickoff_async(inputs=current_input) for crew in crews]
|
||||||
|
)
|
||||||
|
return parallel_outputs, [crew.name or str(crew.id) for crew in crews]
|
||||||
|
|
||||||
|
def _update_metrics_and_input(
|
||||||
|
self,
|
||||||
|
usage_metrics: Dict[str, Any],
|
||||||
|
current_input: Dict[str, Any],
|
||||||
|
stage: Union[Crew, List[Crew]],
|
||||||
|
outputs: List[CrewOutput],
|
||||||
|
) -> None:
|
||||||
|
for crew, output in zip([stage] if isinstance(stage, Crew) else stage, outputs):
|
||||||
|
usage_metrics[crew.name or str(crew.id)] = output.token_usage
|
||||||
|
current_input.update(output.to_dict())
|
||||||
|
|
||||||
|
def _build_pipeline_run_results(
|
||||||
|
self,
|
||||||
|
all_stage_outputs: List[List[CrewOutput]],
|
||||||
|
traces: List[List[Union[str, Dict[str, Any]]]],
|
||||||
|
token_usage: Dict[str, Any],
|
||||||
|
) -> List[PipelineRunResult]:
|
||||||
|
formatted_traces = self._format_traces(traces)
|
||||||
|
formatted_crew_outputs = self._format_crew_outputs(all_stage_outputs)
|
||||||
|
|
||||||
|
return [
|
||||||
|
PipelineRunResult(
|
||||||
|
token_usage=token_usage,
|
||||||
|
trace=formatted_trace,
|
||||||
|
raw=crews_outputs[-1].raw,
|
||||||
|
pydantic=crews_outputs[-1].pydantic,
|
||||||
|
json_dict=crews_outputs[-1].json_dict,
|
||||||
|
crews_outputs=crews_outputs,
|
||||||
|
)
|
||||||
|
for crews_outputs, formatted_trace in zip(
|
||||||
|
formatted_crew_outputs, formatted_traces
|
||||||
|
)
|
||||||
|
]
|
||||||
|
|
||||||
|
def _format_traces(
|
||||||
|
self, traces: List[List[Union[str, Dict[str, Any]]]]
|
||||||
|
) -> List[List[Trace]]:
|
||||||
|
formatted_traces: List[Trace] = []
|
||||||
|
for trace in traces[:-1]:
|
||||||
|
formatted_traces.append(trace[0] if len(trace) == 1 else trace)
|
||||||
|
|
||||||
|
traces_to_return: List[List[Trace]] = []
|
||||||
|
final_trace = traces[-1]
|
||||||
|
if len(final_trace) == 1:
|
||||||
|
formatted_traces.append(final_trace[0])
|
||||||
|
traces_to_return.append(formatted_traces)
|
||||||
|
else:
|
||||||
|
for trace in final_trace:
|
||||||
|
copied_traces = formatted_traces.copy()
|
||||||
|
copied_traces.append(trace)
|
||||||
|
traces_to_return.append(copied_traces)
|
||||||
|
|
||||||
|
return traces_to_return
|
||||||
|
|
||||||
|
def _format_crew_outputs(
|
||||||
|
self, all_stage_outputs: List[List[CrewOutput]]
|
||||||
|
) -> List[List[CrewOutput]]:
|
||||||
|
crew_outputs: List[CrewOutput] = [
|
||||||
|
output
|
||||||
|
for stage_outputs in all_stage_outputs[:-1]
|
||||||
|
for output in stage_outputs
|
||||||
|
]
|
||||||
|
return [crew_outputs + [output] for output in all_stage_outputs[-1]]
|
||||||
|
|
||||||
|
def __rshift__(self, other: Any) -> "Pipeline":
|
||||||
|
"""
|
||||||
|
Implements the >> operator to add another Stage (Crew or List[Crew]) to an existing Pipeline.
|
||||||
|
"""
|
||||||
|
if isinstance(other, Crew):
|
||||||
|
return type(self)(stages=self.stages + [other])
|
||||||
|
elif isinstance(other, list) and all(isinstance(crew, Crew) for crew in other):
|
||||||
|
return type(self)(stages=self.stages + [other])
|
||||||
|
else:
|
||||||
|
raise TypeError(
|
||||||
|
f"Unsupported operand type for >>: '{type(self).__name__}' and '{type(other).__name__}'"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# Helper function to run the pipeline
|
||||||
|
async def run_pipeline(
|
||||||
|
pipeline: Pipeline, inputs: List[Dict[str, Any]]
|
||||||
|
) -> List[PipelineRunResult]:
|
||||||
|
return await pipeline.process_runs(inputs)
|
||||||
20
src/crewai/pipeline/pipeline_output.py
Normal file
20
src/crewai/pipeline/pipeline_output.py
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
import uuid
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
from pydantic import UUID4, BaseModel, Field
|
||||||
|
|
||||||
|
from crewai.pipeline.pipeline_run_result import PipelineRunResult
|
||||||
|
|
||||||
|
|
||||||
|
class PipelineOutput(BaseModel):
|
||||||
|
id: UUID4 = Field(
|
||||||
|
default_factory=uuid.uuid4,
|
||||||
|
frozen=True,
|
||||||
|
description="Unique identifier for the object, not set by user.",
|
||||||
|
)
|
||||||
|
run_results: List[PipelineRunResult] = Field(
|
||||||
|
description="List of results for each run through the pipeline", default=[]
|
||||||
|
)
|
||||||
|
|
||||||
|
def add_run_result(self, result: PipelineRunResult):
|
||||||
|
self.run_results.append(result)
|
||||||
60
src/crewai/pipeline/pipeline_run_result.py
Normal file
60
src/crewai/pipeline/pipeline_run_result.py
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
import json
|
||||||
|
import uuid
|
||||||
|
from typing import Any, Dict, List, Optional, Union
|
||||||
|
|
||||||
|
from pydantic import UUID4, BaseModel, Field
|
||||||
|
|
||||||
|
from crewai.crews.crew_output import CrewOutput
|
||||||
|
|
||||||
|
|
||||||
|
class PipelineRunResult(BaseModel):
|
||||||
|
"""Class that represents the result of a pipeline run."""
|
||||||
|
|
||||||
|
id: UUID4 = Field(
|
||||||
|
default_factory=uuid.uuid4,
|
||||||
|
frozen=True,
|
||||||
|
description="Unique identifier for the object, not set by user.",
|
||||||
|
)
|
||||||
|
raw: str = Field(description="Raw output of the pipeline run", default="")
|
||||||
|
pydantic: Any = Field(
|
||||||
|
description="Pydantic output of the pipeline run", default=None
|
||||||
|
)
|
||||||
|
json_dict: Union[Dict[str, Any], None] = Field(
|
||||||
|
description="JSON dict output of the pipeline run", default={}
|
||||||
|
)
|
||||||
|
|
||||||
|
token_usage: Dict[str, Any] = Field(
|
||||||
|
description="Token usage for each crew in the run"
|
||||||
|
)
|
||||||
|
trace: List[Any] = Field(
|
||||||
|
description="Trace of the journey of inputs through the run"
|
||||||
|
)
|
||||||
|
crews_outputs: List[CrewOutput] = Field(
|
||||||
|
description="Output from each crew in the run",
|
||||||
|
default=[],
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def json(self) -> Optional[str]:
|
||||||
|
if self.crews_outputs[-1].tasks_output[-1].output_format != "json":
|
||||||
|
raise ValueError(
|
||||||
|
"No JSON output found in the final task of the final crew. Please make sure to set the output_json property in the final task in your crew."
|
||||||
|
)
|
||||||
|
|
||||||
|
return json.dumps(self.json_dict)
|
||||||
|
|
||||||
|
def to_dict(self) -> Dict[str, Any]:
|
||||||
|
"""Convert json_output and pydantic_output to a dictionary."""
|
||||||
|
output_dict = {}
|
||||||
|
if self.json_dict:
|
||||||
|
output_dict.update(self.json_dict)
|
||||||
|
elif self.pydantic:
|
||||||
|
output_dict.update(self.pydantic.model_dump())
|
||||||
|
return output_dict
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
if self.pydantic:
|
||||||
|
return str(self.pydantic)
|
||||||
|
if self.json_dict:
|
||||||
|
return str(self.json_dict)
|
||||||
|
return self.raw
|
||||||
67
src/crewai/router/router.py
Normal file
67
src/crewai/router/router.py
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
from typing import Any, Callable, Dict, List, Tuple, Union
|
||||||
|
|
||||||
|
from crewai.crew import Crew
|
||||||
|
from crewai.pipeline.pipeline import Pipeline
|
||||||
|
|
||||||
|
|
||||||
|
class Router:
|
||||||
|
def __init__(self):
|
||||||
|
self.conditions: List[
|
||||||
|
Tuple[Callable[[Dict[str, Any]], bool], Union[Crew, Pipeline]]
|
||||||
|
] = []
|
||||||
|
self.default: Union[Crew, Pipeline, None] = None
|
||||||
|
|
||||||
|
def add_condition(
|
||||||
|
self,
|
||||||
|
condition: Callable[[Dict[str, Any]], bool],
|
||||||
|
next_stage: Union[Crew, Pipeline],
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Add a condition and its corresponding next stage to the router.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
condition: A function that takes the input dictionary and returns a boolean.
|
||||||
|
next_stage: The Crew or Pipeline to execute if the condition is met.
|
||||||
|
"""
|
||||||
|
self.conditions.append((condition, next_stage))
|
||||||
|
|
||||||
|
def set_default(self, default_stage: Union[Crew, Pipeline]):
|
||||||
|
"""Set the default stage to be executed if no conditions are met."""
|
||||||
|
self.default = default_stage
|
||||||
|
|
||||||
|
def route(self, input_dict: Dict[str, Any]) -> Union[Crew, Pipeline]:
|
||||||
|
"""
|
||||||
|
Evaluate the input against the conditions and return the appropriate next stage.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
input_dict: The input dictionary to be evaluated.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The next Crew or Pipeline to be executed.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If no conditions are met and no default is set.
|
||||||
|
"""
|
||||||
|
for condition, next_stage in self.conditions:
|
||||||
|
if condition(input_dict):
|
||||||
|
self._update_trace(input_dict, next_stage)
|
||||||
|
return next_stage
|
||||||
|
|
||||||
|
if self.default is not None:
|
||||||
|
self._update_trace(input_dict, self.default)
|
||||||
|
return self.default
|
||||||
|
|
||||||
|
raise ValueError("No conditions were met and no default stage was set.")
|
||||||
|
|
||||||
|
def _update_trace(
|
||||||
|
self, input_dict: Dict[str, Any], next_stage: Union[Crew, Pipeline]
|
||||||
|
):
|
||||||
|
"""Update the trace to show that the input went through the router."""
|
||||||
|
if "trace" not in input_dict:
|
||||||
|
input_dict["trace"] = []
|
||||||
|
input_dict["trace"].append(
|
||||||
|
{
|
||||||
|
"router": self.__class__.__name__,
|
||||||
|
"next_stage": next_stage.__class__.__name__,
|
||||||
|
}
|
||||||
|
)
|
||||||
@@ -383,7 +383,7 @@ class Telemetry:
|
|||||||
{
|
{
|
||||||
"id": str(task.id),
|
"id": str(task.id),
|
||||||
"description": task.description,
|
"description": task.description,
|
||||||
"output": task.output.raw_output,
|
"output": task.output.raw,
|
||||||
}
|
}
|
||||||
for task in crew.tasks
|
for task in crew.tasks
|
||||||
]
|
]
|
||||||
|
|||||||
474
tests/pipeline/test_pipeline.py
Normal file
474
tests/pipeline/test_pipeline.py
Normal file
@@ -0,0 +1,474 @@
|
|||||||
|
import json
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from crewai.agent import Agent
|
||||||
|
from crewai.crew import Crew
|
||||||
|
from crewai.crews.crew_output import CrewOutput
|
||||||
|
from crewai.pipeline.pipeline import Pipeline
|
||||||
|
from crewai.pipeline.pipeline_run_result import PipelineRunResult
|
||||||
|
from crewai.process import Process
|
||||||
|
from crewai.task import Task
|
||||||
|
from crewai.tasks.task_output import TaskOutput
|
||||||
|
from pydantic import BaseModel, ValidationError
|
||||||
|
|
||||||
|
DEFAULT_TOKEN_USAGE = {
|
||||||
|
"total_tokens": 100,
|
||||||
|
"prompt_tokens": 50,
|
||||||
|
"completion_tokens": 50,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_crew_factory():
|
||||||
|
def _create_mock_crew(name: str, output_json_dict=None, pydantic_output=None):
|
||||||
|
crew = MagicMock(spec=Crew)
|
||||||
|
task_output = TaskOutput(
|
||||||
|
description="Test task", raw="Task output", agent="Test Agent"
|
||||||
|
)
|
||||||
|
crew_output = CrewOutput(
|
||||||
|
raw="Test output",
|
||||||
|
tasks_output=[task_output],
|
||||||
|
token_usage=DEFAULT_TOKEN_USAGE,
|
||||||
|
json_dict=output_json_dict if output_json_dict else None,
|
||||||
|
pydantic=pydantic_output,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def async_kickoff(inputs=None):
|
||||||
|
print("inputs in async_kickoff", inputs)
|
||||||
|
return crew_output
|
||||||
|
|
||||||
|
crew.kickoff_async.side_effect = async_kickoff
|
||||||
|
|
||||||
|
# Add more attributes that Procedure might be expecting
|
||||||
|
crew.verbose = False
|
||||||
|
crew.output_log_file = None
|
||||||
|
crew.max_rpm = None
|
||||||
|
crew.memory = False
|
||||||
|
crew.process = Process.sequential
|
||||||
|
crew.config = None
|
||||||
|
crew.cache = True
|
||||||
|
crew.name = name
|
||||||
|
|
||||||
|
# Add non-empty agents and tasks
|
||||||
|
mock_agent = MagicMock(spec=Agent)
|
||||||
|
mock_task = MagicMock(spec=Task)
|
||||||
|
mock_task.agent = mock_agent
|
||||||
|
mock_task.async_execution = False
|
||||||
|
mock_task.context = None
|
||||||
|
|
||||||
|
crew.agents = [mock_agent]
|
||||||
|
crew.tasks = [mock_task]
|
||||||
|
|
||||||
|
return crew
|
||||||
|
|
||||||
|
return _create_mock_crew
|
||||||
|
|
||||||
|
|
||||||
|
def test_pipeline_initialization(mock_crew_factory):
|
||||||
|
"""
|
||||||
|
Test that a Pipeline is correctly initialized with the given stages.
|
||||||
|
"""
|
||||||
|
crew1 = mock_crew_factory(name="Crew 1")
|
||||||
|
crew2 = mock_crew_factory(name="Crew 2")
|
||||||
|
|
||||||
|
pipeline = Pipeline(stages=[crew1, crew2])
|
||||||
|
assert len(pipeline.stages) == 2
|
||||||
|
assert pipeline.stages[0] == crew1
|
||||||
|
assert pipeline.stages[1] == crew2
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_pipeline_with_empty_input(mock_crew_factory):
|
||||||
|
"""
|
||||||
|
Ensure the pipeline handles an empty input list correctly.
|
||||||
|
"""
|
||||||
|
crew = mock_crew_factory(name="Test Crew")
|
||||||
|
pipeline = Pipeline(stages=[crew])
|
||||||
|
|
||||||
|
input_data = []
|
||||||
|
pipeline_results = await pipeline.process_runs(input_data)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
len(pipeline_results) == 0
|
||||||
|
), "Pipeline should return empty results for empty input"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_pipeline_process_streams_single_input(mock_crew_factory):
|
||||||
|
"""
|
||||||
|
Test that Pipeline.process_streams() correctly processes a single input
|
||||||
|
and returns the expected CrewOutput.
|
||||||
|
"""
|
||||||
|
crew_name = "Test Crew"
|
||||||
|
mock_crew = mock_crew_factory(name="Test Crew")
|
||||||
|
pipeline = Pipeline(stages=[mock_crew])
|
||||||
|
input_data = [{"key": "value"}]
|
||||||
|
pipeline_results = await pipeline.process_runs(input_data)
|
||||||
|
|
||||||
|
mock_crew.kickoff_async.assert_called_once_with(inputs={"key": "value"})
|
||||||
|
|
||||||
|
for pipeline_result in pipeline_results:
|
||||||
|
assert isinstance(pipeline_result, PipelineRunResult)
|
||||||
|
assert pipeline_result.raw == "Test output"
|
||||||
|
assert len(pipeline_result.crews_outputs) == 1
|
||||||
|
print("pipeline_result.token_usage", pipeline_result.token_usage)
|
||||||
|
assert pipeline_result.token_usage == {crew_name: DEFAULT_TOKEN_USAGE}
|
||||||
|
assert pipeline_result.trace == [input_data[0], "Test Crew"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_pipeline_result_ordering(mock_crew_factory):
|
||||||
|
"""
|
||||||
|
Ensure that results are returned in the same order as the inputs, especially with parallel processing.
|
||||||
|
"""
|
||||||
|
crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"output": "crew1"})
|
||||||
|
crew2 = mock_crew_factory(name="Crew 2", output_json_dict={"output": "crew2"})
|
||||||
|
crew3 = mock_crew_factory(name="Crew 3", output_json_dict={"output": "crew3"})
|
||||||
|
|
||||||
|
pipeline = Pipeline(
|
||||||
|
stages=[crew1, [crew2, crew3]]
|
||||||
|
) # Parallel stage to test ordering
|
||||||
|
|
||||||
|
input_data = [{"id": 1}, {"id": 2}, {"id": 3}]
|
||||||
|
pipeline_results = await pipeline.process_runs(input_data)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
len(pipeline_results) == 6
|
||||||
|
), "Should have 2 results for each input due to the parallel final stage"
|
||||||
|
|
||||||
|
# Group results by their original input id
|
||||||
|
grouped_results = {}
|
||||||
|
for result in pipeline_results:
|
||||||
|
input_id = result.trace[0]["id"]
|
||||||
|
if input_id not in grouped_results:
|
||||||
|
grouped_results[input_id] = []
|
||||||
|
grouped_results[input_id].append(result)
|
||||||
|
|
||||||
|
# Check that we have the correct number of groups and results per group
|
||||||
|
assert len(grouped_results) == 3, "Should have results for each of the 3 inputs"
|
||||||
|
for input_id, results in grouped_results.items():
|
||||||
|
assert (
|
||||||
|
len(results) == 2
|
||||||
|
), f"Each input should have 2 results, but input {input_id} has {len(results)}"
|
||||||
|
|
||||||
|
# Check the ordering and content of the results
|
||||||
|
for input_id in range(1, 4):
|
||||||
|
group = grouped_results[input_id]
|
||||||
|
assert group[0].trace == [
|
||||||
|
{"id": input_id},
|
||||||
|
"Crew 1",
|
||||||
|
"Crew 2",
|
||||||
|
], f"Unexpected trace for first result of input {input_id}"
|
||||||
|
assert group[1].trace == [
|
||||||
|
{"id": input_id},
|
||||||
|
"Crew 1",
|
||||||
|
"Crew 3",
|
||||||
|
], f"Unexpected trace for second result of input {input_id}"
|
||||||
|
assert (
|
||||||
|
group[0].json_dict["output"] == "crew2"
|
||||||
|
), f"Unexpected output for first result of input {input_id}"
|
||||||
|
assert (
|
||||||
|
group[1].json_dict["output"] == "crew3"
|
||||||
|
), f"Unexpected output for second result of input {input_id}"
|
||||||
|
|
||||||
|
|
||||||
|
class TestPydanticOutput(BaseModel):
|
||||||
|
key: str
|
||||||
|
value: int
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_pipeline_process_streams_single_input_pydantic_output(mock_crew_factory):
|
||||||
|
crew_name = "Test Crew"
|
||||||
|
mock_crew = mock_crew_factory(
|
||||||
|
name=crew_name,
|
||||||
|
output_json_dict=None,
|
||||||
|
pydantic_output=TestPydanticOutput(key="test", value=42),
|
||||||
|
)
|
||||||
|
pipeline = Pipeline(stages=[mock_crew])
|
||||||
|
input_data = [{"key": "value"}]
|
||||||
|
pipeline_results = await pipeline.process_runs(input_data)
|
||||||
|
|
||||||
|
assert len(pipeline_results) == 1
|
||||||
|
pipeline_result = pipeline_results[0]
|
||||||
|
|
||||||
|
print("pipeline_result.trace", pipeline_result.trace)
|
||||||
|
|
||||||
|
assert isinstance(pipeline_result, PipelineRunResult)
|
||||||
|
assert pipeline_result.raw == "Test output"
|
||||||
|
assert len(pipeline_result.crews_outputs) == 1
|
||||||
|
assert pipeline_result.token_usage == {crew_name: DEFAULT_TOKEN_USAGE}
|
||||||
|
print("INPUT DATA POST PROCESS", input_data)
|
||||||
|
assert pipeline_result.trace == [input_data[0], "Test Crew"]
|
||||||
|
|
||||||
|
assert isinstance(pipeline_result.pydantic, TestPydanticOutput)
|
||||||
|
assert pipeline_result.pydantic.key == "test"
|
||||||
|
assert pipeline_result.pydantic.value == 42
|
||||||
|
assert pipeline_result.json_dict is None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_pipeline_preserves_original_input(mock_crew_factory):
|
||||||
|
crew_name = "Test Crew"
|
||||||
|
mock_crew = mock_crew_factory(
|
||||||
|
name=crew_name,
|
||||||
|
output_json_dict={"new_key": "new_value"},
|
||||||
|
)
|
||||||
|
pipeline = Pipeline(stages=[mock_crew])
|
||||||
|
|
||||||
|
# Create a deep copy of the input data to ensure we're not comparing references
|
||||||
|
original_input_data = [{"key": "value", "nested": {"a": 1}}]
|
||||||
|
input_data = json.loads(json.dumps(original_input_data))
|
||||||
|
|
||||||
|
await pipeline.process_runs(input_data)
|
||||||
|
|
||||||
|
# Assert that the original input hasn't been modified
|
||||||
|
assert (
|
||||||
|
input_data == original_input_data
|
||||||
|
), "The original input data should not be modified"
|
||||||
|
|
||||||
|
# Ensure that even nested structures haven't been modified
|
||||||
|
assert (
|
||||||
|
input_data[0]["nested"] == original_input_data[0]["nested"]
|
||||||
|
), "Nested structures should not be modified"
|
||||||
|
|
||||||
|
# Verify that adding new keys to the crew output doesn't affect the original input
|
||||||
|
assert (
|
||||||
|
"new_key" not in input_data[0]
|
||||||
|
), "New keys from crew output should not be added to the original input"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_pipeline_process_streams_multiple_inputs(mock_crew_factory):
|
||||||
|
"""
|
||||||
|
Test that Pipeline.process_streams() correctly processes multiple inputs
|
||||||
|
and returns the expected CrewOutputs.
|
||||||
|
"""
|
||||||
|
mock_crew = mock_crew_factory(name="Test Crew")
|
||||||
|
pipeline = Pipeline(stages=[mock_crew])
|
||||||
|
input_data = [{"key1": "value1"}, {"key2": "value2"}]
|
||||||
|
pipeline_results = await pipeline.process_runs(input_data)
|
||||||
|
|
||||||
|
assert mock_crew.kickoff_async.call_count == 2
|
||||||
|
assert len(pipeline_results) == 2
|
||||||
|
for pipeline_result in pipeline_results:
|
||||||
|
print("pipeline_result,", pipeline_result)
|
||||||
|
assert all(
|
||||||
|
isinstance(crew_output, CrewOutput)
|
||||||
|
for crew_output in pipeline_result.crews_outputs
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_pipeline_with_parallel_stages(mock_crew_factory):
|
||||||
|
"""
|
||||||
|
Test that Pipeline correctly handles parallel stages.
|
||||||
|
"""
|
||||||
|
crew1 = mock_crew_factory(name="Crew 1")
|
||||||
|
crew2 = mock_crew_factory(name="Crew 2")
|
||||||
|
crew3 = mock_crew_factory(name="Crew 3")
|
||||||
|
|
||||||
|
pipeline = Pipeline(stages=[crew1, [crew2, crew3]])
|
||||||
|
input_data = [{"initial": "data"}]
|
||||||
|
|
||||||
|
pipeline_result = await pipeline.process_runs(input_data)
|
||||||
|
|
||||||
|
crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
|
||||||
|
|
||||||
|
assert len(pipeline_result) == 2
|
||||||
|
pipeline_result_1, pipeline_result_2 = pipeline_result
|
||||||
|
|
||||||
|
pipeline_result_1.trace = [
|
||||||
|
"Crew 1",
|
||||||
|
"Crew 2",
|
||||||
|
]
|
||||||
|
pipeline_result_2.trace = [
|
||||||
|
"Crew 1",
|
||||||
|
"Crew 3",
|
||||||
|
]
|
||||||
|
|
||||||
|
expected_token_usage = {
|
||||||
|
"Crew 1": DEFAULT_TOKEN_USAGE,
|
||||||
|
"Crew 2": DEFAULT_TOKEN_USAGE,
|
||||||
|
"Crew 3": DEFAULT_TOKEN_USAGE,
|
||||||
|
}
|
||||||
|
|
||||||
|
assert pipeline_result_1.token_usage == expected_token_usage
|
||||||
|
assert pipeline_result_2.token_usage == expected_token_usage
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_pipeline_with_parallel_stages_end_in_single_stage(mock_crew_factory):
|
||||||
|
"""
|
||||||
|
Test that Pipeline correctly handles parallel stages.
|
||||||
|
"""
|
||||||
|
crew1 = mock_crew_factory(name="Crew 1")
|
||||||
|
crew2 = mock_crew_factory(name="Crew 2")
|
||||||
|
crew3 = mock_crew_factory(name="Crew 3")
|
||||||
|
crew4 = mock_crew_factory(name="Crew 4")
|
||||||
|
|
||||||
|
pipeline = Pipeline(stages=[crew1, [crew2, crew3], crew4])
|
||||||
|
input_data = [{"initial": "data"}]
|
||||||
|
|
||||||
|
pipeline_result = await pipeline.process_runs(input_data)
|
||||||
|
|
||||||
|
crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
|
||||||
|
|
||||||
|
assert len(pipeline_result) == 1
|
||||||
|
pipeline_result_1 = pipeline_result[0]
|
||||||
|
|
||||||
|
pipeline_result_1.trace = [
|
||||||
|
input_data[0],
|
||||||
|
"Crew 1",
|
||||||
|
["Crew 2", "Crew 3"],
|
||||||
|
"Crew 4",
|
||||||
|
]
|
||||||
|
|
||||||
|
expected_token_usage = {
|
||||||
|
"Crew 1": DEFAULT_TOKEN_USAGE,
|
||||||
|
"Crew 2": DEFAULT_TOKEN_USAGE,
|
||||||
|
"Crew 3": DEFAULT_TOKEN_USAGE,
|
||||||
|
"Crew 4": DEFAULT_TOKEN_USAGE,
|
||||||
|
}
|
||||||
|
|
||||||
|
assert pipeline_result_1.token_usage == expected_token_usage
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_pipeline_with_parallel_stages_multiple_inputs(mock_crew_factory):
|
||||||
|
# TODO: implement
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def test_pipeline_rshift_operator(mock_crew_factory):
|
||||||
|
"""
|
||||||
|
Test that the >> operator correctly creates a Pipeline from Crews and lists of Crews.
|
||||||
|
"""
|
||||||
|
crew1 = mock_crew_factory(name="Crew 1")
|
||||||
|
crew2 = mock_crew_factory(name="Crew 2")
|
||||||
|
crew3 = mock_crew_factory(name="Crew 3")
|
||||||
|
|
||||||
|
# Test single crew addition
|
||||||
|
pipeline = Pipeline(stages=[]) >> crew1
|
||||||
|
assert len(pipeline.stages) == 1
|
||||||
|
assert pipeline.stages[0] == crew1
|
||||||
|
|
||||||
|
# Test adding a list of crews
|
||||||
|
pipeline = Pipeline(stages=[crew1])
|
||||||
|
pipeline = pipeline >> [crew2, crew3]
|
||||||
|
print("pipeline.stages:", pipeline.stages)
|
||||||
|
assert len(pipeline.stages) == 2
|
||||||
|
assert pipeline.stages[1] == [crew2, crew3]
|
||||||
|
|
||||||
|
# Test error case: trying to shift with non-Crew object
|
||||||
|
with pytest.raises(TypeError):
|
||||||
|
pipeline >> "not a crew"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_pipeline_parallel_crews_to_parallel_crews(mock_crew_factory):
|
||||||
|
"""
|
||||||
|
Test that feeding parallel crews to parallel crews works correctly.
|
||||||
|
"""
|
||||||
|
crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"output1": "crew1"})
|
||||||
|
crew2 = mock_crew_factory(name="Crew 2", output_json_dict={"output2": "crew2"})
|
||||||
|
crew3 = mock_crew_factory(name="Crew 3", output_json_dict={"output3": "crew3"})
|
||||||
|
crew4 = mock_crew_factory(name="Crew 4", output_json_dict={"output4": "crew4"})
|
||||||
|
|
||||||
|
pipeline = Pipeline(stages=[[crew1, crew2], [crew3, crew4]])
|
||||||
|
|
||||||
|
input_data = [{"input": "test"}]
|
||||||
|
pipeline_results = await pipeline.process_runs(input_data)
|
||||||
|
|
||||||
|
assert len(pipeline_results) == 2, "Should have 2 results for final parallel stage"
|
||||||
|
|
||||||
|
pipeline_result_1, pipeline_result_2 = pipeline_results
|
||||||
|
|
||||||
|
# Check the outputs
|
||||||
|
assert pipeline_result_1.json_dict == {"output3": "crew3"}
|
||||||
|
assert pipeline_result_2.json_dict == {"output4": "crew4"}
|
||||||
|
|
||||||
|
# Check the traces
|
||||||
|
expected_traces = [
|
||||||
|
[{"input": "test"}, ["Crew 1", "Crew 2"], "Crew 3"],
|
||||||
|
[{"input": "test"}, ["Crew 1", "Crew 2"], "Crew 4"],
|
||||||
|
]
|
||||||
|
|
||||||
|
for result, expected_trace in zip(pipeline_results, expected_traces):
|
||||||
|
assert result.trace == expected_trace, f"Unexpected trace: {result.trace}"
|
||||||
|
|
||||||
|
|
||||||
|
def test_pipeline_double_nesting_not_allowed(mock_crew_factory):
|
||||||
|
"""
|
||||||
|
Test that double nesting in pipeline stages is not allowed.
|
||||||
|
"""
|
||||||
|
crew1 = mock_crew_factory(name="Crew 1")
|
||||||
|
crew2 = mock_crew_factory(name="Crew 2")
|
||||||
|
crew3 = mock_crew_factory(name="Crew 3")
|
||||||
|
crew4 = mock_crew_factory(name="Crew 4")
|
||||||
|
|
||||||
|
with pytest.raises(ValidationError) as exc_info:
|
||||||
|
Pipeline(stages=[crew1, [[crew2, crew3], crew4]])
|
||||||
|
|
||||||
|
error_msg = str(exc_info.value)
|
||||||
|
print(f"Full error message: {error_msg}") # For debugging
|
||||||
|
assert (
|
||||||
|
"Double nesting is not allowed in pipeline stages" in error_msg
|
||||||
|
), f"Unexpected error message: {error_msg}"
|
||||||
|
|
||||||
|
|
||||||
|
def test_pipeline_invalid_crew(mock_crew_factory):
|
||||||
|
"""
|
||||||
|
Test that non-Crew objects are not allowed in pipeline stages.
|
||||||
|
"""
|
||||||
|
crew1 = mock_crew_factory(name="Crew 1")
|
||||||
|
not_a_crew = "This is not a crew"
|
||||||
|
|
||||||
|
with pytest.raises(ValidationError) as exc_info:
|
||||||
|
Pipeline(stages=[crew1, not_a_crew])
|
||||||
|
|
||||||
|
error_msg = str(exc_info.value)
|
||||||
|
print(f"Full error message: {error_msg}") # For debugging
|
||||||
|
assert (
|
||||||
|
"Expected Crew instance or list of Crews, got <class 'str'>" in error_msg
|
||||||
|
), f"Unexpected error message: {error_msg}"
|
||||||
|
|
||||||
|
|
||||||
|
"""
|
||||||
|
TODO: Figure out what is the proper output for a pipeline with multiple stages
|
||||||
|
|
||||||
|
Options:
|
||||||
|
- Should the final output only include the last stage's output?
|
||||||
|
- Should the final output include the accumulation of previous stages' outputs?
|
||||||
|
"""
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_pipeline_data_accumulation(mock_crew_factory):
|
||||||
|
crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"key1": "value1"})
|
||||||
|
crew2 = mock_crew_factory(name="Crew 2", output_json_dict={"key2": "value2"})
|
||||||
|
|
||||||
|
pipeline = Pipeline(stages=[crew1, crew2])
|
||||||
|
input_data = [{"initial": "data"}]
|
||||||
|
results = await pipeline.process_runs(input_data)
|
||||||
|
|
||||||
|
# Check that crew1 was called with only the initial input
|
||||||
|
crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
|
||||||
|
|
||||||
|
# Check that crew2 was called with the combined input from the initial data and crew1's output
|
||||||
|
crew2.kickoff_async.assert_called_once_with(
|
||||||
|
inputs={"initial": "data", "key1": "value1"}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check the final output
|
||||||
|
assert len(results) == 1
|
||||||
|
final_result = results[0]
|
||||||
|
assert final_result.json_dict == {"key2": "value2"}
|
||||||
|
|
||||||
|
# Check that the trace includes all stages
|
||||||
|
assert final_result.trace == [{"initial": "data"}, "Crew 1", "Crew 2"]
|
||||||
|
|
||||||
|
# Check that crews_outputs contain the correct information
|
||||||
|
assert len(final_result.crews_outputs) == 2
|
||||||
|
assert final_result.crews_outputs[0].json_dict == {"key1": "value1"}
|
||||||
|
assert final_result.crews_outputs[1].json_dict == {"key2": "value2"}
|
||||||
Reference in New Issue
Block a user