mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-24 08:18:31 +00:00
Compare commits
12 Commits
brandon/cr
...
feat/testi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
616ffe2aba | ||
|
|
a6bce1089a | ||
|
|
cb8fbf61de | ||
|
|
4d2cdc3d96 | ||
|
|
890c03a0a6 | ||
|
|
e4b419d5be | ||
|
|
8ffc4f79fa | ||
|
|
c05ef3c8cf | ||
|
|
cf600c1a43 | ||
|
|
2a88d1d462 | ||
|
|
660a2ae837 | ||
|
|
6930656897 |
@@ -1,202 +0,0 @@
|
||||
---
|
||||
title: crewAI Pipelines
|
||||
description: Understanding and utilizing pipelines in the crewAI framework for efficient multi-stage task processing.
|
||||
---
|
||||
|
||||
## What is a Pipeline?
|
||||
|
||||
A pipeline in crewAI represents a structured workflow that allows for the sequential or parallel execution of multiple crews. It provides a way to organize complex processes involving multiple stages, where the output of one stage can serve as input for subsequent stages.
|
||||
|
||||
## Key Terminology
|
||||
|
||||
Understanding the following terms is crucial for working effectively with pipelines:
|
||||
|
||||
- **Stage**: A distinct part of the pipeline, which can be either sequential (a single crew) or parallel (multiple crews executing concurrently).
|
||||
- **Run**: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline.
|
||||
- **Branch**: Parallel executions within a stage (e.g., concurrent crew operations).
|
||||
- **Trace**: The journey of an individual input through the entire pipeline, capturing the path and transformations it undergoes.
|
||||
|
||||
Example pipeline structure:
|
||||
|
||||
```
|
||||
crew1 >> [crew2, crew3] >> crew4
|
||||
```
|
||||
|
||||
This represents a pipeline with three stages:
|
||||
|
||||
1. A sequential stage (crew1)
|
||||
2. A parallel stage with two branches (crew2 and crew3 executing concurrently)
|
||||
3. Another sequential stage (crew4)
|
||||
|
||||
Each input creates its own run, flowing through all stages of the pipeline. Multiple runs can be processed concurrently, each following the defined pipeline structure.
|
||||
|
||||
## Pipeline Attributes
|
||||
|
||||
| Attribute | Parameters | Description |
|
||||
| :--------- | :--------- | :------------------------------------------------------------------------------------ |
|
||||
| **Stages** | `stages` | A list of crews or lists of crews representing the stages to be executed in sequence. |
|
||||
|
||||
## Creating a Pipeline
|
||||
|
||||
When creating a pipeline, you define a series of stages, each consisting of either a single crew or a list of crews for parallel execution. The pipeline ensures that each stage is executed in order, with the output of one stage feeding into the next.
|
||||
|
||||
### Example: Assembling a Pipeline
|
||||
|
||||
```python
|
||||
from crewai import Crew, Agent, Task, Pipeline
|
||||
|
||||
# Define your crews
|
||||
research_crew = Crew(
|
||||
agents=[researcher],
|
||||
tasks=[research_task],
|
||||
process=Process.sequential
|
||||
)
|
||||
|
||||
analysis_crew = Crew(
|
||||
agents=[analyst],
|
||||
tasks=[analysis_task],
|
||||
process=Process.sequential
|
||||
)
|
||||
|
||||
writing_crew = Crew(
|
||||
agents=[writer],
|
||||
tasks=[writing_task],
|
||||
process=Process.sequential
|
||||
)
|
||||
|
||||
# Assemble the pipeline
|
||||
my_pipeline = Pipeline(
|
||||
stages=[research_crew, analysis_crew, writing_crew]
|
||||
)
|
||||
```
|
||||
|
||||
## Pipeline Methods
|
||||
|
||||
| Method | Description |
|
||||
| :--------------- | :----------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| **process_runs** | Executes the pipeline, processing all stages and returning the results. This method initiates one or more runs through the pipeline, handling the flow of data between stages. |
|
||||
|
||||
## Pipeline Output
|
||||
|
||||
!!! note "Understanding Pipeline Outputs"
|
||||
The output of a pipeline in the crewAI framework is encapsulated within two main classes: `PipelineOutput` and `PipelineRunResult`. These classes provide a structured way to access the results of the pipeline's execution, including various formats such as raw strings, JSON, and Pydantic models.
|
||||
|
||||
### Pipeline Output Attributes
|
||||
|
||||
| Attribute | Parameters | Type | Description |
|
||||
| :-------------- | :------------ | :------------------------ | :-------------------------------------------------------------------------------------------------------- |
|
||||
| **ID** | `id` | `UUID4` | A unique identifier for the pipeline output. |
|
||||
| **Run Results** | `run_results` | `List[PipelineRunResult]` | A list of `PipelineRunResult` objects, each representing the output of a single run through the pipeline. |
|
||||
|
||||
### Pipeline Output Methods
|
||||
|
||||
| Method/Property | Description |
|
||||
| :----------------- | :----------------------------------------------------- |
|
||||
| **add_run_result** | Adds a `PipelineRunResult` to the list of run results. |
|
||||
|
||||
### Pipeline Run Result Attributes
|
||||
|
||||
| Attribute | Parameters | Type | Description |
|
||||
| :---------------- | :-------------- | :------------------------- | :-------------------------------------------------------------------------------------------- |
|
||||
| **ID** | `id` | `UUID4` | A unique identifier for the run result. |
|
||||
| **Raw** | `raw` | `str` | The raw output of the final stage in the pipeline run. |
|
||||
| **Pydantic** | `pydantic` | `Optional[BaseModel]` | A Pydantic model object representing the structured output of the final stage, if applicable. |
|
||||
| **JSON Dict** | `json_dict` | `Optional[Dict[str, Any]]` | A dictionary representing the JSON output of the final stage, if applicable. |
|
||||
| **Token Usage** | `token_usage` | `Dict[str, Any]` | A summary of token usage across all stages of the pipeline run. |
|
||||
| **Trace** | `trace` | `List[Any]` | A trace of the journey of inputs through the pipeline run. |
|
||||
| **Crews Outputs** | `crews_outputs` | `List[CrewOutput]` | A list of `CrewOutput` objects, representing the outputs from each crew in the pipeline run. |
|
||||
|
||||
### Pipeline Run Result Methods and Properties
|
||||
|
||||
| Method/Property | Description |
|
||||
| :-------------- | :------------------------------------------------------------------------------------------------------- |
|
||||
| **json** | Returns the JSON string representation of the run result if the output format of the final task is JSON. |
|
||||
| **to_dict** | Converts the JSON and Pydantic outputs to a dictionary. |
|
||||
| \***\*str\*\*** | Returns the string representation of the run result, prioritizing Pydantic, then JSON, then raw. |
|
||||
|
||||
### Accessing Pipeline Outputs
|
||||
|
||||
Once a pipeline has been executed, its output can be accessed through the `PipelineOutput` object returned by the `process_runs` method. The `PipelineOutput` class provides access to individual `PipelineRunResult` objects, each representing a single run through the pipeline.
|
||||
|
||||
#### Example
|
||||
|
||||
```python
|
||||
# Define input data for the pipeline
|
||||
input_data = [{"initial_query": "Latest advancements in AI"}, {"initial_query": "Future of robotics"}]
|
||||
|
||||
# Execute the pipeline
|
||||
pipeline_output = await my_pipeline.process_runs(input_data)
|
||||
|
||||
# Access the results
|
||||
for run_result in pipeline_output.run_results:
|
||||
print(f"Run ID: {run_result.id}")
|
||||
print(f"Final Raw Output: {run_result.raw}")
|
||||
if run_result.json_dict:
|
||||
print(f"JSON Output: {json.dumps(run_result.json_dict, indent=2)}")
|
||||
if run_result.pydantic:
|
||||
print(f"Pydantic Output: {run_result.pydantic}")
|
||||
print(f"Token Usage: {run_result.token_usage}")
|
||||
print(f"Trace: {run_result.trace}")
|
||||
print("Crew Outputs:")
|
||||
for crew_output in run_result.crews_outputs:
|
||||
print(f" Crew: {crew_output.raw}")
|
||||
print("\n")
|
||||
```
|
||||
|
||||
This example demonstrates how to access and work with the pipeline output, including individual run results and their associated data.
|
||||
|
||||
## Using Pipelines
|
||||
|
||||
Pipelines are particularly useful for complex workflows that involve multiple stages of processing, analysis, or content generation. They allow you to:
|
||||
|
||||
1. **Sequence Operations**: Execute crews in a specific order, ensuring that the output of one crew is available as input to the next.
|
||||
2. **Parallel Processing**: Run multiple crews concurrently within a stage for increased efficiency.
|
||||
3. **Manage Complex Workflows**: Break down large tasks into smaller, manageable steps executed by specialized crews.
|
||||
|
||||
### Example: Running a Pipeline
|
||||
|
||||
```python
|
||||
# Define input data for the pipeline
|
||||
input_data = [{"initial_query": "Latest advancements in AI"}]
|
||||
|
||||
# Execute the pipeline, initiating a run for each input
|
||||
results = await my_pipeline.process_runs(input_data)
|
||||
|
||||
# Access the results
|
||||
for result in results:
|
||||
print(f"Final Output: {result.raw}")
|
||||
print(f"Token Usage: {result.token_usage}")
|
||||
print(f"Trace: {result.trace}") # Shows the path of the input through all stages
|
||||
```
|
||||
|
||||
## Advanced Features
|
||||
|
||||
### Parallel Execution within Stages
|
||||
|
||||
You can define parallel execution within a stage by providing a list of crews, creating multiple branches:
|
||||
|
||||
```python
|
||||
parallel_analysis_crew = Crew(agents=[financial_analyst], tasks=[financial_analysis_task])
|
||||
market_analysis_crew = Crew(agents=[market_analyst], tasks=[market_analysis_task])
|
||||
|
||||
my_pipeline = Pipeline(
|
||||
stages=[
|
||||
research_crew,
|
||||
[parallel_analysis_crew, market_analysis_crew], # Parallel execution (branching)
|
||||
writing_crew
|
||||
]
|
||||
)
|
||||
```
|
||||
|
||||
### Error Handling and Validation
|
||||
|
||||
The Pipeline class includes validation mechanisms to ensure the robustness of the pipeline structure:
|
||||
|
||||
- Validates that stages contain only Crew instances or lists of Crew instances.
|
||||
- Prevents double nesting of stages to maintain a clear structure.
|
||||
|
||||
## Best Practices for Pipelines
|
||||
|
||||
1. **Clear Stage Definition**: Define each stage with a clear purpose and expected output.
|
||||
2. **Effective Data Flow**: Ensure that the output of each stage is in a format suitable for the input of the next stage.
|
||||
3. \*\*Parallel Proce
|
||||
@@ -232,7 +232,7 @@ def callback_function(output: TaskOutput):
|
||||
print(f"""
|
||||
Task completed!
|
||||
Task: {output.description}
|
||||
Output: {output.raw}
|
||||
Output: {output.raw_output}
|
||||
""")
|
||||
|
||||
research_task = Task(
|
||||
|
||||
41
docs/core-concepts/Testing.md
Normal file
41
docs/core-concepts/Testing.md
Normal file
@@ -0,0 +1,41 @@
|
||||
---
|
||||
title: crewAI Testing
|
||||
description: Learn how to test your crewAI Crew and evaluate their performance.
|
||||
---
|
||||
|
||||
## Introduction
|
||||
|
||||
Testing is a crucial part of the development process, and it is essential to ensure that your crew is performing as expected. And with crewAI, you can easily test your crew and evaluate its performance using the built-in testing capabilities.
|
||||
|
||||
### Using the Testing Feature
|
||||
|
||||
We added the CLI command `crewai test` to make it easy to test your crew. This command will run your crew for a specified number of iterations and provide detailed performance metrics.
|
||||
The parameters are `n_iterations` and `model` which are optional and default to 2 and `gpt-4o-mini` respectively. For now the only provider available is OpenAI.
|
||||
|
||||
```bash
|
||||
crewai test
|
||||
```
|
||||
|
||||
If you want to run more iterations or use a different model, you can specify the parameters like this:
|
||||
|
||||
```bash
|
||||
crewai test --n_iterations 5 --model gpt-4o
|
||||
```
|
||||
|
||||
What happens when you run the `crewai test` command is that the crew will be executed for the specified number of iterations, and the performance metrics will be displayed at the end of the run.
|
||||
|
||||
A table of scores at the end will show the performance of the crew in terms of the following metrics:
|
||||
```
|
||||
Task Scores
|
||||
(1-10 Higher is better)
|
||||
┏━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━┳━━━━━━━━━━━━┓
|
||||
┃ Tasks/Crew ┃ Run 1 ┃ Run 2 ┃ Avg. Total ┃
|
||||
┡━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━╇━━━━━━━━━━━━┩
|
||||
│ Task 1 │ 10.0 │ 9.0 │ 9.5 │
|
||||
│ Task 2 │ 9.0 │ 9.0 │ 9.0 │
|
||||
│ Crew │ 9.5 │ 9.0 │ 9.2 │
|
||||
└────────────┴───────┴───────┴────────────┘
|
||||
```
|
||||
|
||||
The example above shows the test results for two runs of the crew with two tasks, with the average total score for each task and the crew as a whole.
|
||||
|
||||
@@ -33,11 +33,6 @@ Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By
|
||||
Crews
|
||||
</a>
|
||||
</li>
|
||||
<li>
|
||||
<a href="./core-concepts/Pipeline">
|
||||
Pipeline
|
||||
</a>
|
||||
</li>
|
||||
<li>
|
||||
<a href="./core-concepts/Training-Crew">
|
||||
Training
|
||||
|
||||
@@ -129,6 +129,7 @@ nav:
|
||||
- Training: 'core-concepts/Training-Crew.md'
|
||||
- Memory: 'core-concepts/Memory.md'
|
||||
- Planning: 'core-concepts/Planning.md'
|
||||
- Testing: 'core-concepts/Testing.md'
|
||||
- Using LangChain Tools: 'core-concepts/Using-LangChain-Tools.md'
|
||||
- Using LlamaIndex Tools: 'core-concepts/Using-LlamaIndex-Tools.md'
|
||||
- How to Guides:
|
||||
|
||||
67
poetry.lock
generated
67
poetry.lock
generated
@@ -2,13 +2,13 @@
|
||||
|
||||
[[package]]
|
||||
name = "agentops"
|
||||
version = "0.3.2"
|
||||
version = "0.3.0"
|
||||
description = "Python SDK for developing AI agent evals and observability"
|
||||
optional = true
|
||||
python-versions = ">=3.7"
|
||||
files = [
|
||||
{file = "agentops-0.3.2-py3-none-any.whl", hash = "sha256:b35988e04378624204572bb3d7a454094f879ea573f05b57d4e75ab0bfbb82af"},
|
||||
{file = "agentops-0.3.2.tar.gz", hash = "sha256:55559ac4a43634831dfa8937c2597c28e332809dc7c6bb3bc3c8b233442e224c"},
|
||||
{file = "agentops-0.3.0-py3-none-any.whl", hash = "sha256:22aeb3355e66b32a2b2a9f676048b81979b2488feddb088f9266034b3ed50539"},
|
||||
{file = "agentops-0.3.0.tar.gz", hash = "sha256:6c0c08a57410fa5e826a7bafa1deeba9f7b3524709427d9e1abbd0964caaf76b"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@@ -3337,13 +3337,13 @@ sympy = "*"
|
||||
|
||||
[[package]]
|
||||
name = "openai"
|
||||
version = "1.36.1"
|
||||
version = "1.36.0"
|
||||
description = "The official Python library for the openai API"
|
||||
optional = false
|
||||
python-versions = ">=3.7.1"
|
||||
files = [
|
||||
{file = "openai-1.36.1-py3-none-any.whl", hash = "sha256:d399b9d476dbbc167aceaac6bc6ed0b2e2bb6c9e189c7f7047f822743ae62e64"},
|
||||
{file = "openai-1.36.1.tar.gz", hash = "sha256:41be9e0302e95dba8a9374b885c5cb1cec2202816a70b98736fee25a2cadd1f2"},
|
||||
{file = "openai-1.36.0-py3-none-any.whl", hash = "sha256:82b74ded1fe2ea94abb19a007178bc143675f1b6903cebd63e2968d654bb0a6f"},
|
||||
{file = "openai-1.36.0.tar.gz", hash = "sha256:a124baf0e1657d6156e12248642f88489cd030be8655b69bc1c13eb50e71a93d"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@@ -4085,19 +4085,6 @@ files = [
|
||||
{file = "pyarrow-17.0.0-cp312-cp312-win_amd64.whl", hash = "sha256:392bc9feabc647338e6c89267635e111d71edad5fcffba204425a7c8d13610d7"},
|
||||
{file = "pyarrow-17.0.0-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:af5ff82a04b2171415f1410cff7ebb79861afc5dae50be73ce06d6e870615204"},
|
||||
{file = "pyarrow-17.0.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:edca18eaca89cd6382dfbcff3dd2d87633433043650c07375d095cd3517561d8"},
|
||||
{file = "pyarrow-17.0.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7c7916bff914ac5d4a8fe25b7a25e432ff921e72f6f2b7547d1e325c1ad9d155"},
|
||||
{file = "pyarrow-17.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f553ca691b9e94b202ff741bdd40f6ccb70cdd5fbf65c187af132f1317de6145"},
|
||||
{file = "pyarrow-17.0.0-cp38-cp38-manylinux_2_28_aarch64.whl", hash = "sha256:0cdb0e627c86c373205a2f94a510ac4376fdc523f8bb36beab2e7f204416163c"},
|
||||
{file = "pyarrow-17.0.0-cp38-cp38-manylinux_2_28_x86_64.whl", hash = "sha256:d7d192305d9d8bc9082d10f361fc70a73590a4c65cf31c3e6926cd72b76bc35c"},
|
||||
{file = "pyarrow-17.0.0-cp38-cp38-win_amd64.whl", hash = "sha256:02dae06ce212d8b3244dd3e7d12d9c4d3046945a5933d28026598e9dbbda1fca"},
|
||||
{file = "pyarrow-17.0.0-cp39-cp39-macosx_10_15_x86_64.whl", hash = "sha256:13d7a460b412f31e4c0efa1148e1d29bdf18ad1411eb6757d38f8fbdcc8645fb"},
|
||||
{file = "pyarrow-17.0.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:9b564a51fbccfab5a04a80453e5ac6c9954a9c5ef2890d1bcf63741909c3f8df"},
|
||||
{file = "pyarrow-17.0.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:32503827abbc5aadedfa235f5ece8c4f8f8b0a3cf01066bc8d29de7539532687"},
|
||||
{file = "pyarrow-17.0.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a155acc7f154b9ffcc85497509bcd0d43efb80d6f733b0dc3bb14e281f131c8b"},
|
||||
{file = "pyarrow-17.0.0-cp39-cp39-manylinux_2_28_aarch64.whl", hash = "sha256:dec8d129254d0188a49f8a1fc99e0560dc1b85f60af729f47de4046015f9b0a5"},
|
||||
{file = "pyarrow-17.0.0-cp39-cp39-manylinux_2_28_x86_64.whl", hash = "sha256:a48ddf5c3c6a6c505904545c25a4ae13646ae1f8ba703c4df4a1bfe4f4006bda"},
|
||||
{file = "pyarrow-17.0.0-cp39-cp39-win_amd64.whl", hash = "sha256:42bf93249a083aca230ba7e2786c5f673507fa97bbd9725a1e2754715151a204"},
|
||||
{file = "pyarrow-17.0.0.tar.gz", hash = "sha256:4beca9521ed2c0921c1023e68d097d0299b62c362639ea315572a58f3f50fd28"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@@ -4334,13 +4321,13 @@ extra = ["pygments (>=2.12)"]
|
||||
|
||||
[[package]]
|
||||
name = "pypdf"
|
||||
version = "4.3.1"
|
||||
version = "4.3.0"
|
||||
description = "A pure-python PDF library capable of splitting, merging, cropping, and transforming PDF files"
|
||||
optional = false
|
||||
python-versions = ">=3.6"
|
||||
files = [
|
||||
{file = "pypdf-4.3.1-py3-none-any.whl", hash = "sha256:64b31da97eda0771ef22edb1bfecd5deee4b72c3d1736b7df2689805076d6418"},
|
||||
{file = "pypdf-4.3.1.tar.gz", hash = "sha256:b2f37fe9a3030aa97ca86067a56ba3f9d3565f9a791b305c7355d8392c30d91b"},
|
||||
{file = "pypdf-4.3.0-py3-none-any.whl", hash = "sha256:eeea4d019b57c099d02a0e1692eaaab23341ae3f255c1dafa3c8566b4636496d"},
|
||||
{file = "pypdf-4.3.0.tar.gz", hash = "sha256:0d7a4c67fd03782f5a09d3f48c11c7a31e0bb9af78861a25229bb49259ed0504"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@@ -4427,13 +4414,13 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "pytest"
|
||||
version = "8.3.1"
|
||||
version = "8.2.2"
|
||||
description = "pytest: simple powerful testing with Python"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "pytest-8.3.1-py3-none-any.whl", hash = "sha256:e9600ccf4f563976e2c99fa02c7624ab938296551f280835ee6516df8bc4ae8c"},
|
||||
{file = "pytest-8.3.1.tar.gz", hash = "sha256:7e8e5c5abd6e93cb1cc151f23e57adc31fcf8cfd2a3ff2da63e23f732de35db6"},
|
||||
{file = "pytest-8.2.2-py3-none-any.whl", hash = "sha256:c434598117762e2bd304e526244f67bf66bbd7b5d6cf22138be51ff661980343"},
|
||||
{file = "pytest-8.2.2.tar.gz", hash = "sha256:de4bb8104e201939ccdc688b27a89a7be2079b22e2bd2b07f806b6ba71117977"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@@ -4441,30 +4428,12 @@ colorama = {version = "*", markers = "sys_platform == \"win32\""}
|
||||
exceptiongroup = {version = ">=1.0.0rc8", markers = "python_version < \"3.11\""}
|
||||
iniconfig = "*"
|
||||
packaging = "*"
|
||||
pluggy = ">=1.5,<2"
|
||||
pluggy = ">=1.5,<2.0"
|
||||
tomli = {version = ">=1", markers = "python_version < \"3.11\""}
|
||||
|
||||
[package.extras]
|
||||
dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"]
|
||||
|
||||
[[package]]
|
||||
name = "pytest-asyncio"
|
||||
version = "0.23.8"
|
||||
description = "Pytest support for asyncio"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "pytest_asyncio-0.23.8-py3-none-any.whl", hash = "sha256:50265d892689a5faefb84df80819d1ecef566eb3549cf915dfb33569359d1ce2"},
|
||||
{file = "pytest_asyncio-0.23.8.tar.gz", hash = "sha256:759b10b33a6dc61cce40a8bd5205e302978bbbcc00e279a8b61d9a6a3c82e4d3"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
pytest = ">=7.0.0,<9"
|
||||
|
||||
[package.extras]
|
||||
docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1.0)"]
|
||||
testing = ["coverage (>=6.2)", "hypothesis (>=5.7.1)"]
|
||||
|
||||
[[package]]
|
||||
name = "pytest-vcr"
|
||||
version = "1.0.2"
|
||||
@@ -4930,19 +4899,19 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "setuptools"
|
||||
version = "71.1.0"
|
||||
version = "71.0.4"
|
||||
description = "Easily download, build, install, upgrade, and uninstall Python packages"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "setuptools-71.1.0-py3-none-any.whl", hash = "sha256:33874fdc59b3188304b2e7c80d9029097ea31627180896fb549c578ceb8a0855"},
|
||||
{file = "setuptools-71.1.0.tar.gz", hash = "sha256:032d42ee9fb536e33087fb66cac5f840eb9391ed05637b3f2a76a7c8fb477936"},
|
||||
{file = "setuptools-71.0.4-py3-none-any.whl", hash = "sha256:ed2feca703be3bdbd94e6bb17365d91c6935c6b2a8d0bb09b66a2c435ba0b1a5"},
|
||||
{file = "setuptools-71.0.4.tar.gz", hash = "sha256:48297e5d393a62b7cb2a10b8f76c63a73af933bd809c9e0d0d6352a1a0135dd8"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
core = ["importlib-metadata (>=6)", "importlib-resources (>=5.10.2)", "jaraco.text (>=3.7)", "more-itertools (>=8.8)", "ordered-set (>=3.1.1)", "packaging (>=24)", "platformdirs (>=2.6.2)", "tomli (>=2.0.1)", "wheel (>=0.43.0)"]
|
||||
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"]
|
||||
test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "jaraco.test", "mypy (==1.11.*)", "packaging (>=23.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf", "pytest-ruff (<0.4)", "pytest-ruff (>=0.2.1)", "pytest-ruff (>=0.3.2)", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
|
||||
test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "jaraco.test", "mypy (==1.10.0)", "packaging (>=23.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf", "pytest-ruff (<0.4)", "pytest-ruff (>=0.2.1)", "pytest-ruff (>=0.3.2)", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
|
||||
|
||||
[[package]]
|
||||
name = "shapely"
|
||||
@@ -6161,4 +6130,4 @@ tools = ["crewai-tools"]
|
||||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = ">=3.10,<=3.13"
|
||||
content-hash = "8df022f5ec0997c0a0f5710476139d9117c1057889c158e958f2c8efd22a4756"
|
||||
content-hash = "f5ad9babb3c57c405e39232020e8cbfaaeb5c315c2e7c5bb8fdf66792f260343"
|
||||
|
||||
@@ -52,7 +52,6 @@ crewai-tools = "^0.4.26"
|
||||
pytest = "^8.0.0"
|
||||
pytest-vcr = "^1.0.2"
|
||||
python-dotenv = "1.0.0"
|
||||
pytest-asyncio = "^0.23.7"
|
||||
|
||||
[tool.poetry.scripts]
|
||||
crewai = "crewai.cli.cli:crewai"
|
||||
|
||||
@@ -199,9 +199,7 @@ class Agent(BaseAgent):
|
||||
"tools": self.agent_executor.tools_description,
|
||||
}
|
||||
)["output"]
|
||||
print("Result when things went well:", result)
|
||||
except Exception as e:
|
||||
print("FAILED TO EXECUTE TASK", e)
|
||||
self._times_executed += 1
|
||||
if self._times_executed > self.max_retry_limit:
|
||||
raise e
|
||||
@@ -217,7 +215,6 @@ class Agent(BaseAgent):
|
||||
if tool_result.get("result_as_answer", False):
|
||||
result = tool_result["result"]
|
||||
|
||||
print("RESULT TO RETURN", result)
|
||||
return result
|
||||
|
||||
def format_log_to_str(
|
||||
|
||||
@@ -56,7 +56,7 @@ class CrewAgentExecutor(AgentExecutor, CrewAgentExecutorMixin):
|
||||
)
|
||||
intermediate_steps: List[Tuple[AgentAction, str]] = []
|
||||
# Allowing human input given task setting
|
||||
if self.task and self.task.human_input:
|
||||
if self.task.human_input:
|
||||
self.should_ask_for_human_input = True
|
||||
|
||||
# Let's start tracking the number of iterations and time elapsed
|
||||
|
||||
@@ -5,11 +5,11 @@ from crewai.memory.storage.kickoff_task_outputs_storage import (
|
||||
KickoffTaskOutputsSQLiteStorage,
|
||||
)
|
||||
|
||||
|
||||
from .create_crew import create_crew
|
||||
from .train_crew import train_crew
|
||||
from .replay_from_task import replay_task_command
|
||||
from .reset_memories_command import reset_memories_command
|
||||
from .test_crew import test_crew
|
||||
from .train_crew import train_crew
|
||||
|
||||
|
||||
@click.group()
|
||||
@@ -126,5 +126,26 @@ def reset_memories(long, short, entities, kickoff_outputs, all):
|
||||
click.echo(f"An error occurred while resetting memories: {e}", err=True)
|
||||
|
||||
|
||||
@crewai.command()
|
||||
@click.option(
|
||||
"-n",
|
||||
"--n_iterations",
|
||||
type=int,
|
||||
default=3,
|
||||
help="Number of iterations to Test the crew",
|
||||
)
|
||||
@click.option(
|
||||
"-m",
|
||||
"--model",
|
||||
type=str,
|
||||
default="gpt-4o-mini",
|
||||
help="LLM Model to run the tests on the Crew. For now only accepting only OpenAI models.",
|
||||
)
|
||||
def test(n_iterations: int, model: str):
|
||||
"""Test the crew and evaluate the results."""
|
||||
click.echo(f"Testing the crew for {n_iterations} iterations with model {model}")
|
||||
test_crew(n_iterations, model)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
crewai()
|
||||
|
||||
@@ -39,3 +39,16 @@ def replay():
|
||||
|
||||
except Exception as e:
|
||||
raise Exception(f"An error occurred while replaying the crew: {e}")
|
||||
|
||||
def test():
|
||||
"""
|
||||
Test the crew execution and returns the results.
|
||||
"""
|
||||
inputs = {
|
||||
"topic": "AI LLMs"
|
||||
}
|
||||
try:
|
||||
{{crew_name}}Crew().crew().test(n_iterations=int(sys.argv[1]), openai_model_name=sys.argv[2], inputs=inputs)
|
||||
|
||||
except Exception as e:
|
||||
raise Exception(f"An error occurred while replaying the crew: {e}")
|
||||
|
||||
@@ -12,6 +12,7 @@ crewai = { extras = ["tools"], version = "^0.41.1" }
|
||||
{{folder_name}} = "{{folder_name}}.main:run"
|
||||
train = "{{folder_name}}.main:train"
|
||||
replay = "{{folder_name}}.main:replay"
|
||||
test = "{{folder_name}}.main:test"
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core"]
|
||||
|
||||
30
src/crewai/cli/test_crew.py
Normal file
30
src/crewai/cli/test_crew.py
Normal file
@@ -0,0 +1,30 @@
|
||||
import subprocess
|
||||
|
||||
import click
|
||||
|
||||
|
||||
def test_crew(n_iterations: int, model: str) -> None:
|
||||
"""
|
||||
Test the crew by running a command in the Poetry environment.
|
||||
|
||||
Args:
|
||||
n_iterations (int): The number of iterations to test the crew.
|
||||
model (str): The model to test the crew with.
|
||||
"""
|
||||
command = ["poetry", "run", "test", str(n_iterations), model]
|
||||
|
||||
try:
|
||||
if n_iterations <= 0:
|
||||
raise ValueError("The number of iterations must be a positive integer.")
|
||||
|
||||
result = subprocess.run(command, capture_output=False, text=True, check=True)
|
||||
|
||||
if result.stderr:
|
||||
click.echo(result.stderr, err=True)
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
click.echo(f"An error occurred while testing the crew: {e}", err=True)
|
||||
click.echo(e.output, err=True)
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"An unexpected error occurred: {e}", err=True)
|
||||
@@ -3,7 +3,7 @@ import json
|
||||
import uuid
|
||||
from concurrent.futures import Future
|
||||
from hashlib import md5
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
|
||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||
|
||||
from langchain_core.callbacks import BaseCallbackHandler
|
||||
from pydantic import (
|
||||
@@ -33,7 +33,11 @@ from crewai.tasks.task_output import TaskOutput
|
||||
from crewai.telemetry import Telemetry
|
||||
from crewai.tools.agent_tools import AgentTools
|
||||
from crewai.utilities import I18N, FileHandler, Logger, RPMController
|
||||
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
|
||||
from crewai.utilities.constants import (
|
||||
TRAINED_AGENTS_DATA_FILE,
|
||||
TRAINING_DATA_FILE,
|
||||
)
|
||||
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
|
||||
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
|
||||
from crewai.utilities.formatter import (
|
||||
aggregate_raw_outputs_from_task_outputs,
|
||||
@@ -48,9 +52,6 @@ try:
|
||||
except ImportError:
|
||||
agentops = None
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.pipeline.pipeline import Pipeline
|
||||
|
||||
|
||||
class Crew(BaseModel):
|
||||
"""
|
||||
@@ -96,13 +97,12 @@ class Crew(BaseModel):
|
||||
default_factory=TaskOutputStorageHandler
|
||||
)
|
||||
|
||||
name: Optional[str] = Field(default="")
|
||||
cache: bool = Field(default=True)
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
tasks: List[Task] = Field(default_factory=list)
|
||||
agents: List[BaseAgent] = Field(default_factory=list)
|
||||
process: Process = Field(default=Process.sequential)
|
||||
verbose: int = Field(default=0)
|
||||
verbose: Union[int, bool] = Field(default=0)
|
||||
memory: bool = Field(
|
||||
default=False,
|
||||
description="Whether the crew should use memory to store memories of it's execution",
|
||||
@@ -147,8 +147,8 @@ class Crew(BaseModel):
|
||||
default=None,
|
||||
description="Path to the prompt json file to be used for the crew.",
|
||||
)
|
||||
output_log_file: Optional[str] = Field(
|
||||
default="",
|
||||
output_log_file: Optional[Union[bool, str]] = Field(
|
||||
default=False,
|
||||
description="output_log_file",
|
||||
)
|
||||
planning: Optional[bool] = Field(
|
||||
@@ -967,17 +967,20 @@ class Crew(BaseModel):
|
||||
|
||||
return total_usage_metrics
|
||||
|
||||
def __rshift__(self, other: "Crew") -> "Pipeline":
|
||||
"""
|
||||
Implements the >> operator to add another Crew to an existing Pipeline.
|
||||
"""
|
||||
from crewai.pipeline.pipeline import Pipeline
|
||||
def test(
|
||||
self,
|
||||
n_iterations: int,
|
||||
openai_model_name: str,
|
||||
inputs: Optional[Dict[str, Any]] = None,
|
||||
) -> None:
|
||||
"""Test and evaluate the Crew with the given inputs for n iterations."""
|
||||
evaluator = CrewEvaluator(self, openai_model_name)
|
||||
|
||||
if not isinstance(other, Crew):
|
||||
raise TypeError(
|
||||
f"Unsupported operand type for >>: '{type(self).__name__}' and '{type(other).__name__}'"
|
||||
)
|
||||
return Pipeline(stages=[self, other])
|
||||
for i in range(1, n_iterations + 1):
|
||||
evaluator.set_iteration(i)
|
||||
self.kickoff(inputs=inputs)
|
||||
|
||||
evaluator.print_crew_evaluation_result()
|
||||
|
||||
def __repr__(self):
|
||||
return f"Crew(id={self.id}, process={self.process}, number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)})"
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
from crewai.pipeline.pipeline import Pipeline
|
||||
|
||||
__all__ = ["Pipeline"]
|
||||
@@ -1,218 +0,0 @@
|
||||
import asyncio
|
||||
import copy
|
||||
from typing import Any, Dict, List, Tuple, Union
|
||||
|
||||
from pydantic import BaseModel, Field, model_validator
|
||||
|
||||
from crewai.crew import Crew
|
||||
from crewai.crews.crew_output import CrewOutput
|
||||
from crewai.pipeline.pipeline_run_result import PipelineRunResult
|
||||
|
||||
Trace = Union[Union[str, Dict[str, Any]], List[Union[str, Dict[str, Any]]]]
|
||||
|
||||
|
||||
"""
|
||||
Pipeline Terminology:
|
||||
Pipeline: The overall structure that defines a sequence of operations.
|
||||
Stage: A distinct part of the pipeline, which can be either sequential or parallel.
|
||||
Run: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline.
|
||||
Branch: Parallel executions within a stage (e.g., concurrent crew operations).
|
||||
Trace: The journey of an individual input through the entire pipeline.
|
||||
|
||||
Example pipeline structure:
|
||||
crew1 >> crew2 >> crew3
|
||||
|
||||
This represents a pipeline with three sequential stages:
|
||||
1. crew1 is the first stage, which processes the input and passes its output to crew2.
|
||||
2. crew2 is the second stage, which takes the output from crew1 as its input, processes it, and passes its output to crew3.
|
||||
3. crew3 is the final stage, which takes the output from crew2 as its input and produces the final output of the pipeline.
|
||||
|
||||
Each input creates its own run, flowing through all stages of the pipeline.
|
||||
Multiple runs can be processed concurrently, each following the defined pipeline structure.
|
||||
|
||||
Another example pipeline structure:
|
||||
crew1 >> [crew2, crew3] >> crew4
|
||||
|
||||
This represents a pipeline with three stages:
|
||||
1. A sequential stage (crew1)
|
||||
2. A parallel stage with two branches (crew2 and crew3 executing concurrently)
|
||||
3. Another sequential stage (crew4)
|
||||
|
||||
Each input creates its own run, flowing through all stages of the pipeline.
|
||||
Multiple runs can be processed concurrently, each following the defined pipeline structure.
|
||||
"""
|
||||
|
||||
|
||||
class Pipeline(BaseModel):
|
||||
stages: List[Union[Crew, List[Crew]]] = Field(
|
||||
..., description="List of crews representing stages to be executed in sequence"
|
||||
)
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def validate_stages(cls, values):
|
||||
stages = values.get("stages", [])
|
||||
|
||||
def check_nesting_and_type(item, depth=0):
|
||||
if depth > 1:
|
||||
raise ValueError("Double nesting is not allowed in pipeline stages")
|
||||
if isinstance(item, list):
|
||||
for sub_item in item:
|
||||
check_nesting_and_type(sub_item, depth + 1)
|
||||
elif not isinstance(item, Crew):
|
||||
raise ValueError(
|
||||
f"Expected Crew instance or list of Crews, got {type(item)}"
|
||||
)
|
||||
|
||||
for stage in stages:
|
||||
check_nesting_and_type(stage)
|
||||
return values
|
||||
|
||||
async def process_runs(
|
||||
self, run_inputs: List[Dict[str, Any]]
|
||||
) -> List[PipelineRunResult]:
|
||||
"""
|
||||
Process multiple runs in parallel, with each run going through all stages.
|
||||
"""
|
||||
pipeline_results = []
|
||||
|
||||
# Process all runs in parallel
|
||||
all_run_results = await asyncio.gather(
|
||||
*(self.process_single_run(input_data) for input_data in run_inputs)
|
||||
)
|
||||
|
||||
# Flatten the list of lists into a single list of results
|
||||
pipeline_results.extend(
|
||||
result for run_result in all_run_results for result in run_result
|
||||
)
|
||||
|
||||
return pipeline_results
|
||||
|
||||
async def process_single_run(
|
||||
self, run_input: Dict[str, Any]
|
||||
) -> List[PipelineRunResult]:
|
||||
initial_input = copy.deepcopy(run_input)
|
||||
current_input = copy.deepcopy(run_input)
|
||||
usage_metrics = {}
|
||||
all_stage_outputs: List[List[CrewOutput]] = []
|
||||
traces: List[List[Union[str, Dict[str, Any]]]] = [[initial_input]]
|
||||
|
||||
for stage in self.stages:
|
||||
stage_input = copy.deepcopy(current_input)
|
||||
stage_outputs, stage_trace = await self._process_stage(stage, stage_input)
|
||||
|
||||
self._update_metrics_and_input(
|
||||
usage_metrics, current_input, stage, stage_outputs
|
||||
)
|
||||
traces.append(stage_trace)
|
||||
all_stage_outputs.append(stage_outputs)
|
||||
|
||||
return self._build_pipeline_run_results(
|
||||
all_stage_outputs, traces, usage_metrics
|
||||
)
|
||||
|
||||
async def _process_stage(
|
||||
self, stage: Union[Crew, List[Crew]], current_input: Dict[str, Any]
|
||||
) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]:
|
||||
if isinstance(stage, Crew):
|
||||
return await self._process_single_crew(stage, current_input)
|
||||
else:
|
||||
return await self._process_parallel_crews(stage, current_input)
|
||||
|
||||
async def _process_single_crew(
|
||||
self, crew: Crew, current_input: Dict[str, Any]
|
||||
) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]:
|
||||
output = await crew.kickoff_async(inputs=current_input)
|
||||
return [output], [crew.name or str(crew.id)]
|
||||
|
||||
async def _process_parallel_crews(
|
||||
self, crews: List[Crew], current_input: Dict[str, Any]
|
||||
) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]:
|
||||
parallel_outputs = await asyncio.gather(
|
||||
*[crew.kickoff_async(inputs=current_input) for crew in crews]
|
||||
)
|
||||
return parallel_outputs, [crew.name or str(crew.id) for crew in crews]
|
||||
|
||||
def _update_metrics_and_input(
|
||||
self,
|
||||
usage_metrics: Dict[str, Any],
|
||||
current_input: Dict[str, Any],
|
||||
stage: Union[Crew, List[Crew]],
|
||||
outputs: List[CrewOutput],
|
||||
) -> None:
|
||||
for crew, output in zip([stage] if isinstance(stage, Crew) else stage, outputs):
|
||||
usage_metrics[crew.name or str(crew.id)] = output.token_usage
|
||||
current_input.update(output.to_dict())
|
||||
|
||||
def _build_pipeline_run_results(
|
||||
self,
|
||||
all_stage_outputs: List[List[CrewOutput]],
|
||||
traces: List[List[Union[str, Dict[str, Any]]]],
|
||||
token_usage: Dict[str, Any],
|
||||
) -> List[PipelineRunResult]:
|
||||
formatted_traces = self._format_traces(traces)
|
||||
formatted_crew_outputs = self._format_crew_outputs(all_stage_outputs)
|
||||
|
||||
return [
|
||||
PipelineRunResult(
|
||||
token_usage=token_usage,
|
||||
trace=formatted_trace,
|
||||
raw=crews_outputs[-1].raw,
|
||||
pydantic=crews_outputs[-1].pydantic,
|
||||
json_dict=crews_outputs[-1].json_dict,
|
||||
crews_outputs=crews_outputs,
|
||||
)
|
||||
for crews_outputs, formatted_trace in zip(
|
||||
formatted_crew_outputs, formatted_traces
|
||||
)
|
||||
]
|
||||
|
||||
def _format_traces(
|
||||
self, traces: List[List[Union[str, Dict[str, Any]]]]
|
||||
) -> List[List[Trace]]:
|
||||
formatted_traces: List[Trace] = []
|
||||
for trace in traces[:-1]:
|
||||
formatted_traces.append(trace[0] if len(trace) == 1 else trace)
|
||||
|
||||
traces_to_return: List[List[Trace]] = []
|
||||
final_trace = traces[-1]
|
||||
if len(final_trace) == 1:
|
||||
formatted_traces.append(final_trace[0])
|
||||
traces_to_return.append(formatted_traces)
|
||||
else:
|
||||
for trace in final_trace:
|
||||
copied_traces = formatted_traces.copy()
|
||||
copied_traces.append(trace)
|
||||
traces_to_return.append(copied_traces)
|
||||
|
||||
return traces_to_return
|
||||
|
||||
def _format_crew_outputs(
|
||||
self, all_stage_outputs: List[List[CrewOutput]]
|
||||
) -> List[List[CrewOutput]]:
|
||||
crew_outputs: List[CrewOutput] = [
|
||||
output
|
||||
for stage_outputs in all_stage_outputs[:-1]
|
||||
for output in stage_outputs
|
||||
]
|
||||
return [crew_outputs + [output] for output in all_stage_outputs[-1]]
|
||||
|
||||
def __rshift__(self, other: Any) -> "Pipeline":
|
||||
"""
|
||||
Implements the >> operator to add another Stage (Crew or List[Crew]) to an existing Pipeline.
|
||||
"""
|
||||
if isinstance(other, Crew):
|
||||
return type(self)(stages=self.stages + [other])
|
||||
elif isinstance(other, list) and all(isinstance(crew, Crew) for crew in other):
|
||||
return type(self)(stages=self.stages + [other])
|
||||
else:
|
||||
raise TypeError(
|
||||
f"Unsupported operand type for >>: '{type(self).__name__}' and '{type(other).__name__}'"
|
||||
)
|
||||
|
||||
|
||||
# Helper function to run the pipeline
|
||||
async def run_pipeline(
|
||||
pipeline: Pipeline, inputs: List[Dict[str, Any]]
|
||||
) -> List[PipelineRunResult]:
|
||||
return await pipeline.process_runs(inputs)
|
||||
@@ -1,20 +0,0 @@
|
||||
import uuid
|
||||
from typing import List
|
||||
|
||||
from pydantic import UUID4, BaseModel, Field
|
||||
|
||||
from crewai.pipeline.pipeline_run_result import PipelineRunResult
|
||||
|
||||
|
||||
class PipelineOutput(BaseModel):
|
||||
id: UUID4 = Field(
|
||||
default_factory=uuid.uuid4,
|
||||
frozen=True,
|
||||
description="Unique identifier for the object, not set by user.",
|
||||
)
|
||||
run_results: List[PipelineRunResult] = Field(
|
||||
description="List of results for each run through the pipeline", default=[]
|
||||
)
|
||||
|
||||
def add_run_result(self, result: PipelineRunResult):
|
||||
self.run_results.append(result)
|
||||
@@ -1,60 +0,0 @@
|
||||
import json
|
||||
import uuid
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
from pydantic import UUID4, BaseModel, Field
|
||||
|
||||
from crewai.crews.crew_output import CrewOutput
|
||||
|
||||
|
||||
class PipelineRunResult(BaseModel):
|
||||
"""Class that represents the result of a pipeline run."""
|
||||
|
||||
id: UUID4 = Field(
|
||||
default_factory=uuid.uuid4,
|
||||
frozen=True,
|
||||
description="Unique identifier for the object, not set by user.",
|
||||
)
|
||||
raw: str = Field(description="Raw output of the pipeline run", default="")
|
||||
pydantic: Any = Field(
|
||||
description="Pydantic output of the pipeline run", default=None
|
||||
)
|
||||
json_dict: Union[Dict[str, Any], None] = Field(
|
||||
description="JSON dict output of the pipeline run", default={}
|
||||
)
|
||||
|
||||
token_usage: Dict[str, Any] = Field(
|
||||
description="Token usage for each crew in the run"
|
||||
)
|
||||
trace: List[Any] = Field(
|
||||
description="Trace of the journey of inputs through the run"
|
||||
)
|
||||
crews_outputs: List[CrewOutput] = Field(
|
||||
description="Output from each crew in the run",
|
||||
default=[],
|
||||
)
|
||||
|
||||
@property
|
||||
def json(self) -> Optional[str]:
|
||||
if self.crews_outputs[-1].tasks_output[-1].output_format != "json":
|
||||
raise ValueError(
|
||||
"No JSON output found in the final task of the final crew. Please make sure to set the output_json property in the final task in your crew."
|
||||
)
|
||||
|
||||
return json.dumps(self.json_dict)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert json_output and pydantic_output to a dictionary."""
|
||||
output_dict = {}
|
||||
if self.json_dict:
|
||||
output_dict.update(self.json_dict)
|
||||
elif self.pydantic:
|
||||
output_dict.update(self.pydantic.model_dump())
|
||||
return output_dict
|
||||
|
||||
def __str__(self):
|
||||
if self.pydantic:
|
||||
return str(self.pydantic)
|
||||
if self.json_dict:
|
||||
return str(self.json_dict)
|
||||
return self.raw
|
||||
@@ -1,67 +0,0 @@
|
||||
from typing import Any, Callable, Dict, List, Tuple, Union
|
||||
|
||||
from crewai.crew import Crew
|
||||
from crewai.pipeline.pipeline import Pipeline
|
||||
|
||||
|
||||
class Router:
|
||||
def __init__(self):
|
||||
self.conditions: List[
|
||||
Tuple[Callable[[Dict[str, Any]], bool], Union[Crew, Pipeline]]
|
||||
] = []
|
||||
self.default: Union[Crew, Pipeline, None] = None
|
||||
|
||||
def add_condition(
|
||||
self,
|
||||
condition: Callable[[Dict[str, Any]], bool],
|
||||
next_stage: Union[Crew, Pipeline],
|
||||
):
|
||||
"""
|
||||
Add a condition and its corresponding next stage to the router.
|
||||
|
||||
Args:
|
||||
condition: A function that takes the input dictionary and returns a boolean.
|
||||
next_stage: The Crew or Pipeline to execute if the condition is met.
|
||||
"""
|
||||
self.conditions.append((condition, next_stage))
|
||||
|
||||
def set_default(self, default_stage: Union[Crew, Pipeline]):
|
||||
"""Set the default stage to be executed if no conditions are met."""
|
||||
self.default = default_stage
|
||||
|
||||
def route(self, input_dict: Dict[str, Any]) -> Union[Crew, Pipeline]:
|
||||
"""
|
||||
Evaluate the input against the conditions and return the appropriate next stage.
|
||||
|
||||
Args:
|
||||
input_dict: The input dictionary to be evaluated.
|
||||
|
||||
Returns:
|
||||
The next Crew or Pipeline to be executed.
|
||||
|
||||
Raises:
|
||||
ValueError: If no conditions are met and no default is set.
|
||||
"""
|
||||
for condition, next_stage in self.conditions:
|
||||
if condition(input_dict):
|
||||
self._update_trace(input_dict, next_stage)
|
||||
return next_stage
|
||||
|
||||
if self.default is not None:
|
||||
self._update_trace(input_dict, self.default)
|
||||
return self.default
|
||||
|
||||
raise ValueError("No conditions were met and no default stage was set.")
|
||||
|
||||
def _update_trace(
|
||||
self, input_dict: Dict[str, Any], next_stage: Union[Crew, Pipeline]
|
||||
):
|
||||
"""Update the trace to show that the input went through the router."""
|
||||
if "trace" not in input_dict:
|
||||
input_dict["trace"] = []
|
||||
input_dict["trace"].append(
|
||||
{
|
||||
"router": self.__class__.__name__,
|
||||
"next_stage": next_stage.__class__.__name__,
|
||||
}
|
||||
)
|
||||
@@ -383,7 +383,7 @@ class Telemetry:
|
||||
{
|
||||
"id": str(task.id),
|
||||
"description": task.description,
|
||||
"output": task.output.raw,
|
||||
"output": task.output.raw_output,
|
||||
}
|
||||
for task in crew.tasks
|
||||
]
|
||||
|
||||
149
src/crewai/utilities/evaluators/crew_evaluator_handler.py
Normal file
149
src/crewai/utilities/evaluators/crew_evaluator_handler.py
Normal file
@@ -0,0 +1,149 @@
|
||||
from collections import defaultdict
|
||||
|
||||
from langchain_openai import ChatOpenAI
|
||||
from pydantic import BaseModel, Field
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
|
||||
from crewai.agent import Agent
|
||||
from crewai.task import Task
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
|
||||
|
||||
class TaskEvaluationPydanticOutput(BaseModel):
|
||||
quality: float = Field(
|
||||
description="A score from 1 to 10 evaluating on completion, quality, and overall performance from the task_description and task_expected_output to the actual Task Output."
|
||||
)
|
||||
|
||||
|
||||
class CrewEvaluator:
|
||||
"""
|
||||
A class to evaluate the performance of the agents in the crew based on the tasks they have performed.
|
||||
|
||||
Attributes:
|
||||
crew (Crew): The crew of agents to evaluate.
|
||||
openai_model_name (str): The model to use for evaluating the performance of the agents (for now ONLY OpenAI accepted).
|
||||
tasks_scores (defaultdict): A dictionary to store the scores of the agents for each task.
|
||||
iteration (int): The current iteration of the evaluation.
|
||||
"""
|
||||
|
||||
tasks_scores: defaultdict = defaultdict(list)
|
||||
iteration: int = 0
|
||||
|
||||
def __init__(self, crew, openai_model_name: str):
|
||||
self.crew = crew
|
||||
self.openai_model_name = openai_model_name
|
||||
self._setup_for_evaluating()
|
||||
|
||||
def _setup_for_evaluating(self) -> None:
|
||||
"""Sets up the crew for evaluating."""
|
||||
for task in self.crew.tasks:
|
||||
task.callback = self.evaluate
|
||||
|
||||
def set_iteration(self, iteration: int) -> None:
|
||||
self.iteration = iteration
|
||||
|
||||
def _evaluator_agent(self):
|
||||
return Agent(
|
||||
role="Task Execution Evaluator",
|
||||
goal=(
|
||||
"Your goal is to evaluate the performance of the agents in the crew based on the tasks they have performed using score from 1 to 10 evaluating on completion, quality, and overall performance."
|
||||
),
|
||||
backstory="Evaluator agent for crew evaluation with precise capabilities to evaluate the performance of the agents in the crew based on the tasks they have performed",
|
||||
verbose=False,
|
||||
llm=ChatOpenAI(model=self.openai_model_name),
|
||||
)
|
||||
|
||||
def _evaluation_task(
|
||||
self, evaluator_agent: Agent, task_to_evaluate: Task, task_output: str
|
||||
) -> Task:
|
||||
return Task(
|
||||
description=(
|
||||
"Based on the task description and the expected output, compare and evaluate the performance of the agents in the crew based on the Task Output they have performed using score from 1 to 10 evaluating on completion, quality, and overall performance."
|
||||
f"task_description: {task_to_evaluate.description} "
|
||||
f"task_expected_output: {task_to_evaluate.expected_output} "
|
||||
f"agent: {task_to_evaluate.agent.role if task_to_evaluate.agent else None} "
|
||||
f"agent_goal: {task_to_evaluate.agent.goal if task_to_evaluate.agent else None} "
|
||||
f"Task Output: {task_output}"
|
||||
),
|
||||
expected_output="Evaluation Score from 1 to 10 based on the performance of the agents on the tasks",
|
||||
agent=evaluator_agent,
|
||||
output_pydantic=TaskEvaluationPydanticOutput,
|
||||
)
|
||||
|
||||
def print_crew_evaluation_result(self) -> None:
|
||||
"""
|
||||
Prints the evaluation result of the crew in a table.
|
||||
A Crew with 2 tasks using the command crewai test -n 2
|
||||
will output the following table:
|
||||
|
||||
Task Scores
|
||||
(1-10 Higher is better)
|
||||
┏━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━┳━━━━━━━━━━━━┓
|
||||
┃ Tasks/Crew ┃ Run 1 ┃ Run 2 ┃ Avg. Total ┃
|
||||
┡━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━╇━━━━━━━━━━━━┩
|
||||
│ Task 1 │ 10.0 │ 9.0 │ 9.5 │
|
||||
│ Task 2 │ 9.0 │ 9.0 │ 9.0 │
|
||||
│ Crew │ 9.5 │ 9.0 │ 9.2 │
|
||||
└────────────┴───────┴───────┴────────────┘
|
||||
"""
|
||||
task_averages = [
|
||||
sum(scores) / len(scores) for scores in zip(*self.tasks_scores.values())
|
||||
]
|
||||
crew_average = sum(task_averages) / len(task_averages)
|
||||
|
||||
# Create a table
|
||||
table = Table(title="Tasks Scores \n (1-10 Higher is better)")
|
||||
|
||||
# Add columns for the table
|
||||
table.add_column("Tasks/Crew")
|
||||
for run in range(1, len(self.tasks_scores) + 1):
|
||||
table.add_column(f"Run {run}")
|
||||
table.add_column("Avg. Total")
|
||||
|
||||
# Add rows for each task
|
||||
for task_index in range(len(task_averages)):
|
||||
task_scores = [
|
||||
self.tasks_scores[run][task_index]
|
||||
for run in range(1, len(self.tasks_scores) + 1)
|
||||
]
|
||||
avg_score = task_averages[task_index]
|
||||
table.add_row(
|
||||
f"Task {task_index + 1}", *map(str, task_scores), f"{avg_score:.1f}"
|
||||
)
|
||||
|
||||
# Add a row for the crew average
|
||||
crew_scores = [
|
||||
sum(self.tasks_scores[run]) / len(self.tasks_scores[run])
|
||||
for run in range(1, len(self.tasks_scores) + 1)
|
||||
]
|
||||
table.add_row("Crew", *map(str, crew_scores), f"{crew_average:.1f}")
|
||||
|
||||
# Display the table in the terminal
|
||||
console = Console()
|
||||
console.print(table)
|
||||
|
||||
def evaluate(self, task_output: TaskOutput):
|
||||
"""Evaluates the performance of the agents in the crew based on the tasks they have performed."""
|
||||
current_task = None
|
||||
for task in self.crew.tasks:
|
||||
if task.description == task_output.description:
|
||||
current_task = task
|
||||
break
|
||||
|
||||
if not current_task or not task_output:
|
||||
raise ValueError(
|
||||
"Task to evaluate and task output are required for evaluation"
|
||||
)
|
||||
|
||||
evaluator_agent = self._evaluator_agent()
|
||||
evaluation_task = self._evaluation_task(
|
||||
evaluator_agent, current_task, task_output.raw
|
||||
)
|
||||
|
||||
evaluation_result = evaluation_task.execute_sync()
|
||||
|
||||
if isinstance(evaluation_result.pydantic, TaskEvaluationPydanticOutput):
|
||||
self.tasks_scores[self.iteration].append(evaluation_result.pydantic.quality)
|
||||
else:
|
||||
raise ValueError("Evaluation result is not in the expected format")
|
||||
@@ -3,7 +3,7 @@ from unittest import mock
|
||||
import pytest
|
||||
from click.testing import CliRunner
|
||||
|
||||
from crewai.cli.cli import train, version, reset_memories
|
||||
from crewai.cli.cli import reset_memories, test, train, version
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -133,3 +133,33 @@ def test_version_command_with_tools(runner):
|
||||
"crewai tools version:" in result.output
|
||||
or "crewai tools not installed" in result.output
|
||||
)
|
||||
|
||||
|
||||
@mock.patch("crewai.cli.cli.test_crew")
|
||||
def test_test_default_iterations(test_crew, runner):
|
||||
result = runner.invoke(test)
|
||||
|
||||
test_crew.assert_called_once_with(3, "gpt-4o-mini")
|
||||
assert result.exit_code == 0
|
||||
assert "Testing the crew for 3 iterations with model gpt-4o-mini" in result.output
|
||||
|
||||
|
||||
@mock.patch("crewai.cli.cli.test_crew")
|
||||
def test_test_custom_iterations(test_crew, runner):
|
||||
result = runner.invoke(test, ["--n_iterations", "5", "--model", "gpt-4o"])
|
||||
|
||||
test_crew.assert_called_once_with(5, "gpt-4o")
|
||||
assert result.exit_code == 0
|
||||
assert "Testing the crew for 5 iterations with model gpt-4o" in result.output
|
||||
|
||||
|
||||
@mock.patch("crewai.cli.cli.test_crew")
|
||||
def test_test_invalid_string_iterations(test_crew, runner):
|
||||
result = runner.invoke(test, ["--n_iterations", "invalid"])
|
||||
|
||||
test_crew.assert_not_called()
|
||||
assert result.exit_code == 2
|
||||
assert (
|
||||
"Usage: test [OPTIONS]\nTry 'test --help' for help.\n\nError: Invalid value for '-n' / '--n_iterations': 'invalid' is not a valid integer.\n"
|
||||
in result.output
|
||||
)
|
||||
|
||||
97
tests/cli/test_crew_test.py
Normal file
97
tests/cli/test_crew_test.py
Normal file
@@ -0,0 +1,97 @@
|
||||
import subprocess
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.cli import test_crew
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"n_iterations,model",
|
||||
[
|
||||
(1, "gpt-4o"),
|
||||
(5, "gpt-3.5-turbo"),
|
||||
(10, "gpt-4"),
|
||||
],
|
||||
)
|
||||
@mock.patch("crewai.cli.test_crew.subprocess.run")
|
||||
def test_crew_success(mock_subprocess_run, n_iterations, model):
|
||||
"""Test the crew function for successful execution."""
|
||||
mock_subprocess_run.return_value = subprocess.CompletedProcess(
|
||||
args=f"poetry run test {n_iterations} {model}", returncode=0
|
||||
)
|
||||
result = test_crew.test_crew(n_iterations, model)
|
||||
|
||||
mock_subprocess_run.assert_called_once_with(
|
||||
["poetry", "run", "test", str(n_iterations), model],
|
||||
capture_output=False,
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
assert result is None
|
||||
|
||||
|
||||
@mock.patch("crewai.cli.test_crew.click")
|
||||
def test_test_crew_zero_iterations(click):
|
||||
test_crew.test_crew(0, "gpt-4o")
|
||||
click.echo.assert_called_once_with(
|
||||
"An unexpected error occurred: The number of iterations must be a positive integer.",
|
||||
err=True,
|
||||
)
|
||||
|
||||
|
||||
@mock.patch("crewai.cli.test_crew.click")
|
||||
def test_test_crew_negative_iterations(click):
|
||||
test_crew.test_crew(-2, "gpt-4o")
|
||||
click.echo.assert_called_once_with(
|
||||
"An unexpected error occurred: The number of iterations must be a positive integer.",
|
||||
err=True,
|
||||
)
|
||||
|
||||
|
||||
@mock.patch("crewai.cli.test_crew.click")
|
||||
@mock.patch("crewai.cli.test_crew.subprocess.run")
|
||||
def test_test_crew_called_process_error(mock_subprocess_run, click):
|
||||
n_iterations = 5
|
||||
mock_subprocess_run.side_effect = subprocess.CalledProcessError(
|
||||
returncode=1,
|
||||
cmd=["poetry", "run", "test", str(n_iterations), "gpt-4o"],
|
||||
output="Error",
|
||||
stderr="Some error occurred",
|
||||
)
|
||||
test_crew.test_crew(n_iterations, "gpt-4o")
|
||||
|
||||
mock_subprocess_run.assert_called_once_with(
|
||||
["poetry", "run", "test", "5", "gpt-4o"],
|
||||
capture_output=False,
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
click.echo.assert_has_calls(
|
||||
[
|
||||
mock.call.echo(
|
||||
"An error occurred while testing the crew: Command '['poetry', 'run', 'test', '5', 'gpt-4o']' returned non-zero exit status 1.",
|
||||
err=True,
|
||||
),
|
||||
mock.call.echo("Error", err=True),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@mock.patch("crewai.cli.test_crew.click")
|
||||
@mock.patch("crewai.cli.test_crew.subprocess.run")
|
||||
def test_test_crew_unexpected_exception(mock_subprocess_run, click):
|
||||
# Arrange
|
||||
n_iterations = 5
|
||||
mock_subprocess_run.side_effect = Exception("Unexpected error")
|
||||
test_crew.test_crew(n_iterations, "gpt-4o")
|
||||
|
||||
mock_subprocess_run.assert_called_once_with(
|
||||
["poetry", "run", "test", "5", "gpt-4o"],
|
||||
capture_output=False,
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
click.echo.assert_called_once_with(
|
||||
"An unexpected error occurred: Unexpected error", err=True
|
||||
)
|
||||
@@ -8,6 +8,7 @@ from unittest.mock import MagicMock, patch
|
||||
|
||||
import pydantic_core
|
||||
import pytest
|
||||
|
||||
from crewai.agent import Agent
|
||||
from crewai.agents.cache import CacheHandler
|
||||
from crewai.crew import Crew
|
||||
@@ -2499,3 +2500,34 @@ def test_conditional_should_execute():
|
||||
assert condition_mock.call_count == 1
|
||||
assert condition_mock() is True
|
||||
assert mock_execute_sync.call_count == 2
|
||||
|
||||
|
||||
@mock.patch("crewai.crew.CrewEvaluator")
|
||||
@mock.patch("crewai.crew.Crew.kickoff")
|
||||
def test_crew_testing_function(mock_kickoff, crew_evaluator):
|
||||
task = Task(
|
||||
description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
|
||||
expected_output="5 bullet points with a paragraph for each idea.",
|
||||
agent=researcher,
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[researcher],
|
||||
tasks=[task],
|
||||
)
|
||||
n_iterations = 2
|
||||
crew.test(n_iterations, openai_model_name="gpt-4o-mini", inputs={"topic": "AI"})
|
||||
|
||||
assert len(mock_kickoff.mock_calls) == n_iterations
|
||||
mock_kickoff.assert_has_calls(
|
||||
[mock.call(inputs={"topic": "AI"}), mock.call(inputs={"topic": "AI"})]
|
||||
)
|
||||
|
||||
crew_evaluator.assert_has_calls(
|
||||
[
|
||||
mock.call(crew, "gpt-4o-mini"),
|
||||
mock.call().set_iteration(1),
|
||||
mock.call().set_iteration(2),
|
||||
mock.call().print_crew_evaluation_result(),
|
||||
]
|
||||
)
|
||||
|
||||
@@ -1,474 +0,0 @@
|
||||
import json
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
from crewai.agent import Agent
|
||||
from crewai.crew import Crew
|
||||
from crewai.crews.crew_output import CrewOutput
|
||||
from crewai.pipeline.pipeline import Pipeline
|
||||
from crewai.pipeline.pipeline_run_result import PipelineRunResult
|
||||
from crewai.process import Process
|
||||
from crewai.task import Task
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
from pydantic import BaseModel, ValidationError
|
||||
|
||||
DEFAULT_TOKEN_USAGE = {
|
||||
"total_tokens": 100,
|
||||
"prompt_tokens": 50,
|
||||
"completion_tokens": 50,
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_crew_factory():
|
||||
def _create_mock_crew(name: str, output_json_dict=None, pydantic_output=None):
|
||||
crew = MagicMock(spec=Crew)
|
||||
task_output = TaskOutput(
|
||||
description="Test task", raw="Task output", agent="Test Agent"
|
||||
)
|
||||
crew_output = CrewOutput(
|
||||
raw="Test output",
|
||||
tasks_output=[task_output],
|
||||
token_usage=DEFAULT_TOKEN_USAGE,
|
||||
json_dict=output_json_dict if output_json_dict else None,
|
||||
pydantic=pydantic_output,
|
||||
)
|
||||
|
||||
async def async_kickoff(inputs=None):
|
||||
print("inputs in async_kickoff", inputs)
|
||||
return crew_output
|
||||
|
||||
crew.kickoff_async.side_effect = async_kickoff
|
||||
|
||||
# Add more attributes that Procedure might be expecting
|
||||
crew.verbose = False
|
||||
crew.output_log_file = None
|
||||
crew.max_rpm = None
|
||||
crew.memory = False
|
||||
crew.process = Process.sequential
|
||||
crew.config = None
|
||||
crew.cache = True
|
||||
crew.name = name
|
||||
|
||||
# Add non-empty agents and tasks
|
||||
mock_agent = MagicMock(spec=Agent)
|
||||
mock_task = MagicMock(spec=Task)
|
||||
mock_task.agent = mock_agent
|
||||
mock_task.async_execution = False
|
||||
mock_task.context = None
|
||||
|
||||
crew.agents = [mock_agent]
|
||||
crew.tasks = [mock_task]
|
||||
|
||||
return crew
|
||||
|
||||
return _create_mock_crew
|
||||
|
||||
|
||||
def test_pipeline_initialization(mock_crew_factory):
|
||||
"""
|
||||
Test that a Pipeline is correctly initialized with the given stages.
|
||||
"""
|
||||
crew1 = mock_crew_factory(name="Crew 1")
|
||||
crew2 = mock_crew_factory(name="Crew 2")
|
||||
|
||||
pipeline = Pipeline(stages=[crew1, crew2])
|
||||
assert len(pipeline.stages) == 2
|
||||
assert pipeline.stages[0] == crew1
|
||||
assert pipeline.stages[1] == crew2
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pipeline_with_empty_input(mock_crew_factory):
|
||||
"""
|
||||
Ensure the pipeline handles an empty input list correctly.
|
||||
"""
|
||||
crew = mock_crew_factory(name="Test Crew")
|
||||
pipeline = Pipeline(stages=[crew])
|
||||
|
||||
input_data = []
|
||||
pipeline_results = await pipeline.process_runs(input_data)
|
||||
|
||||
assert (
|
||||
len(pipeline_results) == 0
|
||||
), "Pipeline should return empty results for empty input"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pipeline_process_streams_single_input(mock_crew_factory):
|
||||
"""
|
||||
Test that Pipeline.process_streams() correctly processes a single input
|
||||
and returns the expected CrewOutput.
|
||||
"""
|
||||
crew_name = "Test Crew"
|
||||
mock_crew = mock_crew_factory(name="Test Crew")
|
||||
pipeline = Pipeline(stages=[mock_crew])
|
||||
input_data = [{"key": "value"}]
|
||||
pipeline_results = await pipeline.process_runs(input_data)
|
||||
|
||||
mock_crew.kickoff_async.assert_called_once_with(inputs={"key": "value"})
|
||||
|
||||
for pipeline_result in pipeline_results:
|
||||
assert isinstance(pipeline_result, PipelineRunResult)
|
||||
assert pipeline_result.raw == "Test output"
|
||||
assert len(pipeline_result.crews_outputs) == 1
|
||||
print("pipeline_result.token_usage", pipeline_result.token_usage)
|
||||
assert pipeline_result.token_usage == {crew_name: DEFAULT_TOKEN_USAGE}
|
||||
assert pipeline_result.trace == [input_data[0], "Test Crew"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pipeline_result_ordering(mock_crew_factory):
|
||||
"""
|
||||
Ensure that results are returned in the same order as the inputs, especially with parallel processing.
|
||||
"""
|
||||
crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"output": "crew1"})
|
||||
crew2 = mock_crew_factory(name="Crew 2", output_json_dict={"output": "crew2"})
|
||||
crew3 = mock_crew_factory(name="Crew 3", output_json_dict={"output": "crew3"})
|
||||
|
||||
pipeline = Pipeline(
|
||||
stages=[crew1, [crew2, crew3]]
|
||||
) # Parallel stage to test ordering
|
||||
|
||||
input_data = [{"id": 1}, {"id": 2}, {"id": 3}]
|
||||
pipeline_results = await pipeline.process_runs(input_data)
|
||||
|
||||
assert (
|
||||
len(pipeline_results) == 6
|
||||
), "Should have 2 results for each input due to the parallel final stage"
|
||||
|
||||
# Group results by their original input id
|
||||
grouped_results = {}
|
||||
for result in pipeline_results:
|
||||
input_id = result.trace[0]["id"]
|
||||
if input_id not in grouped_results:
|
||||
grouped_results[input_id] = []
|
||||
grouped_results[input_id].append(result)
|
||||
|
||||
# Check that we have the correct number of groups and results per group
|
||||
assert len(grouped_results) == 3, "Should have results for each of the 3 inputs"
|
||||
for input_id, results in grouped_results.items():
|
||||
assert (
|
||||
len(results) == 2
|
||||
), f"Each input should have 2 results, but input {input_id} has {len(results)}"
|
||||
|
||||
# Check the ordering and content of the results
|
||||
for input_id in range(1, 4):
|
||||
group = grouped_results[input_id]
|
||||
assert group[0].trace == [
|
||||
{"id": input_id},
|
||||
"Crew 1",
|
||||
"Crew 2",
|
||||
], f"Unexpected trace for first result of input {input_id}"
|
||||
assert group[1].trace == [
|
||||
{"id": input_id},
|
||||
"Crew 1",
|
||||
"Crew 3",
|
||||
], f"Unexpected trace for second result of input {input_id}"
|
||||
assert (
|
||||
group[0].json_dict["output"] == "crew2"
|
||||
), f"Unexpected output for first result of input {input_id}"
|
||||
assert (
|
||||
group[1].json_dict["output"] == "crew3"
|
||||
), f"Unexpected output for second result of input {input_id}"
|
||||
|
||||
|
||||
class TestPydanticOutput(BaseModel):
|
||||
key: str
|
||||
value: int
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pipeline_process_streams_single_input_pydantic_output(mock_crew_factory):
|
||||
crew_name = "Test Crew"
|
||||
mock_crew = mock_crew_factory(
|
||||
name=crew_name,
|
||||
output_json_dict=None,
|
||||
pydantic_output=TestPydanticOutput(key="test", value=42),
|
||||
)
|
||||
pipeline = Pipeline(stages=[mock_crew])
|
||||
input_data = [{"key": "value"}]
|
||||
pipeline_results = await pipeline.process_runs(input_data)
|
||||
|
||||
assert len(pipeline_results) == 1
|
||||
pipeline_result = pipeline_results[0]
|
||||
|
||||
print("pipeline_result.trace", pipeline_result.trace)
|
||||
|
||||
assert isinstance(pipeline_result, PipelineRunResult)
|
||||
assert pipeline_result.raw == "Test output"
|
||||
assert len(pipeline_result.crews_outputs) == 1
|
||||
assert pipeline_result.token_usage == {crew_name: DEFAULT_TOKEN_USAGE}
|
||||
print("INPUT DATA POST PROCESS", input_data)
|
||||
assert pipeline_result.trace == [input_data[0], "Test Crew"]
|
||||
|
||||
assert isinstance(pipeline_result.pydantic, TestPydanticOutput)
|
||||
assert pipeline_result.pydantic.key == "test"
|
||||
assert pipeline_result.pydantic.value == 42
|
||||
assert pipeline_result.json_dict is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pipeline_preserves_original_input(mock_crew_factory):
|
||||
crew_name = "Test Crew"
|
||||
mock_crew = mock_crew_factory(
|
||||
name=crew_name,
|
||||
output_json_dict={"new_key": "new_value"},
|
||||
)
|
||||
pipeline = Pipeline(stages=[mock_crew])
|
||||
|
||||
# Create a deep copy of the input data to ensure we're not comparing references
|
||||
original_input_data = [{"key": "value", "nested": {"a": 1}}]
|
||||
input_data = json.loads(json.dumps(original_input_data))
|
||||
|
||||
await pipeline.process_runs(input_data)
|
||||
|
||||
# Assert that the original input hasn't been modified
|
||||
assert (
|
||||
input_data == original_input_data
|
||||
), "The original input data should not be modified"
|
||||
|
||||
# Ensure that even nested structures haven't been modified
|
||||
assert (
|
||||
input_data[0]["nested"] == original_input_data[0]["nested"]
|
||||
), "Nested structures should not be modified"
|
||||
|
||||
# Verify that adding new keys to the crew output doesn't affect the original input
|
||||
assert (
|
||||
"new_key" not in input_data[0]
|
||||
), "New keys from crew output should not be added to the original input"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pipeline_process_streams_multiple_inputs(mock_crew_factory):
|
||||
"""
|
||||
Test that Pipeline.process_streams() correctly processes multiple inputs
|
||||
and returns the expected CrewOutputs.
|
||||
"""
|
||||
mock_crew = mock_crew_factory(name="Test Crew")
|
||||
pipeline = Pipeline(stages=[mock_crew])
|
||||
input_data = [{"key1": "value1"}, {"key2": "value2"}]
|
||||
pipeline_results = await pipeline.process_runs(input_data)
|
||||
|
||||
assert mock_crew.kickoff_async.call_count == 2
|
||||
assert len(pipeline_results) == 2
|
||||
for pipeline_result in pipeline_results:
|
||||
print("pipeline_result,", pipeline_result)
|
||||
assert all(
|
||||
isinstance(crew_output, CrewOutput)
|
||||
for crew_output in pipeline_result.crews_outputs
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pipeline_with_parallel_stages(mock_crew_factory):
|
||||
"""
|
||||
Test that Pipeline correctly handles parallel stages.
|
||||
"""
|
||||
crew1 = mock_crew_factory(name="Crew 1")
|
||||
crew2 = mock_crew_factory(name="Crew 2")
|
||||
crew3 = mock_crew_factory(name="Crew 3")
|
||||
|
||||
pipeline = Pipeline(stages=[crew1, [crew2, crew3]])
|
||||
input_data = [{"initial": "data"}]
|
||||
|
||||
pipeline_result = await pipeline.process_runs(input_data)
|
||||
|
||||
crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
|
||||
|
||||
assert len(pipeline_result) == 2
|
||||
pipeline_result_1, pipeline_result_2 = pipeline_result
|
||||
|
||||
pipeline_result_1.trace = [
|
||||
"Crew 1",
|
||||
"Crew 2",
|
||||
]
|
||||
pipeline_result_2.trace = [
|
||||
"Crew 1",
|
||||
"Crew 3",
|
||||
]
|
||||
|
||||
expected_token_usage = {
|
||||
"Crew 1": DEFAULT_TOKEN_USAGE,
|
||||
"Crew 2": DEFAULT_TOKEN_USAGE,
|
||||
"Crew 3": DEFAULT_TOKEN_USAGE,
|
||||
}
|
||||
|
||||
assert pipeline_result_1.token_usage == expected_token_usage
|
||||
assert pipeline_result_2.token_usage == expected_token_usage
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pipeline_with_parallel_stages_end_in_single_stage(mock_crew_factory):
|
||||
"""
|
||||
Test that Pipeline correctly handles parallel stages.
|
||||
"""
|
||||
crew1 = mock_crew_factory(name="Crew 1")
|
||||
crew2 = mock_crew_factory(name="Crew 2")
|
||||
crew3 = mock_crew_factory(name="Crew 3")
|
||||
crew4 = mock_crew_factory(name="Crew 4")
|
||||
|
||||
pipeline = Pipeline(stages=[crew1, [crew2, crew3], crew4])
|
||||
input_data = [{"initial": "data"}]
|
||||
|
||||
pipeline_result = await pipeline.process_runs(input_data)
|
||||
|
||||
crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
|
||||
|
||||
assert len(pipeline_result) == 1
|
||||
pipeline_result_1 = pipeline_result[0]
|
||||
|
||||
pipeline_result_1.trace = [
|
||||
input_data[0],
|
||||
"Crew 1",
|
||||
["Crew 2", "Crew 3"],
|
||||
"Crew 4",
|
||||
]
|
||||
|
||||
expected_token_usage = {
|
||||
"Crew 1": DEFAULT_TOKEN_USAGE,
|
||||
"Crew 2": DEFAULT_TOKEN_USAGE,
|
||||
"Crew 3": DEFAULT_TOKEN_USAGE,
|
||||
"Crew 4": DEFAULT_TOKEN_USAGE,
|
||||
}
|
||||
|
||||
assert pipeline_result_1.token_usage == expected_token_usage
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pipeline_with_parallel_stages_multiple_inputs(mock_crew_factory):
|
||||
# TODO: implement
|
||||
pass
|
||||
|
||||
|
||||
def test_pipeline_rshift_operator(mock_crew_factory):
|
||||
"""
|
||||
Test that the >> operator correctly creates a Pipeline from Crews and lists of Crews.
|
||||
"""
|
||||
crew1 = mock_crew_factory(name="Crew 1")
|
||||
crew2 = mock_crew_factory(name="Crew 2")
|
||||
crew3 = mock_crew_factory(name="Crew 3")
|
||||
|
||||
# Test single crew addition
|
||||
pipeline = Pipeline(stages=[]) >> crew1
|
||||
assert len(pipeline.stages) == 1
|
||||
assert pipeline.stages[0] == crew1
|
||||
|
||||
# Test adding a list of crews
|
||||
pipeline = Pipeline(stages=[crew1])
|
||||
pipeline = pipeline >> [crew2, crew3]
|
||||
print("pipeline.stages:", pipeline.stages)
|
||||
assert len(pipeline.stages) == 2
|
||||
assert pipeline.stages[1] == [crew2, crew3]
|
||||
|
||||
# Test error case: trying to shift with non-Crew object
|
||||
with pytest.raises(TypeError):
|
||||
pipeline >> "not a crew"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pipeline_parallel_crews_to_parallel_crews(mock_crew_factory):
|
||||
"""
|
||||
Test that feeding parallel crews to parallel crews works correctly.
|
||||
"""
|
||||
crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"output1": "crew1"})
|
||||
crew2 = mock_crew_factory(name="Crew 2", output_json_dict={"output2": "crew2"})
|
||||
crew3 = mock_crew_factory(name="Crew 3", output_json_dict={"output3": "crew3"})
|
||||
crew4 = mock_crew_factory(name="Crew 4", output_json_dict={"output4": "crew4"})
|
||||
|
||||
pipeline = Pipeline(stages=[[crew1, crew2], [crew3, crew4]])
|
||||
|
||||
input_data = [{"input": "test"}]
|
||||
pipeline_results = await pipeline.process_runs(input_data)
|
||||
|
||||
assert len(pipeline_results) == 2, "Should have 2 results for final parallel stage"
|
||||
|
||||
pipeline_result_1, pipeline_result_2 = pipeline_results
|
||||
|
||||
# Check the outputs
|
||||
assert pipeline_result_1.json_dict == {"output3": "crew3"}
|
||||
assert pipeline_result_2.json_dict == {"output4": "crew4"}
|
||||
|
||||
# Check the traces
|
||||
expected_traces = [
|
||||
[{"input": "test"}, ["Crew 1", "Crew 2"], "Crew 3"],
|
||||
[{"input": "test"}, ["Crew 1", "Crew 2"], "Crew 4"],
|
||||
]
|
||||
|
||||
for result, expected_trace in zip(pipeline_results, expected_traces):
|
||||
assert result.trace == expected_trace, f"Unexpected trace: {result.trace}"
|
||||
|
||||
|
||||
def test_pipeline_double_nesting_not_allowed(mock_crew_factory):
|
||||
"""
|
||||
Test that double nesting in pipeline stages is not allowed.
|
||||
"""
|
||||
crew1 = mock_crew_factory(name="Crew 1")
|
||||
crew2 = mock_crew_factory(name="Crew 2")
|
||||
crew3 = mock_crew_factory(name="Crew 3")
|
||||
crew4 = mock_crew_factory(name="Crew 4")
|
||||
|
||||
with pytest.raises(ValidationError) as exc_info:
|
||||
Pipeline(stages=[crew1, [[crew2, crew3], crew4]])
|
||||
|
||||
error_msg = str(exc_info.value)
|
||||
print(f"Full error message: {error_msg}") # For debugging
|
||||
assert (
|
||||
"Double nesting is not allowed in pipeline stages" in error_msg
|
||||
), f"Unexpected error message: {error_msg}"
|
||||
|
||||
|
||||
def test_pipeline_invalid_crew(mock_crew_factory):
|
||||
"""
|
||||
Test that non-Crew objects are not allowed in pipeline stages.
|
||||
"""
|
||||
crew1 = mock_crew_factory(name="Crew 1")
|
||||
not_a_crew = "This is not a crew"
|
||||
|
||||
with pytest.raises(ValidationError) as exc_info:
|
||||
Pipeline(stages=[crew1, not_a_crew])
|
||||
|
||||
error_msg = str(exc_info.value)
|
||||
print(f"Full error message: {error_msg}") # For debugging
|
||||
assert (
|
||||
"Expected Crew instance or list of Crews, got <class 'str'>" in error_msg
|
||||
), f"Unexpected error message: {error_msg}"
|
||||
|
||||
|
||||
"""
|
||||
TODO: Figure out what is the proper output for a pipeline with multiple stages
|
||||
|
||||
Options:
|
||||
- Should the final output only include the last stage's output?
|
||||
- Should the final output include the accumulation of previous stages' outputs?
|
||||
"""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pipeline_data_accumulation(mock_crew_factory):
|
||||
crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"key1": "value1"})
|
||||
crew2 = mock_crew_factory(name="Crew 2", output_json_dict={"key2": "value2"})
|
||||
|
||||
pipeline = Pipeline(stages=[crew1, crew2])
|
||||
input_data = [{"initial": "data"}]
|
||||
results = await pipeline.process_runs(input_data)
|
||||
|
||||
# Check that crew1 was called with only the initial input
|
||||
crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
|
||||
|
||||
# Check that crew2 was called with the combined input from the initial data and crew1's output
|
||||
crew2.kickoff_async.assert_called_once_with(
|
||||
inputs={"initial": "data", "key1": "value1"}
|
||||
)
|
||||
|
||||
# Check the final output
|
||||
assert len(results) == 1
|
||||
final_result = results[0]
|
||||
assert final_result.json_dict == {"key2": "value2"}
|
||||
|
||||
# Check that the trace includes all stages
|
||||
assert final_result.trace == [{"initial": "data"}, "Crew 1", "Crew 2"]
|
||||
|
||||
# Check that crews_outputs contain the correct information
|
||||
assert len(final_result.crews_outputs) == 2
|
||||
assert final_result.crews_outputs[0].json_dict == {"key1": "value1"}
|
||||
assert final_result.crews_outputs[1].json_dict == {"key2": "value2"}
|
||||
113
tests/utilities/evaluators/test_crew_evaluator_handler.py
Normal file
113
tests/utilities/evaluators/test_crew_evaluator_handler.py
Normal file
@@ -0,0 +1,113 @@
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.agent import Agent
|
||||
from crewai.crew import Crew
|
||||
from crewai.task import Task
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
from crewai.utilities.evaluators.crew_evaluator_handler import (
|
||||
CrewEvaluator,
|
||||
TaskEvaluationPydanticOutput,
|
||||
)
|
||||
|
||||
|
||||
class TestCrewEvaluator:
|
||||
@pytest.fixture
|
||||
def crew_planner(self):
|
||||
agent = Agent(role="Agent 1", goal="Goal 1", backstory="Backstory 1")
|
||||
task = Task(
|
||||
description="Task 1",
|
||||
expected_output="Output 1",
|
||||
agent=agent,
|
||||
)
|
||||
crew = Crew(agents=[agent], tasks=[task])
|
||||
|
||||
return CrewEvaluator(crew, openai_model_name="gpt-4o-mini")
|
||||
|
||||
def test_setup_for_evaluating(self, crew_planner):
|
||||
crew_planner._setup_for_evaluating()
|
||||
assert crew_planner.crew.tasks[0].callback == crew_planner.evaluate
|
||||
|
||||
def test_set_iteration(self, crew_planner):
|
||||
crew_planner.set_iteration(1)
|
||||
assert crew_planner.iteration == 1
|
||||
|
||||
def test_evaluator_agent(self, crew_planner):
|
||||
agent = crew_planner._evaluator_agent()
|
||||
assert agent.role == "Task Execution Evaluator"
|
||||
assert (
|
||||
agent.goal
|
||||
== "Your goal is to evaluate the performance of the agents in the crew based on the tasks they have performed using score from 1 to 10 evaluating on completion, quality, and overall performance."
|
||||
)
|
||||
assert (
|
||||
agent.backstory
|
||||
== "Evaluator agent for crew evaluation with precise capabilities to evaluate the performance of the agents in the crew based on the tasks they have performed"
|
||||
)
|
||||
assert agent.verbose is False
|
||||
assert agent.llm.model_name == "gpt-4o-mini"
|
||||
|
||||
def test_evaluation_task(self, crew_planner):
|
||||
evaluator_agent = Agent(
|
||||
role="Evaluator Agent",
|
||||
goal="Evaluate the performance of the agents in the crew",
|
||||
backstory="Master in Evaluation",
|
||||
)
|
||||
task_to_evaluate = Task(
|
||||
description="Task 1",
|
||||
expected_output="Output 1",
|
||||
agent=Agent(role="Agent 1", goal="Goal 1", backstory="Backstory 1"),
|
||||
)
|
||||
task_output = "Task Output 1"
|
||||
task = crew_planner._evaluation_task(
|
||||
evaluator_agent, task_to_evaluate, task_output
|
||||
)
|
||||
|
||||
assert task.description.startswith(
|
||||
"Based on the task description and the expected output, compare and evaluate the performance of the agents in the crew based on the Task Output they have performed using score from 1 to 10 evaluating on completion, quality, and overall performance."
|
||||
)
|
||||
|
||||
assert task.agent == evaluator_agent
|
||||
assert (
|
||||
task.description
|
||||
== "Based on the task description and the expected output, compare and evaluate "
|
||||
"the performance of the agents in the crew based on the Task Output they have "
|
||||
"performed using score from 1 to 10 evaluating on completion, quality, and overall "
|
||||
"performance.task_description: Task 1 task_expected_output: Output 1 "
|
||||
"agent: Agent 1 agent_goal: Goal 1 Task Output: Task Output 1"
|
||||
)
|
||||
|
||||
@mock.patch("crewai.utilities.evaluators.crew_evaluator_handler.Console")
|
||||
@mock.patch("crewai.utilities.evaluators.crew_evaluator_handler.Table")
|
||||
def test_print_crew_evaluation_result(self, table, console, crew_planner):
|
||||
crew_planner.tasks_scores = {
|
||||
1: [10, 9, 8],
|
||||
2: [9, 8, 7],
|
||||
}
|
||||
|
||||
crew_planner.print_crew_evaluation_result()
|
||||
|
||||
table.assert_has_calls(
|
||||
[
|
||||
mock.call(title="Tasks Scores \n (1-10 Higher is better)"),
|
||||
mock.call().add_column("Tasks/Crew"),
|
||||
mock.call().add_column("Run 1"),
|
||||
mock.call().add_column("Run 2"),
|
||||
mock.call().add_column("Avg. Total"),
|
||||
mock.call().add_row("Task 1", "10", "9", "9.5"),
|
||||
mock.call().add_row("Task 2", "9", "8", "8.5"),
|
||||
mock.call().add_row("Task 3", "8", "7", "7.5"),
|
||||
mock.call().add_row("Crew", "9.0", "8.0", "8.5"),
|
||||
]
|
||||
)
|
||||
console.assert_has_calls([mock.call(), mock.call().print(table())])
|
||||
|
||||
def test_evaluate(self, crew_planner):
|
||||
task_output = TaskOutput(
|
||||
description="Task 1", agent=str(crew_planner.crew.agents[0])
|
||||
)
|
||||
|
||||
with mock.patch.object(Task, "execute_sync") as execute:
|
||||
execute().pydantic = TaskEvaluationPydanticOutput(quality=9.5)
|
||||
crew_planner.evaluate(task_output)
|
||||
assert crew_planner.tasks_scores[0] == [9.5]
|
||||
Reference in New Issue
Block a user