Compare commits

..

26 Commits

Author SHA1 Message Date
Brandon Hancock (bhancock_ai)
109fc3686c Merge branch 'main' into brandon/cre-509-hitl-multiple-rounds-of-followup 2024-12-05 10:03:56 -05:00
Brandon Hancock
d366264d42 Drop extra print 2024-12-05 10:03:26 -05:00
Tony Kipkemboi
fa373f9660 add knowledge demo + improve knowledge docs (#1706) 2024-12-05 09:49:44 -05:00
Rashmi Pawar
48bb8ef775 docs: add nvidia as provider (#1632)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2024-12-04 15:38:46 -05:00
Brandon Hancock
98253b0f04 Fix test again 2024-12-04 15:30:06 -05:00
Brandon Hancock
a73ef3a3ee fix tests 2024-12-04 15:01:40 -05:00
Brandon Hancock
ace3394dd5 Fix type issue 2024-12-04 14:31:15 -05:00
Brandon Hancock
aa8610dd79 refactor and more clear messages 2024-12-04 14:17:44 -05:00
Brandon Hancock
78d49657ff HITL code more robust. Still needs to be refactored. 2024-12-04 13:50:59 -05:00
Brandon Hancock
cb45b971d7 Drop print statements 2024-12-04 13:05:17 -05:00
Brandon Hancock
1c7700e5e2 Merge branch 'main' into brandon/cre-509-hitl-multiple-rounds-of-followup 2024-12-04 13:02:50 -05:00
Brandon Hancock (bhancock_ai)
bbea797b0c remove all references to pipeline and pipeline router (#1661)
* remove all references to pipeline and router

* fix linting

* drop poetry.lock
2024-12-04 12:39:34 -05:00
Tony Kipkemboi
066ad73423 Merge pull request #1698 from crewAIInc/brandon/cre-510-update-docs-to-talk-about-pydantic-and-json-outputs
Talk about getting structured consistent outputs with tasks.
2024-12-04 11:07:52 -05:00
Tony Kipkemboi
0695c26703 Merge branch 'main' into brandon/cre-510-update-docs-to-talk-about-pydantic-and-json-outputs 2024-12-04 11:05:47 -05:00
Brandon Hancock
4fb3331c6a Talk about getting structured consistent outputs with tasks. 2024-12-04 10:46:39 -05:00
Brandon Hancock
00197b9690 v1 of HITL working 2024-12-03 17:05:28 -05:00
Stephen
b6c6eea6f5 Update README.md (#1694)
Corrected the statement which says users can not disable telemetry, but now users can disable by setting the environment variable OTEL_SDK_DISABLED to true.

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2024-12-03 16:08:19 -05:00
Lorenze Jay
1af95f5146 Knowledge project directory standard (#1691)
* Knowledge project directory standard

* fixed types

* comment fix

* made base file knowledge source an abstract class

* cleaner validator on model_post_init

* fix type checker

* cleaner refactor

* better template
2024-12-03 12:27:48 -08:00
Feynman Liang
ed3487aa22 Fix indentation in llm-connections.mdx code block (#1573)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2024-12-03 12:52:23 -05:00
Patcher
77af733e44 [Doc]: Add documenation for openlit observability (#1612)
* Create openlit-observability.mdx

* Update doc with images and steps

* Update mkdocs.yml and add OpenLIT guide link

---------

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2024-12-03 12:38:49 -05:00
Tom Mahler, PhD
aaf80d1d43 [FEATURE] Support for custom path in RAGStorage (#1659)
* added path to RAGStorage

* added path to short term and entity memory

* add path for long_term_storage for completeness

---------

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2024-12-03 12:22:29 -05:00
Ola Hungerford
9e9b945a46 Update using langchain tools docs (#1664)
* Update example of how to use LangChain tools with correct syntax

* Use .env

* Add  Code back

---------

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2024-12-03 11:13:06 -05:00
Javier Saldaña
308a8dc925 Update reset memories command based on the SDK (#1688)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2024-12-03 10:09:30 -05:00
Tony Kipkemboi
7d9d0ff6f7 fix missing code in flows docs (#1690)
* docs: improve tasks documentation clarity and structure

- Add Task Execution Flow section
- Add variable interpolation explanation
- Add Task Dependencies section with examples
- Improve overall document structure and readability
- Update code examples with proper syntax highlighting

* docs: update agent documentation with improved examples and formatting

- Replace DuckDuckGoSearchRun with SerperDevTool
- Update code block formatting to be consistent
- Improve template examples with actual syntax
- Update LLM examples to use current models
- Clean up formatting and remove redundant comments

* docs: enhance LLM documentation with Cerebras provider and formatting improvements

* docs: simplify LLMs documentation title

* docs: improve installation guide clarity and structure

- Add clear Python version requirements with check command
- Simplify installation options to recommended method
- Improve upgrade section clarity for existing users
- Add better visual structure with Notes and Tips
- Update description and formatting

* docs: improve introduction page organization and clarity

- Update organizational analogy in Note section
- Improve table formatting and alignment
- Remove emojis from component table for cleaner look
- Add 'helps you' to make the note more action-oriented

* docs: add enterprise and community cards

- Add Enterprise deployment card in quickstart
- Add community card focused on open source discussions
- Remove deployment reference from community description
- Clean up introduction page cards
- Remove link from Enterprise description text

* docs: add code snippet to Getting Started section in flows.mdx

---------

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2024-12-03 10:02:06 -05:00
João Moura
f8a8e7b2a5 preparing new version 2024-12-02 18:28:58 -03:00
Brandon Hancock (bhancock_ai)
3285c1b196 Fixes issues with result as answer not properly exiting LLM loop (#1689)
* v1 of fix implemented. Need to confirm with tokens.

* remove print statements
2024-12-02 13:38:17 -05:00
93 changed files with 1229 additions and 10586 deletions

View File

@@ -376,7 +376,7 @@ pip install dist/*.tar.gz
CrewAI uses anonymous telemetry to collect usage data with the main purpose of helping us improve the library by focusing our efforts on the most used features, integrations and tools.
It's pivotal to understand that **NO data is collected** concerning prompts, task descriptions, agents' backstories or goals, usage of tools, API calls, responses, any data processed by the agents, or secrets and environment variables, with the exception of the conditions mentioned. When the `share_crew` feature is enabled, detailed data including task descriptions, agents' backstories or goals, and other specific attributes are collected to provide deeper insights while respecting user privacy. We don't offer a way to disable it now, but we will in the future.
It's pivotal to understand that **NO data is collected** concerning prompts, task descriptions, agents' backstories or goals, usage of tools, API calls, responses, any data processed by the agents, or secrets and environment variables, with the exception of the conditions mentioned. When the `share_crew` feature is enabled, detailed data including task descriptions, agents' backstories or goals, and other specific attributes are collected to provide deeper insights while respecting user privacy. Users can disable telemetry by setting the environment variable OTEL_SDK_DISABLED to true.
Data collected includes:

View File

@@ -28,20 +28,19 @@ crewai [COMMAND] [OPTIONS] [ARGUMENTS]
### 1. Create
Create a new crew or pipeline.
Create a new crew or flow.
```shell
crewai create [OPTIONS] TYPE NAME
```
- `TYPE`: Choose between "crew" or "pipeline"
- `NAME`: Name of the crew or pipeline
- `--router`: (Optional) Create a pipeline with router functionality
- `TYPE`: Choose between "crew" or "flow"
- `NAME`: Name of the crew or flow
Example:
```shell
crewai create crew my_new_crew
crewai create pipeline my_new_pipeline --router
crewai create flow my_new_flow
```
### 2. Version

View File

@@ -18,63 +18,60 @@ Flows allow you to create structured, event-driven workflows. They provide a sea
4. **Flexible Control Flow**: Implement conditional logic, loops, and branching within your workflows.
5. **Input Flexibility**: Flows can accept inputs to initialize or update their state, with different handling for structured and unstructured state management.
## Getting Started
Let's create a simple Flow where you will use OpenAI to generate a random city in one task and then use that city to generate a fun fact in another task.
### Passing Inputs to Flows
```python Code
Flows can accept inputs to initialize or update their state before execution. The way inputs are handled depends on whether the flow uses structured or unstructured state management.
#### Structured State Management
In structured state management, the flow's state is defined using a Pydantic `BaseModel`. Inputs must match the model's schema, and any updates will overwrite the default values.
```python
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
from dotenv import load_dotenv
from litellm import completion
class ExampleState(BaseModel):
counter: int = 0
message: str = ""
class StructuredExampleFlow(Flow[ExampleState]):
class ExampleFlow(Flow):
model = "gpt-4o-mini"
@start()
def first_method(self):
# Implementation
def generate_city(self):
print("Starting flow")
flow = StructuredExampleFlow()
flow.kickoff(inputs={"counter": 10})
```
response = completion(
model=self.model,
messages=[
{
"role": "user",
"content": "Return the name of a random city in the world.",
},
],
)
In this example, the `counter` is initialized to `10`, while `message` retains its default value.
random_city = response["choices"][0]["message"]["content"]
print(f"Random City: {random_city}")
#### Unstructured State Management
return random_city
In unstructured state management, the flow's state is a dictionary. You can pass any dictionary to update the state.
@listen(generate_city)
def generate_fun_fact(self, random_city):
response = completion(
model=self.model,
messages=[
{
"role": "user",
"content": f"Tell me a fun fact about {random_city}",
},
],
)
```python
from crewai.flow.flow import Flow, listen, start
fun_fact = response["choices"][0]["message"]["content"]
return fun_fact
class UnstructuredExampleFlow(Flow):
@start()
def first_method(self):
# Implementation
flow = UnstructuredExampleFlow()
flow.kickoff(inputs={"counter": 5, "message": "Initial message"})
```
Here, both `counter` and `message` are updated based on the provided inputs.
flow = ExampleFlow()
result = flow.kickoff()
**Note:** Ensure that inputs for structured state management adhere to the defined schema to avoid validation errors.
### Example Flow
```python
# Existing example code
print(f"Generated fun fact: {result}")
```
In the above example, we have created a simple Flow that generates a random city using OpenAI and then generates a fun fact about that city. The Flow consists of two tasks: `generate_city` and `generate_fun_fact`. The `generate_city` task is the starting point of the Flow, and the `generate_fun_fact` task listens for the output of the `generate_city` task.
@@ -97,14 +94,14 @@ The `@listen()` decorator can be used in several ways:
1. **Listening to a Method by Name**: You can pass the name of the method you want to listen to as a string. When that method completes, the listener method will be triggered.
```python
```python Code
@listen("generate_city")
def generate_fun_fact(self, random_city):
# Implementation
```
2. **Listening to a Method Directly**: You can pass the method itself. When that method completes, the listener method will be triggered.
```python
```python Code
@listen(generate_city)
def generate_fun_fact(self, random_city):
# Implementation
@@ -121,7 +118,7 @@ When you run a Flow, the final output is determined by the last method that comp
Here's how you can access the final output:
<CodeGroup>
```python
```python Code
from crewai.flow.flow import Flow, listen, start
class OutputExampleFlow(Flow):
@@ -133,17 +130,18 @@ class OutputExampleFlow(Flow):
def second_method(self, first_output):
return f"Second method received: {first_output}"
flow = OutputExampleFlow()
final_output = flow.kickoff()
print("---- Final Output ----")
print(final_output)
```
````
```text
``` text Output
---- Final Output ----
Second method received: Output from first_method
```
````
</CodeGroup>
@@ -158,7 +156,7 @@ Here's an example of how to update and access the state:
<CodeGroup>
```python
```python Code
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
@@ -186,7 +184,7 @@ print("Final State:")
print(flow.state)
```
```text
```text Output
Final Output: Hello from first_method - updated by second_method
Final State:
counter=2 message='Hello from first_method - updated by second_method'
@@ -210,10 +208,10 @@ allowing developers to choose the approach that best fits their application's ne
In unstructured state management, all state is stored in the `state` attribute of the `Flow` class.
This approach offers flexibility, enabling developers to add or modify state attributes on the fly without defining a strict schema.
```python
```python Code
from crewai.flow.flow import Flow, listen, start
class UnstructuredExampleFlow(Flow):
class UntructuredExampleFlow(Flow):
@start()
def first_method(self):
@@ -232,7 +230,8 @@ class UnstructuredExampleFlow(Flow):
print(f"State after third_method: {self.state}")
flow = UnstructuredExampleFlow()
flow = UntructuredExampleFlow()
flow.kickoff()
```
@@ -246,14 +245,16 @@ flow.kickoff()
Structured state management leverages predefined schemas to ensure consistency and type safety across the workflow.
By using models like Pydantic's `BaseModel`, developers can define the exact shape of the state, enabling better validation and auto-completion in development environments.
```python
```python Code
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
class ExampleState(BaseModel):
counter: int = 0
message: str = ""
class StructuredExampleFlow(Flow[ExampleState]):
@start()
@@ -272,6 +273,7 @@ class StructuredExampleFlow(Flow[ExampleState]):
print(f"State after third_method: {self.state}")
flow = StructuredExampleFlow()
flow.kickoff()
```
@@ -305,7 +307,7 @@ The `or_` function in Flows allows you to listen to multiple methods and trigger
<CodeGroup>
```python
```python Code
from crewai.flow.flow import Flow, listen, or_, start
class OrExampleFlow(Flow):
@@ -322,11 +324,13 @@ class OrExampleFlow(Flow):
def logger(self, result):
print(f"Logger: {result}")
flow = OrExampleFlow()
flow.kickoff()
```
```text
```text Output
Logger: Hello from the start method
Logger: Hello from the second method
```
@@ -342,7 +346,7 @@ The `and_` function in Flows allows you to listen to multiple methods and trigge
<CodeGroup>
```python
```python Code
from crewai.flow.flow import Flow, and_, listen, start
class AndExampleFlow(Flow):
@@ -364,7 +368,7 @@ flow = AndExampleFlow()
flow.kickoff()
```
```text
```text Output
---- Logger ----
{'greeting': 'Hello from the start method', 'joke': 'What do computers eat? Microchips.'}
```
@@ -381,7 +385,7 @@ You can specify different routes based on the output of the method, allowing you
<CodeGroup>
```python
```python Code
import random
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel
@@ -412,11 +416,12 @@ class RouterFlow(Flow[ExampleState]):
def fourth_method(self):
print("Fourth method running")
flow = RouterFlow()
flow.kickoff()
```
```text
```text Output
Starting the structured flow
Third method running
Fourth method running
@@ -479,7 +484,7 @@ The `main.py` file is where you create your flow and connect the crews together.
Here's an example of how you can connect the `poem_crew` in the `main.py` file:
```python
```python Code
#!/usr/bin/env python
from random import randint
@@ -555,42 +560,6 @@ uv run kickoff
The flow will execute, and you should see the output in the console.
### Adding Additional Crews Using the CLI
Once you have created your initial flow, you can easily add additional crews to your project using the CLI. This allows you to expand your flow's capabilities by integrating new crews without starting from scratch.
To add a new crew to your existing flow, use the following command:
```bash
crewai flow add-crew <crew_name>
```
This command will create a new directory for your crew within the `crews` folder of your flow project. It will include the necessary configuration files and a crew definition file, similar to the initial setup.
#### Folder Structure
After adding a new crew, your folder structure will look like this:
| Directory/File | Description |
| :--------------------- | :----------------------------------------------------------------- |
| `name_of_flow/` | Root directory for the flow. |
| ├── `crews/` | Contains directories for specific crews. |
| │ ├── `poem_crew/` | Directory for the "poem_crew" with its configurations and scripts. |
| │ │ ├── `config/` | Configuration files directory for the "poem_crew". |
| │ │ │ ├── `agents.yaml` | YAML file defining the agents for "poem_crew". |
| │ │ │ └── `tasks.yaml` | YAML file defining the tasks for "poem_crew". |
| │ │ └── `poem_crew.py` | Script for "poem_crew" functionality. |
| └── `name_of_crew/` | Directory for the new crew. |
| ├── `config/` | Configuration files directory for the new crew. |
| │ ├── `agents.yaml` | YAML file defining the agents for the new crew. |
| │ └── `tasks.yaml` | YAML file defining the tasks for the new crew. |
| └── `name_of_crew.py` | Script for the new crew functionality. |
You can then customize the `agents.yaml` and `tasks.yaml` files to define the agents and tasks for your new crew. The `name_of_crew.py` file will contain the crew's logic, which you can modify to suit your needs.
By using the CLI to add additional crews, you can efficiently build complex AI workflows that leverage multiple crews working together.
## Plot Flows
Visualizing your AI workflows can provide valuable insights into the structure and execution paths of your flows. CrewAI offers a powerful visualization tool that allows you to generate interactive plots of your flows, making it easier to understand and optimize your AI workflows.
@@ -607,7 +576,7 @@ CrewAI provides two convenient methods to generate plots of your flows:
If you are working directly with a flow instance, you can generate a plot by calling the `plot()` method on your flow object. This method will create an HTML file containing the interactive plot of your flow.
```python
```python Code
# Assuming you have a flow instance
flow.plot("my_flow_plot")
```
@@ -630,114 +599,13 @@ The generated plot will display nodes representing the tasks in your flow, with
By visualizing your flows, you can gain a clearer understanding of the workflow's structure, making it easier to debug, optimize, and communicate your AI processes to others.
### Conclusion
## Advanced
In this section, we explore more complex use cases of CrewAI Flows, starting with a self-evaluation loop. This pattern is crucial for developing AI systems that can iteratively improve their outputs through feedback.
### 1) Self-Evaluation Loop
The self-evaluation loop is a powerful pattern that allows AI workflows to automatically assess and refine their outputs. This example demonstrates how to set up a flow that generates content, evaluates it, and iterates based on feedback until the desired quality is achieved.
#### Overview
The self-evaluation loop involves two main Crews:
1. **ShakespeareanXPostCrew**: Generates a Shakespearean-style post on a given topic.
2. **XPostReviewCrew**: Evaluates the generated post, providing feedback on its validity and quality.
The process iterates until the post meets the criteria or a maximum retry limit is reached. This approach ensures high-quality outputs through iterative refinement.
#### Importance
This pattern is essential for building robust AI systems that can adapt and improve over time. By automating the evaluation and feedback loop, developers can ensure that their AI workflows produce reliable and high-quality results.
#### Main Code Highlights
Below is the `main.py` file for the self-evaluation loop flow:
```python
from typing import Optional
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel
from self_evaluation_loop_flow.crews.shakespeare_crew.shakespeare_crew import (
ShakespeareanXPostCrew,
)
from self_evaluation_loop_flow.crews.x_post_review_crew.x_post_review_crew import (
XPostReviewCrew,
)
class ShakespeareXPostFlowState(BaseModel):
x_post: str = ""
feedback: Optional[str] = None
valid: bool = False
retry_count: int = 0
class ShakespeareXPostFlow(Flow[ShakespeareXPostFlowState]):
@start("retry")
def generate_shakespeare_x_post(self):
print("Generating Shakespearean X post")
topic = "Flying cars"
result = (
ShakespeareanXPostCrew()
.crew()
.kickoff(inputs={"topic": topic, "feedback": self.state.feedback})
)
print("X post generated", result.raw)
self.state.x_post = result.raw
@router(generate_shakespeare_x_post)
def evaluate_x_post(self):
if self.state.retry_count > 3:
return "max_retry_exceeded"
result = XPostReviewCrew().crew().kickoff(inputs={"x_post": self.state.x_post})
self.state.valid = result["valid"]
self.state.feedback = result["feedback"]
print("valid", self.state.valid)
print("feedback", self.state.feedback)
self.state.retry_count += 1
if self.state.valid:
return "complete"
return "retry"
@listen("complete")
def save_result(self):
print("X post is valid")
print("X post:", self.state.x_post)
with open("x_post.txt", "w") as file:
file.write(self.state.x_post)
@listen("max_retry_exceeded")
def max_retry_exceeded_exit(self):
print("Max retry count exceeded")
print("X post:", self.state.x_post)
print("Feedback:", self.state.feedback)
def kickoff():
shakespeare_flow = ShakespeareXPostFlow()
shakespeare_flow.kickoff()
def plot():
shakespeare_flow = ShakespeareXPostFlow()
shakespeare_flow.plot()
if __name__ == "__main__":
kickoff()
```
#### Code Highlights
- **Retry Mechanism**: The flow uses a retry mechanism to regenerate the post if it doesn't meet the criteria, up to a maximum of three retries.
- **Feedback Loop**: Feedback from the `XPostReviewCrew` is used to refine the post iteratively.
- **State Management**: The flow maintains state using a Pydantic model, ensuring type safety and clarity.
For a complete example and further details, please refer to the [Self Evaluation Loop Flow repository](https://github.com/crewAIInc/crewAI-examples/tree/main/self_evaluation_loop_flow).
Plotting your flows is a powerful feature of CrewAI that enhances your ability to design and manage complex AI workflows. Whether you choose to use the `plot()` method or the command line, generating plots will provide you with a visual representation of your workflows, aiding in both development and presentation.
## Next Steps
If you're interested in exploring additional examples of flows, we have a variety of recommendations in our examples repository. Here are five specific flow examples, each showcasing unique use cases to help you match your current problem type to a specific example:
If you're interested in exploring additional examples of flows, we have a variety of recommendations in our examples repository. Here are four specific flow examples, each showcasing unique use cases to help you match your current problem type to a specific example:
1. **Email Auto Responder Flow**: This example demonstrates an infinite loop where a background job continually runs to automate email responses. It's a great use case for tasks that need to be performed repeatedly without manual intervention. [View Example](https://github.com/crewAIInc/crewAI-examples/tree/main/email_auto_responder_flow)
@@ -747,8 +615,6 @@ If you're interested in exploring additional examples of flows, we have a variet
4. **Meeting Assistant Flow**: This flow demonstrates how to broadcast one event to trigger multiple follow-up actions. For instance, after a meeting is completed, the flow can update a Trello board, send a Slack message, and save the results. It's a great example of handling multiple outcomes from a single event, making it ideal for comprehensive task management and notification systems. [View Example](https://github.com/crewAIInc/crewAI-examples/tree/main/meeting_assistant_flow)
5. **Self Evaluation Loop Flow**: This flow demonstrates a self-evaluation loop where AI workflows automatically assess and refine their outputs through feedback. It involves generating content, evaluating it, and iterating until the desired quality is achieved. This pattern is crucial for developing robust AI systems that can adapt and improve over time. [View Example](https://github.com/crewAIInc/crewAI-examples/tree/main/self_evaluation_loop_flow)
By exploring these examples, you can gain insights into how to leverage CrewAI Flows for various use cases, from automating repetitive tasks to managing complex, multi-step processes with dynamic decision-making and human feedback.
Also, check out our YouTube video on how to use flows in CrewAI below!
@@ -762,4 +628,4 @@ Also, check out our YouTube video on how to use flows in CrewAI below!
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share"
referrerpolicy="strict-origin-when-cross-origin"
allowfullscreen
></iframe>
></iframe>

View File

@@ -1,6 +1,6 @@
---
title: Knowledge
description: Understand what knowledge is in CrewAI and how to effectively use it.
description: What is knowledge in CrewAI and how to use it.
icon: book
---
@@ -8,7 +8,8 @@ icon: book
## What is Knowledge?
Knowledge in CrewAI is a powerful system that allows AI agents to access and utilize external information sources during their tasks. Think of it as giving your agents a reference library they can consult while working.
Knowledge in CrewAI is a powerful system that allows AI agents to access and utilize external information sources during their tasks.
Think of it as giving your agents a reference library they can consult while working.
<Info>
Key benefits of using Knowledge:
@@ -37,130 +38,275 @@ CrewAI supports various types of knowledge sources out of the box:
## Quick Start
Here's a simple example using string-based knowledge:
Here's an example using string-based knowledge:
```python
from crewai import Agent, Task, Crew
from crewai.knowledge import StringKnowledgeSource
```python Code
from crewai import Agent, Task, Crew, Process, LLM
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
# 1. Create a knowledge source
product_info = StringKnowledgeSource(
content="""Our product X1000 has the following features:
- 10-hour battery life
- Water-resistant
- Available in black and silver
Price: $299.99""",
metadata={"category": "product"}
# Create a knowledge source
content = "Users name is John. He is 30 years old and lives in San Francisco."
string_source = StringKnowledgeSource(
content=content,
metadata={"preference": "personal"}
)
# 2. Create an agent with knowledge
sales_agent = Agent(
role="Sales Representative",
goal="Accurately answer customer questions about products",
backstory="Expert in product features and customer service",
knowledge_sources=[product_info] # Attach knowledge to agent
# Create an LLM with a temperature of 0 to ensure deterministic outputs
llm = LLM(model="gpt-4o-mini", temperature=0)
# Create an agent with the knowledge store
agent = Agent(
role="About User",
goal="You know everything about the user.",
backstory="""You are a master at understanding people and their preferences.""",
verbose=True,
allow_delegation=False,
llm=llm,
)
task = Task(
description="Answer the following questions about the user: {question}",
expected_output="An answer to the question.",
agent=agent,
)
# 3. Create a task
answer_task = Task(
description="Answer: What colors is the X1000 available in and how much does it cost?",
agent=sales_agent
)
# 4. Create and run the crew
crew = Crew(
agents=[sales_agent],
tasks=[answer_task]
agents=[agent],
tasks=[task],
verbose=True,
process=Process.sequential,
knowledge={
"sources": [string_source],
"metadata": {"preference": "personal"}
}, # Enable knowledge by adding the sources here. You can also add more sources to the sources list.
)
result = crew.kickoff()
result = crew.kickoff(inputs={"question": "What city does John live in and how old is he?"})
```
## Knowledge Configuration
### Collection Names
Knowledge sources are organized into collections for better management:
```python
# Create knowledge sources with specific collections
tech_specs = StringKnowledgeSource(
content="Technical specifications...",
collection_name="product_tech_specs"
)
pricing_info = StringKnowledgeSource(
content="Pricing information...",
collection_name="product_pricing"
)
```
### Metadata and Filtering
Add metadata to organize and filter knowledge:
Knowledge sources support metadata for better organization and filtering. Metadata is used to filter the knowledge sources when querying the knowledge store.
```python
```python Code
knowledge_source = StringKnowledgeSource(
content="Product details...",
metadata={
"category": "electronics",
"product_line": "premium",
"last_updated": "2024-03"
}
content="Users name is John. He is 30 years old and lives in San Francisco.",
metadata={"preference": "personal"} # Metadata is used to filter the knowledge sources
)
```
### Chunking Configuration
Control how your content is split for processing:
Control how content is split for processing by setting the chunk size and overlap.
```python
knowledge_source = PDFKnowledgeSource(
file_path="product_manual.pdf",
chunk_size=2000, # Characters per chunk
chunk_overlap=200 # Overlap between chunks
```python Code
knowledge_source = StringKnowledgeSource(
content="Long content...",
chunk_size=4000, # Characters per chunk (default)
chunk_overlap=200 # Overlap between chunks (default)
)
```
## Advanced Usage
## Embedder Configuration
### Custom Knowledge Sources
You can also configure the embedder for the knowledge store. This is useful if you want to use a different embedder for the knowledge store than the one used for the agents.
Create your own knowledge source by extending the base class:
```python
from crewai.knowledge.source import BaseKnowledgeSource
class APIKnowledgeSource(BaseKnowledgeSource):
def __init__(self, api_endpoint: str, **kwargs):
super().__init__(**kwargs)
self.api_endpoint = api_endpoint
def load_content(self):
# Implement API data fetching
response = requests.get(self.api_endpoint)
return response.json()
def add(self):
content = self.load_content()
# Process and store content
self.save_documents({"source": "api"})
```python Code
...
string_source = StringKnowledgeSource(
content="Users name is John. He is 30 years old and lives in San Francisco.",
metadata={"preference": "personal"}
)
crew = Crew(
...
knowledge={
"sources": [string_source],
"metadata": {"preference": "personal"},
"embedder_config": {
"provider": "openai", # Default embedder provider; can be "ollama", "gemini", e.t.c.
"config": {"model": "text-embedding-3-small"} # Default embedder model; can be "mxbai-embed-large", "nomic-embed-tex", e.t.c.
},
},
)
```
### Embedder Configuration
## Custom Knowledge Sources
Customize the embedding process:
CrewAI allows you to create custom knowledge sources for any type of data by extending the `BaseKnowledgeSource` class. Let's create a practical example that fetches and processes space news articles.
```python
#### Space News Knowledge Source Example
<CodeGroup>
```python Code
from crewai import Agent, Task, Crew, Process, LLM
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
import requests
from datetime import datetime
from typing import Dict, Any
from pydantic import BaseModel, Field
class SpaceNewsKnowledgeSource(BaseKnowledgeSource):
"""Knowledge source that fetches data from Space News API."""
api_endpoint: str = Field(description="API endpoint URL")
limit: int = Field(default=10, description="Number of articles to fetch")
def load_content(self) -> Dict[Any, str]:
"""Fetch and format space news articles."""
try:
response = requests.get(
f"{self.api_endpoint}?limit={self.limit}"
)
response.raise_for_status()
data = response.json()
articles = data.get('results', [])
formatted_data = self._format_articles(articles)
return {self.api_endpoint: formatted_data}
except Exception as e:
raise ValueError(f"Failed to fetch space news: {str(e)}")
def _format_articles(self, articles: list) -> str:
"""Format articles into readable text."""
formatted = "Space News Articles:\n\n"
for article in articles:
formatted += f"""
Title: {article['title']}
Published: {article['published_at']}
Summary: {article['summary']}
News Site: {article['news_site']}
URL: {article['url']}
-------------------"""
return formatted
def add(self) -> None:
"""Process and store the articles."""
content = self.load_content()
for _, text in content.items():
chunks = self._chunk_text(text)
self.chunks.extend(chunks)
self.save_documents(metadata={
"source": "space_news_api",
"timestamp": datetime.now().isoformat(),
"article_count": self.limit
})
# Create knowledge source
recent_news = SpaceNewsKnowledgeSource(
api_endpoint="https://api.spaceflightnewsapi.net/v4/articles",
limit=10,
metadata={"category": "recent_news", "source": "spaceflight_news"}
)
# Create specialized agent
space_analyst = Agent(
role="Space News Analyst",
goal="Answer questions about space news accurately and comprehensively",
backstory="""You are a space industry analyst with expertise in space exploration,
satellite technology, and space industry trends. You excel at answering questions
about space news and providing detailed, accurate information.""",
knowledge_sources=[recent_news],
llm=LLM(model="gpt-4", temperature=0.0)
)
# Create task that handles user questions
analysis_task = Task(
description="Answer this question about space news: {user_question}",
expected_output="A detailed answer based on the recent space news articles",
agent=space_analyst
)
# Create and run the crew
crew = Crew(
agents=[agent],
tasks=[task],
knowledge_sources=[source],
embedder_config={
"model": "BAAI/bge-small-en-v1.5",
"normalize": True,
"max_length": 512
}
agents=[space_analyst],
tasks=[analysis_task],
verbose=True,
process=Process.sequential
)
# Example usage
result = crew.kickoff(
inputs={"user_question": "What are the latest developments in space exploration?"}
)
```
```output Output
# Agent: Space News Analyst
## Task: Answer this question about space news: What are the latest developments in space exploration?
# Agent: Space News Analyst
## Final Answer:
The latest developments in space exploration, based on recent space news articles, include the following:
1. SpaceX has received the final regulatory approvals to proceed with the second integrated Starship/Super Heavy launch, scheduled for as soon as the morning of Nov. 17, 2023. This is a significant step in SpaceX's ambitious plans for space exploration and colonization. [Source: SpaceNews](https://spacenews.com/starship-cleared-for-nov-17-launch/)
2. SpaceX has also informed the US Federal Communications Commission (FCC) that it plans to begin launching its first next-generation Starlink Gen2 satellites. This represents a major upgrade to the Starlink satellite internet service, which aims to provide high-speed internet access worldwide. [Source: Teslarati](https://www.teslarati.com/spacex-first-starlink-gen2-satellite-launch-2022/)
3. AI startup Synthetaic has raised $15 million in Series B funding. The company uses artificial intelligence to analyze data from space and air sensors, which could have significant applications in space exploration and satellite technology. [Source: SpaceNews](https://spacenews.com/ai-startup-synthetaic-raises-15-million-in-series-b-funding/)
4. The Space Force has formally established a unit within the U.S. Indo-Pacific Command, marking a permanent presence in the Indo-Pacific region. This could have significant implications for space security and geopolitics. [Source: SpaceNews](https://spacenews.com/space-force-establishes-permanent-presence-in-indo-pacific-region/)
5. Slingshot Aerospace, a space tracking and data analytics company, is expanding its network of ground-based optical telescopes to increase coverage of low Earth orbit. This could improve our ability to track and analyze objects in low Earth orbit, including satellites and space debris. [Source: SpaceNews](https://spacenews.com/slingshots-space-tracking-network-to-extend-coverage-of-low-earth-orbit/)
6. The National Natural Science Foundation of China has outlined a five-year project for researchers to study the assembly of ultra-large spacecraft. This could lead to significant advancements in spacecraft technology and space exploration capabilities. [Source: SpaceNews](https://spacenews.com/china-researching-challenges-of-kilometer-scale-ultra-large-spacecraft/)
7. The Center for AEroSpace Autonomy Research (CAESAR) at Stanford University is focusing on spacecraft autonomy. The center held a kickoff event on May 22, 2024, to highlight the industry, academia, and government collaboration it seeks to foster. This could lead to significant advancements in autonomous spacecraft technology. [Source: SpaceNews](https://spacenews.com/stanford-center-focuses-on-spacecraft-autonomy/)
```
</CodeGroup>
#### Key Components Explained
1. **Custom Knowledge Source (`SpaceNewsKnowledgeSource`)**:
- Extends `BaseKnowledgeSource` for integration with CrewAI
- Configurable API endpoint and article limit
- Implements three key methods:
- `load_content()`: Fetches articles from the API
- `_format_articles()`: Structures the articles into readable text
- `add()`: Processes and stores the content with metadata
2. **Agent Configuration**:
- Specialized role as a Space News Analyst
- Uses the knowledge source to access space news
3. **Task Setup**:
- Takes a user question as input through `{user_question}`
- Designed to provide detailed answers based on the knowledge source
4. **Crew Orchestration**:
- Manages the workflow between agent and task
- Handles input/output through the kickoff method
This example demonstrates how to:
- Create a custom knowledge source that fetches real-time data
- Process and format external data for AI consumption
- Use the knowledge source to answer specific user questions
- Integrate everything seamlessly with CrewAI's agent system
#### About the Spaceflight News API
The example uses the [Spaceflight News API](https://api.spaceflightnewsapi.net/v4/documentation), which:
- Provides free access to space-related news articles
- Requires no authentication
- Returns structured data about space news
- Supports pagination and filtering
You can customize the API query by modifying the endpoint URL:
```python
# Fetch more articles
recent_news = SpaceNewsKnowledgeSource(
api_endpoint="https://api.spaceflightnewsapi.net/v4/articles",
limit=20, # Increase the number of articles
metadata={"category": "recent_news"}
)
# Add search parameters
recent_news = SpaceNewsKnowledgeSource(
api_endpoint="https://api.spaceflightnewsapi.net/v4/articles?search=NASA", # Search for NASA news
limit=10,
metadata={"category": "nasa_news"}
)
```
@@ -168,43 +314,16 @@ crew = Crew(
<AccordionGroup>
<Accordion title="Content Organization">
- Use meaningful collection names
- Add detailed metadata for filtering
- Keep chunk sizes appropriate for your content
- Use descriptive metadata for better filtering
- Keep chunk sizes appropriate for your content type
- Consider content overlap for context preservation
- Organize related information into separate knowledge sources
</Accordion>
<Accordion title="Performance Tips">
- Use smaller chunk sizes for precise retrieval
- Implement metadata filtering for faster searches
- Choose appropriate embedding models for your use case
- Cache frequently accessed knowledge
- Use metadata filtering to narrow search scope
- Adjust chunk sizes based on content complexity
- Configure appropriate embedding models
- Consider using local embedding providers for faster processing
</Accordion>
<Accordion title="Error Handling">
- Validate knowledge source content
- Handle missing or corrupted files
- Monitor embedding generation
- Implement fallback options
</Accordion>
</AccordionGroup>
## Common Issues and Solutions
<AccordionGroup>
<Accordion title="Content Not Found">
If agents can't find relevant information:
- Check chunk sizes
- Verify knowledge source loading
- Review metadata filters
- Test with simpler queries first
</Accordion>
<Accordion title="Performance Issues">
If knowledge retrieval is slow:
- Reduce chunk sizes
- Optimize metadata filtering
- Consider using a lighter embedding model
- Cache frequently accessed content
</Accordion>
</AccordionGroup>
</AccordionGroup>

View File

@@ -7,32 +7,45 @@ icon: link
## Using LangChain Tools
<Info>
CrewAI seamlessly integrates with LangChains comprehensive [list of tools](https://python.langchain.com/docs/integrations/tools/), all of which can be used with CrewAI.
CrewAI seamlessly integrates with LangChain's comprehensive [list of tools](https://python.langchain.com/docs/integrations/tools/), all of which can be used with CrewAI.
</Info>
```python Code
import os
from crewai import Agent
from langchain.agents import Tool
from langchain.utilities import GoogleSerperAPIWrapper
from dotenv import load_dotenv
from crewai import Agent, Task, Crew
from crewai.tools import BaseTool
from pydantic import Field
from langchain_community.utilities import GoogleSerperAPIWrapper
# Setup API keys
os.environ["SERPER_API_KEY"] = "Your Key"
# Set up your SERPER_API_KEY key in an .env file, eg:
# SERPER_API_KEY=<your api key>
load_dotenv()
search = GoogleSerperAPIWrapper()
# Create and assign the search tool to an agent
serper_tool = Tool(
name="Intermediate Answer",
func=search.run,
description="Useful for search-based queries",
)
class SearchTool(BaseTool):
name: str = "Search"
description: str = "Useful for search-based queries. Use this to find current information about markets, companies, and trends."
search: GoogleSerperAPIWrapper = Field(default_factory=GoogleSerperAPIWrapper)
agent = Agent(
role='Research Analyst',
goal='Provide up-to-date market analysis',
backstory='An expert analyst with a keen eye for market trends.',
tools=[serper_tool]
def _run(self, query: str) -> str:
"""Execute the search query and return results"""
try:
return self.search.run(query)
except Exception as e:
return f"Error performing search: {str(e)}"
# Create Agents
researcher = Agent(
role='Research Analyst',
goal='Gather current market data and trends',
backstory="""You are an expert research analyst with years of experience in
gathering market intelligence. You're known for your ability to find
relevant and up-to-date market information and present it in a clear,
actionable format.""",
tools=[SearchTool()],
verbose=True
)
# rest of the code ...
@@ -40,6 +53,6 @@ agent = Agent(
## Conclusion
Tools are pivotal in extending the capabilities of CrewAI agents, enabling them to undertake a broad spectrum of tasks and collaborate effectively.
When building solutions with CrewAI, leverage both custom and existing tools to empower your agents and enhance the AI ecosystem. Consider utilizing error handling, caching mechanisms,
and the flexibility of tool arguments to optimize your agents' performance and capabilities.
Tools are pivotal in extending the capabilities of CrewAI agents, enabling them to undertake a broad spectrum of tasks and collaborate effectively.
When building solutions with CrewAI, leverage both custom and existing tools to empower your agents and enhance the AI ecosystem. Consider utilizing error handling, caching mechanisms,
and the flexibility of tool arguments to optimize your agents' performance and capabilities.

View File

@@ -263,6 +263,167 @@ analysis_task = Task(
)
```
## Getting Structured Consistent Outputs from Tasks
When you need to ensure that a task outputs a structured and consistent format, you can use the `output_pydantic` or `output_json` properties on a task. These properties allow you to define the expected output structure, making it easier to parse and utilize the results in your application.
<Note>
It's also important to note that the output of the final task of a crew becomes the final output of the actual crew itself.
</Note>
### Using `output_pydantic`
The `output_pydantic` property allows you to define a Pydantic model that the task output should conform to. This ensures that the output is not only structured but also validated according to the Pydantic model.
Heres an example demonstrating how to use output_pydantic:
```python Code
import json
from crewai import Agent, Crew, Process, Task
from pydantic import BaseModel
class Blog(BaseModel):
title: str
content: str
blog_agent = Agent(
role="Blog Content Generator Agent",
goal="Generate a blog title and content",
backstory="""You are an expert content creator, skilled in crafting engaging and informative blog posts.""",
verbose=False,
allow_delegation=False,
llm="gpt-4o",
)
task1 = Task(
description="""Create a blog title and content on a given topic. Make sure the content is under 200 words.""",
expected_output="A compelling blog title and well-written content.",
agent=blog_agent,
output_pydantic=Blog,
)
# Instantiate your crew with a sequential process
crew = Crew(
agents=[blog_agent],
tasks=[task1],
verbose=True,
process=Process.sequential,
)
result = crew.kickoff()
# Option 1: Accessing Properties Using Dictionary-Style Indexing
print("Accessing Properties - Option 1")
title = result["title"]
content = result["content"]
print("Title:", title)
print("Content:", content)
# Option 2: Accessing Properties Directly from the Pydantic Model
print("Accessing Properties - Option 2")
title = result.pydantic.title
content = result.pydantic.content
print("Title:", title)
print("Content:", content)
# Option 3: Accessing Properties Using the to_dict() Method
print("Accessing Properties - Option 3")
output_dict = result.to_dict()
title = output_dict["title"]
content = output_dict["content"]
print("Title:", title)
print("Content:", content)
# Option 4: Printing the Entire Blog Object
print("Accessing Properties - Option 5")
print("Blog:", result)
```
In this example:
* A Pydantic model Blog is defined with title and content fields.
* The task task1 uses the output_pydantic property to specify that its output should conform to the Blog model.
* After executing the crew, you can access the structured output in multiple ways as shown.
#### Explanation of Accessing the Output
1. Dictionary-Style Indexing: You can directly access the fields using result["field_name"]. This works because the CrewOutput class implements the __getitem__ method.
2. Directly from Pydantic Model: Access the attributes directly from the result.pydantic object.
3. Using to_dict() Method: Convert the output to a dictionary and access the fields.
4. Printing the Entire Object: Simply print the result object to see the structured output.
### Using `output_json`
The `output_json` property allows you to define the expected output in JSON format. This ensures that the task's output is a valid JSON structure that can be easily parsed and used in your application.
Heres an example demonstrating how to use `output_json`:
```python Code
import json
from crewai import Agent, Crew, Process, Task
from pydantic import BaseModel
# Define the Pydantic model for the blog
class Blog(BaseModel):
title: str
content: str
# Define the agent
blog_agent = Agent(
role="Blog Content Generator Agent",
goal="Generate a blog title and content",
backstory="""You are an expert content creator, skilled in crafting engaging and informative blog posts.""",
verbose=False,
allow_delegation=False,
llm="gpt-4o",
)
# Define the task with output_json set to the Blog model
task1 = Task(
description="""Create a blog title and content on a given topic. Make sure the content is under 200 words.""",
expected_output="A JSON object with 'title' and 'content' fields.",
agent=blog_agent,
output_json=Blog,
)
# Instantiate the crew with a sequential process
crew = Crew(
agents=[blog_agent],
tasks=[task1],
verbose=True,
process=Process.sequential,
)
# Kickoff the crew to execute the task
result = crew.kickoff()
# Option 1: Accessing Properties Using Dictionary-Style Indexing
print("Accessing Properties - Option 1")
title = result["title"]
content = result["content"]
print("Title:", title)
print("Content:", content)
# Option 2: Printing the Entire Blog Object
print("Accessing Properties - Option 2")
print("Blog:", result)
```
In this example:
* A Pydantic model Blog is defined with title and content fields, which is used to specify the structure of the JSON output.
* The task task1 uses the output_json property to indicate that it expects a JSON output conforming to the Blog model.
* After executing the crew, you can access the structured JSON output in two ways as shown.
#### Explanation of Accessing the Output
1. Accessing Properties Using Dictionary-Style Indexing: You can access the fields directly using result["field_name"]. This is possible because the CrewOutput class implements the __getitem__ method, allowing you to treat the output like a dictionary. In this option, we're retrieving the title and content from the result.
2. Printing the Entire Blog Object: By printing result, you get the string representation of the CrewOutput object. Since the __str__ method is implemented to return the JSON output, this will display the entire output as a formatted string representing the Blog object.
---
By using output_pydantic or output_json, you ensure that your tasks produce outputs in a consistent and structured format, making it easier to process and utilize the data within your application or across multiple tasks.
## Integrating Tools with Tasks
Leverage tools from the [CrewAI Toolkit](https://github.com/joaomdmoura/crewai-tools) and [LangChain Tools](https://python.langchain.com/docs/integrations/tools) for enhanced task performance and agent interaction.
@@ -471,4 +632,4 @@ save_output_task = Task(
Tasks are the driving force behind the actions of agents in CrewAI.
By properly defining tasks and their outcomes, you set the stage for your AI agents to work effectively, either independently or as a collaborative unit.
Equipping tasks with appropriate tools, understanding the execution process, and following robust validation practices are crucial for maximizing CrewAI's potential,
ensuring agents are effectively prepared for their assignments and that tasks are executed as intended.
ensuring agents are effectively prepared for their assignments and that tasks are executed as intended.

View File

@@ -32,6 +32,7 @@ LiteLLM supports a wide range of providers, including but not limited to:
- Cloudflare Workers AI
- DeepInfra
- Groq
- [NVIDIA NIMs](https://docs.api.nvidia.com/nim/reference/models-1)
- And many more!
For a complete and up-to-date list of supported providers, please refer to the [LiteLLM Providers documentation](https://docs.litellm.ai/docs/providers).
@@ -125,10 +126,10 @@ You can connect to OpenAI-compatible LLMs using either environment variables or
</Tab>
<Tab title="Using LLM Class Attributes">
<CodeGroup>
```python Code
llm = LLM(
model="custom-model-name",
api_key="your-api-key",
```python Code
llm = LLM(
model="custom-model-name",
api_key="your-api-key",
base_url="https://api.your-provider.com/v1"
)
agent = Agent(llm=llm, ...)
@@ -179,4 +180,4 @@ This is particularly useful when working with OpenAI-compatible APIs or when you
## Conclusion
By leveraging LiteLLM, CrewAI offers seamless integration with a vast array of LLMs. This flexibility allows you to choose the most suitable model for your specific needs, whether you prioritize performance, cost-efficiency, or local deployment. Remember to consult the [LiteLLM documentation](https://docs.litellm.ai/docs/) for the most up-to-date information on supported models and configuration options.
By leveraging LiteLLM, CrewAI offers seamless integration with a vast array of LLMs. This flexibility allows you to choose the most suitable model for your specific needs, whether you prioritize performance, cost-efficiency, or local deployment. Remember to consult the [LiteLLM documentation](https://docs.litellm.ai/docs/) for the most up-to-date information on supported models and configuration options.

View File

@@ -0,0 +1,181 @@
---
title: Agent Monitoring with OpenLIT
description: Quickly start monitoring your Agents in just a single line of code with OpenTelemetry.
icon: magnifying-glass-chart
---
# OpenLIT Overview
[OpenLIT](https://github.com/openlit/openlit?src=crewai-docs) is an open-source tool that makes it simple to monitor the performance of AI agents, LLMs, VectorDBs, and GPUs with just **one** line of code.
It provides OpenTelemetry-native tracing and metrics to track important parameters like cost, latency, interactions and task sequences.
This setup enables you to track hyperparameters and monitor for performance issues, helping you find ways to enhance and fine-tune your agents over time.
<Frame caption="OpenLIT Dashboard">
<img src="/images/openlit1.png" alt="Overview Agent usage including cost and tokens" />
<img src="/images/openlit2.png" alt="Overview of agent otel traces and metrics" />
<img src="/images/openlit3.png" alt="Overview of agent traces in details" />
</Frame>
### Features
- **Analytics Dashboard**: Monitor your Agents health and performance with detailed dashboards that track metrics, costs, and user interactions.
- **OpenTelemetry-native Observability SDK**: Vendor-neutral SDKs to send traces and metrics to your existing observability tools like Grafana, DataDog and more.
- **Cost Tracking for Custom and Fine-Tuned Models**: Tailor cost estimations for specific models using custom pricing files for precise budgeting.
- **Exceptions Monitoring Dashboard**: Quickly spot and resolve issues by tracking common exceptions and errors with a monitoring dashboard.
- **Compliance and Security**: Detect potential threats such as profanity and PII leaks.
- **Prompt Injection Detection**: Identify potential code injection and secret leaks.
- **API Keys and Secrets Management**: Securely handle your LLM API keys and secrets centrally, avoiding insecure practices.
- **Prompt Management**: Manage and version Agent prompts using PromptHub for consistent and easy access across Agents.
- **Model Playground** Test and compare different models for your CrewAI agents before deployment.
## Setup Instructions
<Steps>
<Step title="Deploy OpenLIT">
<Steps>
<Step title="Git Clone OpenLIT Repository">
```shell
git clone git@github.com:openlit/openlit.git
```
</Step>
<Step title="Start Docker Compose">
From the root directory of the [OpenLIT Repo](https://github.com/openlit/openlit), Run the below command:
```shell
docker compose up -d
```
</Step>
</Steps>
</Step>
<Step title="Install OpenLIT SDK">
```shell
pip install openlit
```
</Step>
<Step title="Initialize OpenLIT in Your Application">
Add the following two lines to your application code:
<Tabs>
<Tab title="Setup using function arguments">
```python
import openlit
openlit.init(otlp_endpoint="http://127.0.0.1:4318")
```
Example Usage for monitoring a CrewAI Agent:
```python
from crewai import Agent, Task, Crew, Process
import openlit
openlit.init(disable_metrics=True)
# Define your agents
researcher = Agent(
role="Researcher",
goal="Conduct thorough research and analysis on AI and AI agents",
backstory="You're an expert researcher, specialized in technology, software engineering, AI, and startups. You work as a freelancer and are currently researching for a new client.",
allow_delegation=False,
llm='command-r'
)
# Define your task
task = Task(
description="Generate a list of 5 interesting ideas for an article, then write one captivating paragraph for each idea that showcases the potential of a full article on this topic. Return the list of ideas with their paragraphs and your notes.",
expected_output="5 bullet points, each with a paragraph and accompanying notes.",
)
# Define the manager agent
manager = Agent(
role="Project Manager",
goal="Efficiently manage the crew and ensure high-quality task completion",
backstory="You're an experienced project manager, skilled in overseeing complex projects and guiding teams to success. Your role is to coordinate the efforts of the crew members, ensuring that each task is completed on time and to the highest standard.",
allow_delegation=True,
llm='command-r'
)
# Instantiate your crew with a custom manager
crew = Crew(
agents=[researcher],
tasks=[task],
manager_agent=manager,
process=Process.hierarchical,
)
# Start the crew's work
result = crew.kickoff()
print(result)
```
</Tab>
<Tab title="Setup using Environment Variables">
Add the following two lines to your application code:
```python
import openlit
openlit.init()
```
Run the following command to configure the OTEL export endpoint:
```shell
export OTEL_EXPORTER_OTLP_ENDPOINT = "http://127.0.0.1:4318"
```
Example Usage for monitoring a CrewAI Async Agent:
```python
import asyncio
from crewai import Crew, Agent, Task
import openlit
openlit.init(otlp_endpoint="http://127.0.0.1:4318")
# Create an agent with code execution enabled
coding_agent = Agent(
role="Python Data Analyst",
goal="Analyze data and provide insights using Python",
backstory="You are an experienced data analyst with strong Python skills.",
allow_code_execution=True,
llm="command-r"
)
# Create a task that requires code execution
data_analysis_task = Task(
description="Analyze the given dataset and calculate the average age of participants. Ages: {ages}",
agent=coding_agent,
expected_output="5 bullet points, each with a paragraph and accompanying notes.",
)
# Create a crew and add the task
analysis_crew = Crew(
agents=[coding_agent],
tasks=[data_analysis_task]
)
# Async function to kickoff the crew asynchronously
async def async_crew_execution():
result = await analysis_crew.kickoff_async(inputs={"ages": [25, 30, 35, 40, 45]})
print("Crew Result:", result)
# Run the async function
asyncio.run(async_crew_execution())
```
</Tab>
</Tabs>
Refer to OpenLIT [Python SDK repository](https://github.com/openlit/openlit/tree/main/sdk/python) for more advanced configurations and use cases.
</Step>
<Step title="Visualize and Analyze">
With the Agent Observability data now being collected and sent to OpenLIT, the next step is to visualize and analyze this data to get insights into your Agent's performance, behavior, and identify areas of improvement.
Just head over to OpenLIT at `127.0.0.1:3000` on your browser to start exploring. You can login using the default credentials
- **Email**: `user@openlit.io`
- **Password**: `openlituser`
<Frame caption="OpenLIT Dashboard">
<img src="/images/openlit1.png" alt="Overview Agent usage including cost and tokens" />
<img src="/images/openlit2.png" alt="Overview of agent otel traces and metrics" />
</Frame>
</Step>
</Steps>

BIN
docs/images/openlit1.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 390 KiB

BIN
docs/images/openlit2.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 422 KiB

BIN
docs/images/openlit3.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 799 KiB

View File

@@ -99,7 +99,8 @@
"how-to/replay-tasks-from-latest-crew-kickoff",
"how-to/conditional-tasks",
"how-to/agentops-observability",
"how-to/langtrace-observability"
"how-to/langtrace-observability",
"how-to/openlit-observability"
]
},
{

View File

@@ -349,7 +349,7 @@ Replace `<task_id>` with the ID of the task you want to replay.
If you need to reset the memory of your crew before running it again, you can do so by calling the reset memory feature:
```shell
crewai reset-memory
crewai reset-memories --all
```
This will clear the crew's memory, allowing for a fresh start.

View File

@@ -129,7 +129,6 @@ nav:
- Processes: 'core-concepts/Processes.md'
- Crews: 'core-concepts/Crews.md'
- Collaboration: 'core-concepts/Collaboration.md'
- Pipeline: 'core-concepts/Pipeline.md'
- Training: 'core-concepts/Training-Crew.md'
- Memory: 'core-concepts/Memory.md'
- Planning: 'core-concepts/Planning.md'
@@ -152,6 +151,7 @@ nav:
- Conditional Tasks: 'how-to/Conditional-Tasks.md'
- Agent Monitoring with AgentOps: 'how-to/AgentOps-Observability.md'
- Agent Monitoring with LangTrace: 'how-to/Langtrace-Observability.md'
- Agent Monitoring with OpenLIT: 'how-to/openlit-Observability.md'
- Tools Docs:
- Browserbase Web Loader: 'tools/BrowserbaseLoadTool.md'
- Code Docs RAG Search: 'tools/CodeDocsSearchTool.md'

7507
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[project]
name = "crewai"
version = "0.83.0"
version = "0.85.0"
description = "Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By fostering collaborative intelligence, CrewAI empowers agents to work together seamlessly, tackling complex tasks."
readme = "README.md"
requires-python = ">=3.10,<=3.13"

View File

@@ -5,9 +5,7 @@ from crewai.crew import Crew
from crewai.flow.flow import Flow
from crewai.knowledge.knowledge import Knowledge
from crewai.llm import LLM
from crewai.pipeline import Pipeline
from crewai.process import Process
from crewai.routers import Router
from crewai.task import Task
warnings.filterwarnings(
@@ -16,14 +14,12 @@ warnings.filterwarnings(
category=UserWarning,
module="pydantic.main",
)
__version__ = "0.83.0"
__version__ = "0.85.0"
__all__ = [
"Agent",
"Crew",
"Process",
"Task",
"Pipeline",
"Router",
"LLM",
"Flow",
"Knowledge",

View File

@@ -3,16 +3,15 @@ from typing import TYPE_CHECKING, Optional
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.utilities import I18N
from crewai.utilities.converter import ConverterError
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities import I18N
from crewai.utilities.printer import Printer
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.crew import Crew
from crewai.task import Task
from crewai.agents.agent_builder.base_agent import BaseAgent
class CrewAgentExecutorMixin:
@@ -100,14 +99,19 @@ class CrewAgentExecutorMixin:
print(f"Failed to add to long term memory: {e}")
pass
def _ask_human_input(self, final_answer: dict) -> str:
def _ask_human_input(self, final_answer: str) -> str:
"""Prompt human input for final decision making."""
self._printer.print(
content=f"\033[1m\033[95m ## Final Result:\033[00m \033[92m{final_answer}\033[00m"
)
self._printer.print(
content="\n\n=====\n## Please provide feedback on the Final Result and the Agent's actions:",
content=(
"\n\n=====\n"
"## Please provide feedback on the Final Result and the Agent's actions. "
"Respond with 'looks good' or a similar phrase when you're satisfied.\n"
"=====\n"
),
color="bold_yellow",
)
return input()

View File

@@ -16,7 +16,7 @@ from crewai.agents.tools_handler import ToolsHandler
from crewai.tools.base_tool import BaseTool
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.utilities import I18N, Printer
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
@@ -90,7 +90,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
if "system" in self.prompt:
system_prompt = self._format_prompt(self.prompt.get("system", ""), inputs)
user_prompt = self._format_prompt(self.prompt.get("user", ""), inputs)
self.messages.append(self._format_msg(system_prompt, role="system"))
self.messages.append(self._format_msg(user_prompt))
else:
@@ -103,17 +102,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
formatted_answer = self._invoke_loop()
if self.ask_for_human_input:
human_feedback = self._ask_human_input(formatted_answer.output)
if self.crew and self.crew._train:
self._handle_crew_training_output(formatted_answer, human_feedback)
formatted_answer = self._handle_human_feedback(formatted_answer)
# Making sure we only ask for it once, so disabling for the next thought loop
self.ask_for_human_input = False
self.messages.append(self._format_msg(f"Feedback: {human_feedback}"))
formatted_answer = self._invoke_loop()
if self.crew and self.crew._train:
self._handle_crew_training_output(formatted_answer)
self._create_short_term_memory(formatted_answer)
self._create_long_term_memory(formatted_answer)
return {"output": formatted_answer.output}
@@ -326,16 +316,14 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _handle_context_length(self) -> None:
if self.respect_context_window:
self._logger.log(
"debug",
"Context length exceeded. Summarizing content to fit the model context window.",
self._printer.print(
content="Context length exceeded. Summarizing content to fit the model context window.",
color="yellow",
)
self._summarize_messages()
else:
self._logger.log(
"debug",
"Context length exceeded. Consider using smaller text or RAG tools from crewai_tools.",
self._printer.print(
content="Context length exceeded. Consider using smaller text or RAG tools from crewai_tools.",
color="red",
)
raise SystemExit(
@@ -362,15 +350,13 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
] = result.output
training_handler.save(training_data)
else:
self._logger.log(
"error",
"Invalid train iteration type or agent_id not in training data.",
self._printer.print(
content="Invalid train iteration type or agent_id not in training data.",
color="red",
)
else:
self._logger.log(
"error",
"Crew is None or does not have _train_iteration attribute.",
self._printer.print(
content="Crew is None or does not have _train_iteration attribute.",
color="red",
)
@@ -388,15 +374,13 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
train_iteration, agent_id, training_data
)
else:
self._logger.log(
"error",
"Invalid train iteration type. Expected int.",
self._printer.print(
content="Invalid train iteration type. Expected int.",
color="red",
)
else:
self._logger.log(
"error",
"Crew is None or does not have _train_iteration attribute.",
self._printer.print(
content="Crew is None or does not have _train_iteration attribute.",
color="red",
)
@@ -412,3 +396,82 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _format_msg(self, prompt: str, role: str = "user") -> Dict[str, str]:
prompt = prompt.rstrip()
return {"role": role, "content": prompt}
def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
"""
Handles the human feedback loop, allowing the user to provide feedback
on the agent's output and determining if additional iterations are needed.
Parameters:
formatted_answer (AgentFinish): The initial output from the agent.
Returns:
AgentFinish: The final output after incorporating human feedback.
"""
while self.ask_for_human_input:
human_feedback = self._ask_human_input(formatted_answer.output)
print("Human feedback: ", human_feedback)
if self.crew and self.crew._train:
self._handle_crew_training_output(formatted_answer, human_feedback)
# Make an LLM call to verify if additional changes are requested based on human feedback
additional_changes_prompt = self._i18n.slice(
"human_feedback_classification"
).format(feedback=human_feedback)
retry_count = 0
llm_call_successful = False
additional_changes_response = None
while retry_count < MAX_LLM_RETRY and not llm_call_successful:
try:
additional_changes_response = (
self.llm.call(
[
self._format_msg(
additional_changes_prompt, role="system"
)
],
callbacks=self.callbacks,
)
.strip()
.lower()
)
llm_call_successful = True
except Exception as e:
retry_count += 1
self._printer.print(
content=f"Error during LLM call to classify human feedback: {e}. Retrying... ({retry_count}/{MAX_LLM_RETRY})",
color="red",
)
if not llm_call_successful:
self._printer.print(
content="Error processing feedback after multiple attempts.",
color="red",
)
self.ask_for_human_input = False
break
if additional_changes_response == "false":
self.ask_for_human_input = False
elif additional_changes_response == "true":
self.ask_for_human_input = True
# Add human feedback to messages
self.messages.append(self._format_msg(f"Feedback: {human_feedback}"))
# Invoke the loop again with updated messages
formatted_answer = self._invoke_loop()
if self.crew and self.crew._train:
self._handle_crew_training_output(formatted_answer)
else:
# Unexpected response
self._printer.print(
content=f"Unexpected response from LLM: '{additional_changes_response}'. Assuming no additional changes requested.",
color="red",
)
self.ask_for_human_input = False
return formatted_answer

View File

@@ -6,7 +6,6 @@ import pkg_resources
from crewai.cli.add_crew_to_flow import add_crew_to_flow
from crewai.cli.create_crew import create_crew
from crewai.cli.create_flow import create_flow
from crewai.cli.create_pipeline import create_pipeline
from crewai.memory.storage.kickoff_task_outputs_storage import (
KickoffTaskOutputsSQLiteStorage,
)
@@ -31,22 +30,18 @@ def crewai():
@crewai.command()
@click.argument("type", type=click.Choice(["crew", "pipeline", "flow"]))
@click.argument("type", type=click.Choice(["crew", "flow"]))
@click.argument("name")
@click.option("--provider", type=str, help="The provider to use for the crew")
@click.option("--skip_provider", is_flag=True, help="Skip provider validation")
def create(type, name, provider, skip_provider=False):
"""Create a new crew, pipeline, or flow."""
"""Create a new crew, or flow."""
if type == "crew":
create_crew(name, provider, skip_provider)
elif type == "pipeline":
create_pipeline(name)
elif type == "flow":
create_flow(name)
else:
click.secho(
"Error: Invalid type. Must be 'crew', 'pipeline', or 'flow'.", fg="red"
)
click.secho("Error: Invalid type. Must be 'crew' or 'flow'.", fg="red")
@crewai.command()

View File

@@ -39,6 +39,7 @@ def create_folder_structure(name, parent_folder=None):
folder_path.mkdir(parents=True)
(folder_path / "tests").mkdir(exist_ok=True)
(folder_path / "knowledge").mkdir(exist_ok=True)
if not parent_folder:
(folder_path / "src" / folder_name).mkdir(parents=True)
(folder_path / "src" / folder_name / "tools").mkdir(parents=True)
@@ -52,7 +53,14 @@ def copy_template_files(folder_path, name, class_name, parent_folder):
templates_dir = package_dir / "templates" / "crew"
root_template_files = (
[".gitignore", "pyproject.toml", "README.md"] if not parent_folder else []
[
".gitignore",
"pyproject.toml",
"README.md",
"knowledge/user_preference.txt",
]
if not parent_folder
else []
)
tools_template_files = ["tools/custom_tool.py", "tools/__init__.py"]
config_template_files = ["config/agents.yaml", "config/tasks.yaml"]
@@ -168,7 +176,9 @@ def create_crew(name, provider=None, skip_provider=False, parent_folder=None):
templates_dir = package_dir / "templates" / "crew"
root_template_files = (
[".gitignore", "pyproject.toml", "README.md"] if not parent_folder else []
[".gitignore", "pyproject.toml", "README.md", "knowledge/user_preference.txt"]
if not parent_folder
else []
)
tools_template_files = ["tools/custom_tool.py", "tools/__init__.py"]
config_template_files = ["config/agents.yaml", "config/tasks.yaml"]

View File

@@ -1,107 +0,0 @@
import shutil
from pathlib import Path
import click
def create_pipeline(name, router=False):
"""Create a new pipeline project."""
folder_name = name.replace(" ", "_").replace("-", "_").lower()
class_name = name.replace("_", " ").replace("-", " ").title().replace(" ", "")
click.secho(f"Creating pipeline {folder_name}...", fg="green", bold=True)
project_root = Path(folder_name)
if project_root.exists():
click.secho(f"Error: Folder {folder_name} already exists.", fg="red")
return
# Create directory structure
(project_root / "src" / folder_name).mkdir(parents=True)
(project_root / "src" / folder_name / "pipelines").mkdir(parents=True)
(project_root / "src" / folder_name / "crews").mkdir(parents=True)
(project_root / "src" / folder_name / "tools").mkdir(parents=True)
(project_root / "tests").mkdir(exist_ok=True)
# Create .env file
with open(project_root / ".env", "w") as file:
file.write("OPENAI_API_KEY=YOUR_API_KEY")
package_dir = Path(__file__).parent
template_folder = "pipeline_router" if router else "pipeline"
templates_dir = package_dir / "templates" / template_folder
# List of template files to copy
root_template_files = [".gitignore", "pyproject.toml", "README.md"]
src_template_files = ["__init__.py", "main.py"]
tools_template_files = ["tools/__init__.py", "tools/custom_tool.py"]
if router:
crew_folders = [
"classifier_crew",
"normal_crew",
"urgent_crew",
]
pipelines_folders = [
"pipelines/__init__.py",
"pipelines/pipeline_classifier.py",
"pipelines/pipeline_normal.py",
"pipelines/pipeline_urgent.py",
]
else:
crew_folders = [
"research_crew",
"write_linkedin_crew",
"write_x_crew",
]
pipelines_folders = ["pipelines/__init__.py", "pipelines/pipeline.py"]
def process_file(src_file, dst_file):
with open(src_file, "r") as file:
content = file.read()
content = content.replace("{{name}}", name)
content = content.replace("{{crew_name}}", class_name)
content = content.replace("{{folder_name}}", folder_name)
content = content.replace("{{pipeline_name}}", class_name)
with open(dst_file, "w") as file:
file.write(content)
# Copy and process root template files
for file_name in root_template_files:
src_file = templates_dir / file_name
dst_file = project_root / file_name
process_file(src_file, dst_file)
# Copy and process src template files
for file_name in src_template_files:
src_file = templates_dir / file_name
dst_file = project_root / "src" / folder_name / file_name
process_file(src_file, dst_file)
# Copy tools files
for file_name in tools_template_files:
src_file = templates_dir / file_name
dst_file = project_root / "src" / folder_name / file_name
shutil.copy(src_file, dst_file)
# Copy pipelines folders
for file_name in pipelines_folders:
src_file = templates_dir / file_name
dst_file = project_root / "src" / folder_name / file_name
process_file(src_file, dst_file)
# Copy crew folders
for crew_folder in crew_folders:
src_crew_folder = templates_dir / "crews" / crew_folder
dst_crew_folder = project_root / "src" / folder_name / "crews" / crew_folder
if src_crew_folder.exists():
shutil.copytree(src_crew_folder, dst_crew_folder)
else:
click.secho(
f"Warning: Crew folder {crew_folder} not found in template.",
fg="yellow",
)
click.secho(f"Pipeline {name} created successfully!", fg="green", bold=True)

View File

@@ -1,8 +1,9 @@
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task, before_kickoff, after_kickoff
# Uncomment the following line to use an example of a custom tool
# from {{folder_name}}.tools.custom_tool import MyCustomTool
# Uncomment the following line to use an example of a knowledge source
# from crewai.knowledge.source.text_file_knowledge_source import TextFileKnowledgeSource
# Check our tools documentations for more information on how to use them
# from crewai_tools import SerperDevTool
@@ -57,10 +58,20 @@ class {{crew_name}}():
@crew
def crew(self) -> Crew:
"""Creates the {{crew_name}} crew"""
# You can add knowledge sources here
# knowledge_path = "user_preference.txt"
# sources = [
# TextFileKnowledgeSource(
# file_path="knowledge/user_preference.txt",
# metadata={"preference": "personal"}
# ),
# ]
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=True,
# process=Process.hierarchical, # In case you wanna use that instead https://docs.crewai.com/how-to/Hierarchical/
# knowledge_sources=sources, # In the case you want to add knowledge sources
)

View File

@@ -0,0 +1,4 @@
User name is John Doe.
User is an AI Engineer.
User is interested in AI Agents.
User is based in San Francisco, California.

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<=3.13"
dependencies = [
"crewai[tools]>=0.83.0,<1.0.0"
"crewai[tools]>=0.85.0,<1.0.0"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<=3.13"
dependencies = [
"crewai[tools]>=0.83.0,<1.0.0",
"crewai[tools]>=0.85.0,<1.0.0",
]
[project.scripts]

View File

@@ -1,2 +0,0 @@
.env
__pycache__/

View File

@@ -1,57 +0,0 @@
# {{crew_name}} Crew
Welcome to the {{crew_name}} Crew project, powered by [crewAI](https://crewai.com). This template is designed to help you set up a multi-agent AI system with ease, leveraging the powerful and flexible framework provided by crewAI. Our goal is to enable your agents to collaborate effectively on complex tasks, maximizing their collective intelligence and capabilities.
## Installation
Ensure you have Python >=3.10 <=3.13 installed on your system. This project uses [Poetry](https://python-poetry.org/) for dependency management and package handling, offering a seamless setup and execution experience.
First, if you haven't already, install Poetry:
```bash
pip install poetry
```
Next, navigate to your project directory and install the dependencies:
1. First lock the dependencies and then install them:
```bash
crewai install
```
### Customizing
**Add your `OPENAI_API_KEY` into the `.env` file**
- Modify `src/{{folder_name}}/config/agents.yaml` to define your agents
- Modify `src/{{folder_name}}/config/tasks.yaml` to define your tasks
- Modify `src/{{folder_name}}/crew.py` to add your own logic, tools and specific args
- Modify `src/{{folder_name}}/main.py` to add custom inputs for your agents and tasks
## Running the Project
To kickstart your crew of AI agents and begin task execution, run this from the root folder of your project:
```bash
crewai run
```
This command initializes the {{name}} Crew, assembling the agents and assigning them tasks as defined in your configuration.
This example, unmodified, will run the create a `report.md` file with the output of a research on LLMs in the root folder.
## Understanding Your Crew
The {{name}} Crew is composed of multiple AI agents, each with unique roles, goals, and tools. These agents collaborate on a series of tasks, defined in `config/tasks.yaml`, leveraging their collective skills to achieve complex objectives. The `config/agents.yaml` file outlines the capabilities and configurations of each agent in your crew.
## Support
For support, questions, or feedback regarding the {{crew_name}} Crew or crewAI.
- Visit our [documentation](https://docs.crewai.com)
- Reach out to us through our [GitHub repository](https://github.com/joaomdmoura/crewai)
- [Join our Discord](https://discord.com/invite/X4JWnZnxPb)
- [Chat with our docs](https://chatg.pt/DWjSBZn)
Let's create wonders together with the power and simplicity of crewAI.

View File

@@ -1,19 +0,0 @@
researcher:
role: >
{topic} Senior Data Researcher
goal: >
Uncover cutting-edge developments in {topic}
backstory: >
You're a seasoned researcher with a knack for uncovering the latest
developments in {topic}. Known for your ability to find the most relevant
information and present it in a clear and concise manner.
reporting_analyst:
role: >
{topic} Reporting Analyst
goal: >
Create detailed reports based on {topic} data analysis and research findings
backstory: >
You're a meticulous analyst with a keen eye for detail. You're known for
your ability to turn complex data into clear and concise reports, making
it easy for others to understand and act on the information you provide.

View File

@@ -1,16 +0,0 @@
research_task:
description: >
Conduct a thorough research about {topic}
Make sure you find any interesting and relevant information given
the current year is 2024.
expected_output: >
A list with 10 bullet points of the most relevant information about {topic}
agent: researcher
reporting_task:
description: >
Review the context you got and expand each topic into a full section for a report.
Make sure the report is detailed and contains any and all relevant information.
expected_output: >
A fully fledge reports with a title, mains topics, each with a full section of information.
agent: reporting_analyst

View File

@@ -1,58 +0,0 @@
from pydantic import BaseModel
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
# Uncomment the following line to use an example of a custom tool
# from demo_pipeline.tools.custom_tool import MyCustomTool
# Check our tools documentations for more information on how to use them
# from crewai_tools import SerperDevTool
class ResearchReport(BaseModel):
"""Research Report"""
title: str
body: str
@CrewBase
class ResearchCrew():
"""Research Crew"""
agents_config = 'config/agents.yaml'
tasks_config = 'config/tasks.yaml'
@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config['researcher'],
verbose=True
)
@agent
def reporting_analyst(self) -> Agent:
return Agent(
config=self.agents_config['reporting_analyst'],
verbose=True
)
@task
def research_task(self) -> Task:
return Task(
config=self.tasks_config['research_task'],
)
@task
def reporting_task(self) -> Task:
return Task(
config=self.tasks_config['reporting_task'],
output_pydantic=ResearchReport
)
@crew
def crew(self) -> Crew:
"""Creates the Research Crew"""
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=True,
)

View File

@@ -1,51 +0,0 @@
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
# Uncomment the following line to use an example of a custom tool
# from {{folder_name}}.tools.custom_tool import MyCustomTool
# Check our tools documentations for more information on how to use them
# from crewai_tools import SerperDevTool
@CrewBase
class WriteLinkedInCrew():
"""Research Crew"""
agents_config = 'config/agents.yaml'
tasks_config = 'config/tasks.yaml'
@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config['researcher'],
verbose=True
)
@agent
def reporting_analyst(self) -> Agent:
return Agent(
config=self.agents_config['reporting_analyst'],
verbose=True
)
@task
def research_task(self) -> Task:
return Task(
config=self.tasks_config['research_task'],
)
@task
def reporting_task(self) -> Task:
return Task(
config=self.tasks_config['reporting_task'],
output_file='report.md'
)
@crew
def crew(self) -> Crew:
"""Creates the {{crew_name}} crew"""
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=True,
)

View File

@@ -1,14 +0,0 @@
x_writer_agent:
role: >
Expert Social Media Content Creator specializing in short form written content
goal: >
Create viral-worthy, engaging short form posts that distill complex {topic} information
into compelling 280-character messages
backstory: >
You're a social media virtuoso with a particular talent for short form content. Your posts
consistently go viral due to your ability to craft hooks that stop users mid-scroll.
You've studied the techniques of social media masters like Justin Welsh, Dickie Bush,
Nicolas Cole, and Shaan Puri, incorporating their best practices into your own unique style.
Your superpower is taking intricate {topic} concepts and transforming them into
bite-sized, shareable content that resonates with a wide audience. You know exactly
how to structure a post for maximum impact and engagement.

View File

@@ -1,22 +0,0 @@
write_x_task:
description: >
Using the research report provided, create an engaging short form post about {topic}.
Your post should have a great hook, summarize key points, and be structured for easy
consumption on a digital platform. The post must be under 280 characters.
Follow these guidelines:
1. Start with an attention-grabbing hook
2. Condense the main insights from the research
3. Use clear, concise language
4. Include a call-to-action or thought-provoking question if space allows
5. Ensure the post flows well and is easy to read quickly
Here is the title of the research report you will be using
Title: {title}
Research:
{body}
expected_output: >
A compelling X post under 280 characters that effectively summarizes the key findings
about {topic}, starts with a strong hook, and is optimized for engagement on the platform.
agent: x_writer_agent

View File

@@ -1,36 +0,0 @@
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
# Uncomment the following line to use an example of a custom tool
# from demo_pipeline.tools.custom_tool import MyCustomTool
# Check our tools documentations for more information on how to use them
# from crewai_tools import SerperDevTool
@CrewBase
class WriteXCrew:
"""Research Crew"""
agents_config = "config/agents.yaml"
tasks_config = "config/tasks.yaml"
@agent
def x_writer_agent(self) -> Agent:
return Agent(config=self.agents_config["x_writer_agent"], verbose=True)
@task
def write_x_task(self) -> Task:
return Task(
config=self.tasks_config["write_x_task"],
)
@crew
def crew(self) -> Crew:
"""Creates the Write X Crew"""
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=True,
)

View File

@@ -1,26 +0,0 @@
#!/usr/bin/env python
import asyncio
from {{folder_name}}.pipelines.pipeline import {{pipeline_name}}Pipeline
async def run():
"""
Run the pipeline.
"""
inputs = [
{"topic": "AI wearables"},
]
pipeline = {{pipeline_name}}Pipeline()
results = await pipeline.kickoff(inputs)
# Process and print results
for result in results:
print(f"Raw output: {result.raw}")
if result.json_dict:
print(f"JSON output: {result.json_dict}")
print("\n")
def main():
asyncio.run(run())
if __name__ == "__main__":
main()

View File

@@ -1,87 +0,0 @@
"""
This pipeline file includes two different examples to demonstrate the flexibility of crewAI pipelines.
Example 1: Two-Stage Pipeline
-----------------------------
This pipeline consists of two crews:
1. ResearchCrew: Performs research on a given topic.
2. WriteXCrew: Generates an X (Twitter) post based on the research findings.
Key features:
- The ResearchCrew's final task uses output_json to store all research findings in a JSON object.
- This JSON object is then passed to the WriteXCrew, where tasks can access the research findings.
Example 2: Two-Stage Pipeline with Parallel Execution
-------------------------------------------------------
This pipeline consists of three crews:
1. ResearchCrew: Performs research on a given topic.
2. WriteXCrew and WriteLinkedInCrew: Run in parallel, using the research findings to generate posts for X and LinkedIn, respectively.
Key features:
- Demonstrates the ability to run multiple crews in parallel.
- Shows how to structure a pipeline with both sequential and parallel stages.
Usage:
- To switch between examples, comment/uncomment the respective code blocks below.
- Ensure that you have implemented all necessary crew classes (ResearchCrew, WriteXCrew, WriteLinkedInCrew) before running.
"""
# Common imports for both examples
from crewai import Pipeline
# Uncomment the crews you need for your chosen example
from ..crews.research_crew.research_crew import ResearchCrew
from ..crews.write_x_crew.write_x_crew import WriteXCrew
# from .crews.write_linkedin_crew.write_linkedin_crew import WriteLinkedInCrew # Uncomment for Example 2
# EXAMPLE 1: Two-Stage Pipeline
# -----------------------------
# Uncomment the following code block to use Example 1
class {{pipeline_name}}Pipeline:
def __init__(self):
# Initialize crews
self.research_crew = ResearchCrew().crew()
self.write_x_crew = WriteXCrew().crew()
def create_pipeline(self):
return Pipeline(
stages=[
self.research_crew,
self.write_x_crew
]
)
async def kickoff(self, inputs):
pipeline = self.create_pipeline()
results = await pipeline.kickoff(inputs)
return results
# EXAMPLE 2: Two-Stage Pipeline with Parallel Execution
# -------------------------------------------------------
# Uncomment the following code block to use Example 2
# @PipelineBase
# class {{pipeline_name}}Pipeline:
# def __init__(self):
# # Initialize crews
# self.research_crew = ResearchCrew().crew()
# self.write_x_crew = WriteXCrew().crew()
# self.write_linkedin_crew = WriteLinkedInCrew().crew()
# @pipeline
# def create_pipeline(self):
# return Pipeline(
# stages=[
# self.research_crew,
# [self.write_x_crew, self.write_linkedin_crew] # Parallel execution
# ]
# )
# async def run(self, inputs):
# pipeline = self.create_pipeline()
# results = await pipeline.kickoff(inputs)
# return results

View File

@@ -1,17 +0,0 @@
[tool.poetry]
name = "{{folder_name}}"
version = "0.1.0"
description = "{{name}} using crewAI"
authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = ">=3.10,<=3.13"
crewai = { extras = ["tools"], version = ">=0.83.0,<1.0.0" }
asyncio = "*"
[tool.poetry.scripts]
{{folder_name}} = "{{folder_name}}.main:main"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@@ -1,19 +0,0 @@
from typing import Type
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
class MyCustomToolInput(BaseModel):
"""Input schema for MyCustomTool."""
argument: str = Field(..., description="Description of the argument.")
class MyCustomTool(BaseTool):
name: str = "Name of my tool"
description: str = (
"Clear description for what this tool is useful for, you agent will need this information to use it."
)
args_schema: Type[BaseModel] = MyCustomToolInput
def _run(self, argument: str) -> str:
# Implementation goes here
return "this is an example of a tool output, ignore it and move along."

View File

@@ -1,2 +0,0 @@
.env
__pycache__/

View File

@@ -1,54 +0,0 @@
# {{crew_name}} Crew
Welcome to the {{crew_name}} Crew project, powered by [crewAI](https://crewai.com). This template is designed to help you set up a multi-agent AI system with ease, leveraging the powerful and flexible framework provided by crewAI. Our goal is to enable your agents to collaborate effectively on complex tasks, maximizing their collective intelligence and capabilities.
## Installation
Ensure you have Python >=3.10 <=3.13 installed on your system. This project uses [Poetry](https://python-poetry.org/) for dependency management and package handling, offering a seamless setup and execution experience.
First, if you haven't already, install Poetry:
```bash
pip install poetry
```
Next, navigate to your project directory and install the dependencies:
1. First lock the dependencies and then install them:
```bash
crewai install
```
### Customizing
**Add your `OPENAI_API_KEY` into the `.env` file**
- Modify `src/{{folder_name}}/config/agents.yaml` to define your agents
- Modify `src/{{folder_name}}/config/tasks.yaml` to define your tasks
- Modify `src/{{folder_name}}/crew.py` to add your own logic, tools and specific args
- Modify `src/{{folder_name}}/main.py` to add custom inputs for your agents and tasks
## Running the Project
To kickstart your crew of AI agents and begin task execution, run this from the root folder of your project:
```bash
crewai run
```
This command initializes the {{name}} Crew, assembling the agents and assigning them tasks as defined in your configuration.
This example, unmodified, will run the create a `report.md` file with the output of a research on LLMs in the root folder.
## Understanding Your Crew
The {{name}} Crew is composed of multiple AI agents, each with unique roles, goals, and tools. These agents collaborate on a series of tasks, defined in `config/tasks.yaml`, leveraging their collective skills to achieve complex objectives. The `config/agents.yaml` file outlines the capabilities and configurations of each agent in your crew.
## Support
For support, questions, or feedback regarding the {{crew_name}} Crew or crewAI.
- Visit our [documentation](https://docs.crewai.com)
- Reach out to us through our [GitHub repository](https://github.com/joaomdmoura/crewai)
- [Join our Discord](https://discord.com/invite/X4JWnZnxPb)
- [Chat with our docs](https://chatg.pt/DWjSBZn)
Let's create wonders together with the power and simplicity of crewAI.

View File

@@ -1,19 +0,0 @@
researcher:
role: >
{topic} Senior Data Researcher
goal: >
Uncover cutting-edge developments in {topic}
backstory: >
You're a seasoned researcher with a knack for uncovering the latest
developments in {topic}. Known for your ability to find the most relevant
information and present it in a clear and concise manner.
reporting_analyst:
role: >
{topic} Reporting Analyst
goal: >
Create detailed reports based on {topic} data analysis and research findings
backstory: >
You're a meticulous analyst with a keen eye for detail. You're known for
your ability to turn complex data into clear and concise reports, making
it easy for others to understand and act on the information you provide.

View File

@@ -1,17 +0,0 @@
research_task:
description: >
Conduct a thorough research about {topic}
Make sure you find any interesting and relevant information given
the current year is 2024.
expected_output: >
A list with 10 bullet points of the most relevant information about {topic}
agent: researcher
reporting_task:
description: >
Review the context you got and expand each topic into a full section for a report.
Make sure the report is detailed and contains any and all relevant information.
expected_output: >
A fully fledge reports with the mains topics, each with a full section of information.
Formatted as markdown without '```'
agent: reporting_analyst

View File

@@ -1,40 +0,0 @@
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from pydantic import BaseModel
# Uncomment the following line to use an example of a custom tool
# from demo_pipeline.tools.custom_tool import MyCustomTool
# Check our tools documentations for more information on how to use them
# from crewai_tools import SerperDevTool
class UrgencyScore(BaseModel):
urgency_score: int
@CrewBase
class ClassifierCrew:
"""Email Classifier Crew"""
agents_config = "config/agents.yaml"
tasks_config = "config/tasks.yaml"
@agent
def classifier(self) -> Agent:
return Agent(config=self.agents_config["classifier"], verbose=True)
@task
def urgent_task(self) -> Task:
return Task(
config=self.tasks_config["classify_email"],
output_pydantic=UrgencyScore,
)
@crew
def crew(self) -> Crew:
"""Creates the Email Classifier Crew"""
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=True,
)

View File

@@ -1,7 +0,0 @@
classifier:
role: >
Email Classifier
goal: >
Classify the email: {email} as urgent or normal from a score of 1 to 10, where 1 is not urgent and 10 is urgent. Return the urgency score only.`
backstory: >
You are a highly efficient and experienced email classifier, trained to quickly assess and classify emails. Your ability to remain calm under pressure and provide concise, actionable responses has made you an invaluable asset in managing normal situations and maintaining smooth operations.

View File

@@ -1,7 +0,0 @@
classify_email:
description: >
Classify the email: {email}
as urgent or normal.
expected_output: >
Classify the email from a scale of 1 to 10, where 1 is not urgent and 10 is urgent. Return the urgency score only.
agent: classifier

View File

@@ -1,7 +0,0 @@
normal_handler:
role: >
Normal Email Processor
goal: >
Process normal emails and create an email to respond to the sender.
backstory: >
You are a highly efficient and experienced normal email handler, trained to quickly assess and respond to normal communications. Your ability to remain calm under pressure and provide concise, actionable responses has made you an invaluable asset in managing normal situations and maintaining smooth operations.

View File

@@ -1,6 +0,0 @@
normal_task:
description: >
Process and respond to normal email quickly.
expected_output: >
An email response to the normal email.
agent: normal_handler

View File

@@ -1,36 +0,0 @@
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
# Uncomment the following line to use an example of a custom tool
# from demo_pipeline.tools.custom_tool import MyCustomTool
# Check our tools documentations for more information on how to use them
# from crewai_tools import SerperDevTool
@CrewBase
class NormalCrew:
"""Normal Email Crew"""
agents_config = "config/agents.yaml"
tasks_config = "config/tasks.yaml"
@agent
def normal_handler(self) -> Agent:
return Agent(config=self.agents_config["normal_handler"], verbose=True)
@task
def urgent_task(self) -> Task:
return Task(
config=self.tasks_config["normal_task"],
)
@crew
def crew(self) -> Crew:
"""Creates the Normal Email Crew"""
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=True,
)

View File

@@ -1,7 +0,0 @@
urgent_handler:
role: >
Urgent Email Processor
goal: >
Process urgent emails and create an email to respond to the sender.
backstory: >
You are a highly efficient and experienced urgent email handler, trained to quickly assess and respond to time-sensitive communications. Your ability to remain calm under pressure and provide concise, actionable responses has made you an invaluable asset in managing critical situations and maintaining smooth operations.

View File

@@ -1,6 +0,0 @@
urgent_task:
description: >
Process and respond to urgent email quickly.
expected_output: >
An email response to the urgent email.
agent: urgent_handler

View File

@@ -1,36 +0,0 @@
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
# Uncomment the following line to use an example of a custom tool
# from demo_pipeline.tools.custom_tool import MyCustomTool
# Check our tools documentations for more information on how to use them
# from crewai_tools import SerperDevTool
@CrewBase
class UrgentCrew:
"""Urgent Email Crew"""
agents_config = "config/agents.yaml"
tasks_config = "config/tasks.yaml"
@agent
def urgent_handler(self) -> Agent:
return Agent(config=self.agents_config["urgent_handler"], verbose=True)
@task
def urgent_task(self) -> Task:
return Task(
config=self.tasks_config["urgent_task"],
)
@crew
def crew(self) -> Crew:
"""Creates the Urgent Email Crew"""
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=True,
)

View File

@@ -1,75 +0,0 @@
#!/usr/bin/env python
import asyncio
from crewai.routers.router import Route
from crewai.routers.router import Router
from {{folder_name}}.pipelines.pipeline_classifier import EmailClassifierPipeline
from {{folder_name}}.pipelines.pipeline_normal import NormalPipeline
from {{folder_name}}.pipelines.pipeline_urgent import UrgentPipeline
async def run():
"""
Run the pipeline.
"""
inputs = [
{
"email": """
Subject: URGENT: Marketing Campaign Launch - Immediate Action Required
Dear Team,
I'm reaching out regarding our upcoming marketing campaign that requires your immediate attention and swift action. We're facing a critical deadline, and our success hinges on our ability to mobilize quickly.
Key points:
Campaign launch: 48 hours from now
Target audience: 250,000 potential customers
Expected ROI: 35% increase in Q3 sales
What we need from you NOW:
Final approval on creative assets (due in 3 hours)
Confirmation of media placements (due by end of day)
Last-minute budget allocation for paid social media push
Our competitors are poised to launch similar campaigns, and we must act fast to maintain our market advantage. Delays could result in significant lost opportunities and potential revenue.
Please prioritize this campaign above all other tasks. I'll be available for the next 24 hours to address any concerns or roadblocks.
Let's make this happen!
[Your Name]
Marketing Director
P.S. I'll be scheduling an emergency team meeting in 1 hour to discuss our action plan. Attendance is mandatory.
"""
}
]
pipeline_classifier = EmailClassifierPipeline().create_pipeline()
pipeline_urgent = UrgentPipeline().create_pipeline()
pipeline_normal = NormalPipeline().create_pipeline()
router = Router(
routes={
"high_urgency": Route(
condition=lambda x: x.get("urgency_score", 0) > 7,
pipeline=pipeline_urgent
),
"low_urgency": Route(
condition=lambda x: x.get("urgency_score", 0) <= 7,
pipeline=pipeline_normal
)
},
default=pipeline_normal
)
pipeline = pipeline_classifier >> router
results = await pipeline.kickoff(inputs)
# Process and print results
for result in results:
print(f"Raw output: {result.raw}")
if result.json_dict:
print(f"JSON output: {result.json_dict}")
print("\n")
def main():
asyncio.run(run())
if __name__ == "__main__":
main()

View File

@@ -1,24 +0,0 @@
from crewai import Pipeline
from crewai.project import PipelineBase
from ..crews.classifier_crew.classifier_crew import ClassifierCrew
@PipelineBase
class EmailClassifierPipeline:
def __init__(self):
# Initialize crews
self.classifier_crew = ClassifierCrew().crew()
def create_pipeline(self):
return Pipeline(
stages=[
self.classifier_crew
]
)
async def kickoff(self, inputs):
pipeline = self.create_pipeline()
results = await pipeline.kickoff(inputs)
return results

View File

@@ -1,24 +0,0 @@
from crewai import Pipeline
from crewai.project import PipelineBase
from ..crews.normal_crew.normal_crew import NormalCrew
@PipelineBase
class NormalPipeline:
def __init__(self):
# Initialize crews
self.normal_crew = NormalCrew().crew()
def create_pipeline(self):
return Pipeline(
stages=[
self.normal_crew
]
)
async def kickoff(self, inputs):
pipeline = self.create_pipeline()
results = await pipeline.kickoff(inputs)
return results

View File

@@ -1,23 +0,0 @@
from crewai import Pipeline
from crewai.project import PipelineBase
from ..crews.urgent_crew.urgent_crew import UrgentCrew
@PipelineBase
class UrgentPipeline:
def __init__(self):
# Initialize crews
self.urgent_crew = UrgentCrew().crew()
def create_pipeline(self):
return Pipeline(
stages=[
self.urgent_crew
]
)
async def kickoff(self, inputs):
pipeline = self.create_pipeline()
results = await pipeline.kickoff(inputs)
return results

View File

@@ -1,21 +0,0 @@
[project]
name = "{{folder_name}}"
version = "0.1.0"
description = "{{name}} using crewAI"
authors = ["Your Name <you@example.com>"]
requires-python = ">=3.10,<=3.13"
dependencies = [
"crewai[tools]>=0.83.0,<1.0.0"
]
[project.scripts]
{{folder_name}} = "{{folder_name}}.main:main"
run_crew = "{{folder_name}}.main:main"
train = "{{folder_name}}.main:train"
replay = "{{folder_name}}.main:replay"
test = "{{folder_name}}.main:test"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

View File

@@ -1,19 +0,0 @@
from typing import Type
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
class MyCustomToolInput(BaseModel):
"""Input schema for MyCustomTool."""
argument: str = Field(..., description="Description of the argument.")
class MyCustomTool(BaseTool):
name: str = "Name of my tool"
description: str = (
"Clear description for what this tool is useful for, you agent will need this information to use it."
)
args_schema: Type[BaseModel] = MyCustomToolInput
def _run(self, argument: str) -> str:
# Implementation goes here
return "this is an example of a tool output, ignore it and move along."

View File

@@ -5,6 +5,6 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<=3.13"
dependencies = [
"crewai[tools]>=0.83.0"
"crewai[tools]>=0.85.0"
]

View File

@@ -5,7 +5,7 @@ import uuid
import warnings
from concurrent.futures import Future
from hashlib import md5
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from pydantic import (
UUID4,
@@ -23,12 +23,12 @@ from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.cache import CacheHandler
from crewai.crews.crew_output import CrewOutput
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.llm import LLM
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.memory.user.user_memory import UserMemory
from crewai.process import Process
from crewai.task import Task
@@ -56,8 +56,6 @@ if os.environ.get("AGENTOPS_API_KEY"):
except ImportError:
pass
if TYPE_CHECKING:
from crewai.pipeline.pipeline import Pipeline
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
@@ -1073,17 +1071,5 @@ class Crew(BaseModel):
evaluator.print_crew_evaluation_result()
def __rshift__(self, other: "Crew") -> "Pipeline":
"""
Implements the >> operator to add another Crew to an existing Pipeline.
"""
from crewai.pipeline.pipeline import Pipeline
if not isinstance(other, Crew):
raise TypeError(
f"Unsupported operand type for >>: '{type(self).__name__}' and '{type(other).__name__}'"
)
return Pipeline(stages=[self, other])
def __repr__(self):
return f"Crew(id={self.id}, process={self.process}, number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)})"

View File

@@ -1,36 +1,72 @@
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Union, List
from typing import Union, List, Dict, Any
from pydantic import Field
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from typing import Dict, Any
from crewai.utilities.logger import Logger
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
class BaseFileKnowledgeSource(BaseKnowledgeSource):
class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
"""Base class for knowledge sources that load content from files."""
file_path: Union[Path, List[Path]] = Field(...)
_logger: Logger = Logger(verbose=True)
file_path: Union[Path, List[Path], str, List[str]] = Field(
..., description="The path to the file"
)
content: Dict[Path, str] = Field(init=False, default_factory=dict)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
safe_file_paths: List[Path] = Field(default_factory=list)
def model_post_init(self, _):
"""Post-initialization method to load content."""
self.safe_file_paths = self._process_file_paths()
self.validate_paths()
self.content = self.load_content()
@abstractmethod
def load_content(self) -> Dict[Path, str]:
"""Load and preprocess file content. Should be overridden by subclasses."""
paths = [self.file_path] if isinstance(self.file_path, Path) else self.file_path
"""Load and preprocess file content. Should be overridden by subclasses. Assume that the file path is relative to the project root in the knowledge directory."""
pass
for path in paths:
def validate_paths(self):
"""Validate the paths."""
for path in self.safe_file_paths:
if not path.exists():
self._logger.log(
"error",
f"File not found: {path}. Try adding sources to the knowledge directory. If its inside the knowledge directory, use the relative path.",
color="red",
)
raise FileNotFoundError(f"File not found: {path}")
if not path.is_file():
raise ValueError(f"Path is not a file: {path}")
return {}
self._logger.log(
"error",
f"Path is not a file: {path}",
color="red",
)
def save_documents(self, metadata: Dict[str, Any]):
"""Save the documents to the storage."""
chunk_metadatas = [metadata.copy() for _ in self.chunks]
self.storage.save(self.chunks, chunk_metadatas)
def convert_to_path(self, path: Union[Path, str]) -> Path:
"""Convert a path to a Path object."""
return Path(KNOWLEDGE_DIRECTORY + "/" + path) if isinstance(path, str) else path
def _process_file_paths(self) -> List[Path]:
"""Convert file_path to a list of Path objects."""
paths = (
[self.file_path]
if isinstance(self.file_path, (str, Path))
else self.file_path
)
if not isinstance(paths, list):
raise ValueError("file_path must be a Path, str, or a list of these types")
return [self.convert_to_path(path) for path in paths]

View File

@@ -10,19 +10,15 @@ class CSVKnowledgeSource(BaseFileKnowledgeSource):
def load_content(self) -> Dict[Path, str]:
"""Load and preprocess CSV file content."""
super().load_content() # Validate the file path
file_path = (
self.file_path[0] if isinstance(self.file_path, list) else self.file_path
)
file_path = Path(file_path) if isinstance(file_path, str) else file_path
with open(file_path, "r", encoding="utf-8") as csvfile:
reader = csv.reader(csvfile)
content = ""
for row in reader:
content += " ".join(row) + "\n"
return {file_path: content}
content_dict = {}
for file_path in self.safe_file_paths:
with open(file_path, "r", encoding="utf-8") as csvfile:
reader = csv.reader(csvfile)
content = ""
for row in reader:
content += " ".join(row) + "\n"
content_dict[file_path] = content
return content_dict
def add(self) -> None:
"""

View File

@@ -8,17 +8,15 @@ class ExcelKnowledgeSource(BaseFileKnowledgeSource):
def load_content(self) -> Dict[Path, str]:
"""Load and preprocess Excel file content."""
super().load_content() # Validate the file path
pd = self._import_dependencies()
if isinstance(self.file_path, list):
file_path = self.file_path[0]
else:
file_path = self.file_path
df = pd.read_excel(file_path)
content = df.to_csv(index=False)
return {file_path: content}
content_dict = {}
for file_path in self.safe_file_paths:
file_path = self.convert_to_path(file_path)
df = pd.read_excel(file_path)
content = df.to_csv(index=False)
content_dict[file_path] = content
return content_dict
def _import_dependencies(self):
"""Dynamically import dependencies."""

View File

@@ -10,11 +10,9 @@ class JSONKnowledgeSource(BaseFileKnowledgeSource):
def load_content(self) -> Dict[Path, str]:
"""Load and preprocess JSON file content."""
super().load_content() # Validate the file path
paths = [self.file_path] if isinstance(self.file_path, Path) else self.file_path
content: Dict[Path, str] = {}
for path in paths:
for path in self.safe_file_paths:
path = self.convert_to_path(path)
with open(path, "r", encoding="utf-8") as json_file:
data = json.load(json_file)
content[path] = self._json_to_text(data)

View File

@@ -9,14 +9,13 @@ class PDFKnowledgeSource(BaseFileKnowledgeSource):
def load_content(self) -> Dict[Path, str]:
"""Load and preprocess PDF file content."""
super().load_content() # Validate the file paths
pdfplumber = self._import_pdfplumber()
paths = [self.file_path] if isinstance(self.file_path, Path) else self.file_path
content = {}
for path in paths:
for path in self.safe_file_paths:
text = ""
path = self.convert_to_path(path)
with pdfplumber.open(path) as pdf:
for page in pdf.pages:
page_text = page.extract_text()

View File

@@ -9,12 +9,11 @@ class TextFileKnowledgeSource(BaseFileKnowledgeSource):
def load_content(self) -> Dict[Path, str]:
"""Load and preprocess text file content."""
super().load_content()
paths = [self.file_path] if isinstance(self.file_path, Path) else self.file_path
content = {}
for path in paths:
with path.open("r", encoding="utf-8") as f:
content[path] = f.read() # type: ignore
for path in self.safe_file_paths:
path = self.convert_to_path(path)
with open(path, "r", encoding="utf-8") as f:
content[path] = f.read()
return content
def add(self) -> None:

View File

@@ -10,7 +10,7 @@ class EntityMemory(Memory):
Inherits from the Memory class.
"""
def __init__(self, crew=None, embedder_config=None, storage=None):
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
if hasattr(crew, "memory_config") and crew.memory_config is not None:
self.memory_provider = crew.memory_config.get("provider")
else:
@@ -33,6 +33,7 @@ class EntityMemory(Memory):
allow_reset=True,
embedder_config=embedder_config,
crew=crew,
path=path,
)
)
super().__init__(storage)

View File

@@ -14,8 +14,9 @@ class LongTermMemory(Memory):
LongTermMemoryItem instances.
"""
def __init__(self, storage=None):
storage = storage if storage else LTMSQLiteStorage()
def __init__(self, storage=None, path=None):
if not storage:
storage = LTMSQLiteStorage(db_path=path) if path else LTMSQLiteStorage()
super().__init__(storage)
def save(self, item: LongTermMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"

View File

@@ -13,7 +13,7 @@ class ShortTermMemory(Memory):
MemoryItem instances.
"""
def __init__(self, crew=None, embedder_config=None, storage=None):
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
if hasattr(crew, "memory_config") and crew.memory_config is not None:
self.memory_provider = crew.memory_config.get("provider")
else:
@@ -32,7 +32,7 @@ class ShortTermMemory(Memory):
storage
if storage
else RAGStorage(
type="short_term", embedder_config=embedder_config, crew=crew
type="short_term", embedder_config=embedder_config, crew=crew, path=path
)
)
super().__init__(storage)

View File

@@ -37,7 +37,7 @@ class RAGStorage(BaseRAGStorage):
app: ClientAPI | None = None
def __init__(self, type, allow_reset=True, embedder_config=None, crew=None):
def __init__(self, type, allow_reset=True, embedder_config=None, crew=None, path=None):
super().__init__(type, allow_reset, embedder_config, crew)
agents = crew.agents if crew else []
agents = [self._sanitize_role(agent.role) for agent in agents]
@@ -47,6 +47,7 @@ class RAGStorage(BaseRAGStorage):
self.type = type
self.allow_reset = allow_reset
self.path = path
self._initialize_app()
def _set_embedder_config(self):
@@ -59,7 +60,7 @@ class RAGStorage(BaseRAGStorage):
self._set_embedder_config()
chroma_client = chromadb.PersistentClient(
path=f"{db_storage_path()}/{self.type}/{self.agents}",
path=self.path if self.path else f"{db_storage_path()}/{self.type}/{self.agents}",
settings=Settings(allow_reset=self.allow_reset),
)

View File

@@ -1,5 +0,0 @@
from crewai.pipeline.pipeline import Pipeline
from crewai.pipeline.pipeline_kickoff_result import PipelineKickoffResult
from crewai.pipeline.pipeline_output import PipelineOutput
__all__ = ["Pipeline", "PipelineKickoffResult", "PipelineOutput"]

View File

@@ -1,405 +0,0 @@
import asyncio
import copy
from typing import Any, Dict, List, Tuple, Union
from pydantic import BaseModel, Field, model_validator
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.pipeline.pipeline_kickoff_result import PipelineKickoffResult
from crewai.routers.router import Router
from crewai.types.usage_metrics import UsageMetrics
Trace = Union[Union[str, Dict[str, Any]], List[Union[str, Dict[str, Any]]]]
PipelineStage = Union[Crew, List[Crew], Router]
"""
Developer Notes:
This module defines a Pipeline class that represents a sequence of operations (stages)
to process inputs. Each stage can be either sequential or parallel, and the pipeline
can process multiple kickoffs concurrently.
Core Loop Explanation:
1. The `process_kickoffs` method processes multiple kickoffs in parallel, each going through
all pipeline stages.
2. The `process_single_kickoff` method handles the processing of a single kickouff through
all stages, updating metrics and input data along the way.
3. The `_process_stage` method determines whether a stage is sequential or parallel
and processes it accordingly.
4. The `_process_single_crew` and `_process_parallel_crews` methods handle the
execution of single and parallel crew stages.
5. The `_update_metrics_and_input` method updates usage metrics and the current input
with the outputs from a stage.
6. The `_build_pipeline_kickoff_results` method constructs the final results of the
pipeline kickoff, including traces and outputs.
Handling Traces and Crew Outputs:
- During the processing of stages, we handle the results (traces and crew outputs)
for all stages except the last one differently from the final stage.
- For intermediate stages, the primary focus is on passing the input data between stages.
This involves merging the output dictionaries from all crews in a stage into a single
dictionary and passing it to the next stage. This merged dictionary allows for smooth
data flow between stages.
- For the final stage, in addition to passing the input data, we also need to prepare
the final outputs and traces to be returned as the overall result of the pipeline kickoff.
In this case, we do not merge the results, as each result needs to be included
separately in its own pipeline kickoff result.
Pipeline Terminology:
- Pipeline: The overall structure that defines a sequence of operations.
- Stage: A distinct part of the pipeline, which can be either sequential or parallel.
- Kickoff: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline.
- Branch: Parallel executions within a stage (e.g., concurrent crew operations).
- Trace: The journey of an individual input through the entire pipeline.
Example pipeline structure:
crew1 >> crew2 >> crew3
This represents a pipeline with three sequential stages:
1. crew1 is the first stage, which processes the input and passes its output to crew2.
2. crew2 is the second stage, which takes the output from crew1 as its input, processes it, and passes its output to crew3.
3. crew3 is the final stage, which takes the output from crew2 as its input and produces the final output of the pipeline.
Each input creates its own kickoff, flowing through all stages of the pipeline.
Multiple kickoffss can be processed concurrently, each following the defined pipeline structure.
Another example pipeline structure:
crew1 >> [crew2, crew3] >> crew4
This represents a pipeline with three stages:
1. A sequential stage (crew1)
2. A parallel stage with two branches (crew2 and crew3 executing concurrently)
3. Another sequential stage (crew4)
Each input creates its own kickoff, flowing through all stages of the pipeline.
Multiple kickoffs can be processed concurrently, each following the defined pipeline structure.
"""
class Pipeline(BaseModel):
stages: List[PipelineStage] = Field(
..., description="List of crews representing stages to be executed in sequence"
)
@model_validator(mode="before")
@classmethod
def validate_stages(cls, values):
stages = values.get("stages", [])
def check_nesting_and_type(item, depth=0):
if depth > 1:
raise ValueError("Double nesting is not allowed in pipeline stages")
if isinstance(item, list):
for sub_item in item:
check_nesting_and_type(sub_item, depth + 1)
elif not isinstance(item, (Crew, Router)):
raise ValueError(
f"Expected Crew instance, Router instance, or list of Crews, got {type(item)}"
)
for stage in stages:
check_nesting_and_type(stage)
return values
async def kickoff(
self, inputs: List[Dict[str, Any]]
) -> List[PipelineKickoffResult]:
"""
Processes multiple runs in parallel, each going through all pipeline stages.
Args:
inputs (List[Dict[str, Any]]): List of inputs for each run.
Returns:
List[PipelineKickoffResult]: List of results from each run.
"""
pipeline_results: List[PipelineKickoffResult] = []
# Process all runs in parallel
all_run_results = await asyncio.gather(
*(self.process_single_kickoff(input_data) for input_data in inputs)
)
# Flatten the list of lists into a single list of results
pipeline_results.extend(
result for run_result in all_run_results for result in run_result
)
return pipeline_results
async def process_single_kickoff(
self, kickoff_input: Dict[str, Any]
) -> List[PipelineKickoffResult]:
"""
Processes a single run through all pipeline stages.
Args:
input (Dict[str, Any]): The input for the run.
Returns:
List[PipelineKickoffResult]: The results of processing the run.
"""
initial_input = copy.deepcopy(kickoff_input)
current_input = copy.deepcopy(kickoff_input)
stages = self._copy_stages()
pipeline_usage_metrics: Dict[str, UsageMetrics] = {}
all_stage_outputs: List[List[CrewOutput]] = []
traces: List[List[Union[str, Dict[str, Any]]]] = [[initial_input]]
stage_index = 0
while stage_index < len(stages):
stage = stages[stage_index]
stage_input = copy.deepcopy(current_input)
if isinstance(stage, Router):
next_pipeline, route_taken = stage.route(stage_input)
stages = (
stages[: stage_index + 1]
+ list(next_pipeline.stages)
+ stages[stage_index + 1 :]
)
traces.append([{"route_taken": route_taken}])
stage_index += 1
continue
stage_outputs, stage_trace = await self._process_stage(stage, stage_input)
self._update_metrics_and_input(
pipeline_usage_metrics, current_input, stage, stage_outputs
)
traces.append(stage_trace)
all_stage_outputs.append(stage_outputs)
stage_index += 1
return self._build_pipeline_kickoff_results(
all_stage_outputs, traces, pipeline_usage_metrics
)
async def _process_stage(
self, stage: PipelineStage, current_input: Dict[str, Any]
) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]:
"""
Processes a single stage of the pipeline, which can be either sequential or parallel.
Args:
stage (Union[Crew, List[Crew]]): The stage to process.
current_input (Dict[str, Any]): The input for the stage.
Returns:
Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]: The outputs and trace of the stage.
"""
if isinstance(stage, Crew):
return await self._process_single_crew(stage, current_input)
elif isinstance(stage, list) and all(isinstance(crew, Crew) for crew in stage):
return await self._process_parallel_crews(stage, current_input)
else:
raise ValueError(f"Unsupported stage type: {type(stage)}")
async def _process_single_crew(
self, crew: Crew, current_input: Dict[str, Any]
) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]:
"""
Processes a single crew.
Args:
crew (Crew): The crew to process.
current_input (Dict[str, Any]): The input for the crew.
Returns:
Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]: The output and trace of the crew.
"""
output = await crew.kickoff_async(inputs=current_input)
return [output], [crew.name or str(crew.id)]
async def _process_parallel_crews(
self, crews: List[Crew], current_input: Dict[str, Any]
) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]:
"""
Processes multiple crews in parallel.
Args:
crews (List[Crew]): The list of crews to process in parallel.
current_input (Dict[str, Any]): The input for the crews.
Returns:
Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]: The outputs and traces of the crews.
"""
parallel_outputs = await asyncio.gather(
*[crew.kickoff_async(inputs=current_input) for crew in crews]
)
return parallel_outputs, [crew.name or str(crew.id) for crew in crews]
def _update_metrics_and_input(
self,
usage_metrics: Dict[str, UsageMetrics],
current_input: Dict[str, Any],
stage: PipelineStage,
outputs: List[CrewOutput],
) -> None:
"""
Updates metrics and current input with the outputs of a stage.
Args:
usage_metrics (Dict[str, Any]): The usage metrics to update.
current_input (Dict[str, Any]): The current input to update.
stage (Union[Crew, List[Crew]]): The stage that was processed.
outputs (List[CrewOutput]): The outputs of the stage.
"""
if isinstance(stage, Crew):
usage_metrics[stage.name or str(stage.id)] = outputs[0].token_usage
current_input.update(outputs[0].to_dict())
elif isinstance(stage, list) and all(isinstance(crew, Crew) for crew in stage):
for crew, output in zip(stage, outputs):
usage_metrics[crew.name or str(crew.id)] = output.token_usage
current_input.update(output.to_dict())
else:
raise ValueError(f"Unsupported stage type: {type(stage)}")
def _build_pipeline_kickoff_results(
self,
all_stage_outputs: List[List[CrewOutput]],
traces: List[List[Union[str, Dict[str, Any]]]],
token_usage: Dict[str, UsageMetrics],
) -> List[PipelineKickoffResult]:
"""
Builds the results of a pipeline run.
Args:
all_stage_outputs (List[List[CrewOutput]]): All stage outputs.
traces (List[List[Union[str, Dict[str, Any]]]]): All traces.
token_usage (Dict[str, Any]): Token usage metrics.
Returns:
List[PipelineKickoffResult]: The results of the pipeline run.
"""
formatted_traces = self._format_traces(traces)
formatted_crew_outputs = self._format_crew_outputs(all_stage_outputs)
return [
PipelineKickoffResult(
token_usage=token_usage,
trace=formatted_trace,
raw=crews_outputs[-1].raw,
pydantic=crews_outputs[-1].pydantic,
json_dict=crews_outputs[-1].json_dict,
crews_outputs=crews_outputs,
)
for crews_outputs, formatted_trace in zip(
formatted_crew_outputs, formatted_traces
)
]
def _format_traces(
self, traces: List[List[Union[str, Dict[str, Any]]]]
) -> List[List[Trace]]:
"""
Formats the traces of a pipeline run.
Args:
traces (List[List[Union[str, Dict[str, Any]]]]): The traces to format.
Returns:
List[List[Trace]]: The formatted traces.
"""
formatted_traces: List[Trace] = self._format_single_trace(traces[:-1])
return self._format_multiple_traces(formatted_traces, traces[-1])
def _format_single_trace(
self, traces: List[List[Union[str, Dict[str, Any]]]]
) -> List[Trace]:
"""
Formats single traces.
Args:
traces (List[List[Union[str, Dict[str, Any]]]]): The traces to format.
Returns:
List[Trace]: The formatted single traces.
"""
formatted_traces: List[Trace] = []
for trace in traces:
formatted_traces.append(trace[0] if len(trace) == 1 else trace)
return formatted_traces
def _format_multiple_traces(
self,
formatted_traces: List[Trace],
final_trace: List[Union[str, Dict[str, Any]]],
) -> List[List[Trace]]:
"""
Formats multiple traces.
Args:
formatted_traces (List[Trace]): The formatted single traces.
final_trace (List[Union[str, Dict[str, Any]]]): The final trace to format.
Returns:
List[List[Trace]]: The formatted multiple traces.
"""
traces_to_return: List[List[Trace]] = []
if len(final_trace) == 1:
formatted_traces.append(final_trace[0])
traces_to_return.append(formatted_traces)
else:
for trace in final_trace:
copied_traces = formatted_traces.copy()
copied_traces.append(trace)
traces_to_return.append(copied_traces)
return traces_to_return
def _format_crew_outputs(
self, all_stage_outputs: List[List[CrewOutput]]
) -> List[List[CrewOutput]]:
"""
Formats the outputs of all stages into a list of crew outputs.
Args:
all_stage_outputs (List[List[CrewOutput]]): All stage outputs.
Returns:
List[List[CrewOutput]]: Formatted crew outputs.
"""
crew_outputs: List[CrewOutput] = [
output
for stage_outputs in all_stage_outputs[:-1]
for output in stage_outputs
]
return [crew_outputs + [output] for output in all_stage_outputs[-1]]
def _copy_stages(self):
"""Create a deep copy of the Pipeline's stages."""
new_stages = []
for stage in self.stages:
if isinstance(stage, list):
new_stages.append(
[
crew.copy() if hasattr(crew, "copy") else copy.deepcopy(crew)
for crew in stage
]
)
elif hasattr(stage, "copy"):
new_stages.append(stage.copy())
else:
new_stages.append(copy.deepcopy(stage))
return new_stages
def __rshift__(self, other: PipelineStage) -> "Pipeline":
"""
Implements the >> operator to add another Stage (Crew or List[Crew]) to an existing Pipeline.
Args:
other (Any): The stage to add.
Returns:
Pipeline: A new pipeline with the added stage.
"""
if isinstance(other, (Crew, Router)) or (
isinstance(other, list) and all(isinstance(item, Crew) for item in other)
):
return type(self)(stages=self.stages + [other])
else:
raise TypeError(
f"Unsupported operand type for >>: '{type(self).__name__}' and '{type(other).__name__}'"
)

View File

@@ -1,61 +0,0 @@
import json
import uuid
from typing import Any, Dict, List, Optional, Union
from pydantic import UUID4, BaseModel, Field
from crewai.crews.crew_output import CrewOutput
from crewai.types.usage_metrics import UsageMetrics
class PipelineKickoffResult(BaseModel):
"""Class that represents the result of a pipeline run."""
id: UUID4 = Field(
default_factory=uuid.uuid4,
frozen=True,
description="Unique identifier for the object, not set by user.",
)
raw: str = Field(description="Raw output of the pipeline run", default="")
pydantic: Any = Field(
description="Pydantic output of the pipeline run", default=None
)
json_dict: Union[Dict[str, Any], None] = Field(
description="JSON dict output of the pipeline run", default={}
)
token_usage: Dict[str, UsageMetrics] = Field(
description="Token usage for each crew in the run"
)
trace: List[Any] = Field(
description="Trace of the journey of inputs through the run"
)
crews_outputs: List[CrewOutput] = Field(
description="Output from each crew in the run",
default=[],
)
@property
def json(self) -> Optional[str]:
if self.crews_outputs[-1].tasks_output[-1].output_format != "json":
raise ValueError(
"No JSON output found in the final task of the final crew. Please make sure to set the output_json property in the final task in your crew."
)
return json.dumps(self.json_dict)
def to_dict(self) -> Dict[str, Any]:
"""Convert json_output and pydantic_output to a dictionary."""
output_dict = {}
if self.json_dict:
output_dict.update(self.json_dict)
elif self.pydantic:
output_dict.update(self.pydantic.model_dump())
return output_dict
def __str__(self):
if self.pydantic:
return str(self.pydantic)
if self.json_dict:
return str(self.json_dict)
return self.raw

View File

@@ -1,20 +0,0 @@
import uuid
from typing import List
from pydantic import UUID4, BaseModel, Field
from crewai.pipeline.pipeline_kickoff_result import PipelineKickoffResult
class PipelineOutput(BaseModel):
id: UUID4 = Field(
default_factory=uuid.uuid4,
frozen=True,
description="Unique identifier for the object, not set by user.",
)
run_results: List[PipelineKickoffResult] = Field(
description="List of results for each run through the pipeline", default=[]
)
def add_run_result(self, result: PipelineKickoffResult):
self.run_results.append(result)

View File

@@ -8,12 +8,10 @@ from .annotations import (
llm,
output_json,
output_pydantic,
pipeline,
task,
tool,
)
from .crew_base import CrewBase
from .pipeline_base import PipelineBase
__all__ = [
"agent",
@@ -24,10 +22,8 @@ __all__ = [
"tool",
"callback",
"CrewBase",
"PipelineBase",
"llm",
"cache_handler",
"pipeline",
"before_kickoff",
"after_kickoff",
]

View File

@@ -65,21 +65,6 @@ def cache_handler(func):
return memoize(func)
def stage(func):
func.is_stage = True
return memoize(func)
def router(func):
func.is_router = True
return memoize(func)
def pipeline(func):
func.is_pipeline = True
return memoize(func)
def crew(func) -> Callable[..., Crew]:
def wrapper(self, *args, **kwargs) -> Crew:
instantiated_tasks = []

View File

@@ -1,58 +0,0 @@
from typing import Any, Callable, Dict, List, Type, Union
from crewai.crew import Crew
from crewai.pipeline.pipeline import Pipeline
from crewai.routers.router import Router
PipelineStage = Union[Crew, List[Crew], Router]
# TODO: Could potentially remove. Need to check with @joao and @gui if this is needed for CrewAI+
def PipelineBase(cls: Type[Any]) -> Type[Any]:
class WrappedClass(cls):
is_pipeline_class: bool = True # type: ignore
stages: List[PipelineStage]
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.stages = []
self._map_pipeline_components()
def _get_all_functions(self) -> Dict[str, Callable[..., Any]]:
return {
name: getattr(self, name)
for name in dir(self)
if callable(getattr(self, name))
}
def _filter_functions(
self, functions: Dict[str, Callable[..., Any]], attribute: str
) -> Dict[str, Callable[..., Any]]:
return {
name: func
for name, func in functions.items()
if hasattr(func, attribute)
}
def _map_pipeline_components(self) -> None:
all_functions = self._get_all_functions()
crew_functions = self._filter_functions(all_functions, "is_crew")
router_functions = self._filter_functions(all_functions, "is_router")
for stage_attr in dir(self):
stage = getattr(self, stage_attr)
if isinstance(stage, (Crew, Router)):
self.stages.append(stage)
elif callable(stage) and hasattr(stage, "is_crew"):
self.stages.append(crew_functions[stage_attr]())
elif callable(stage) and hasattr(stage, "is_router"):
self.stages.append(router_functions[stage_attr]())
elif isinstance(stage, list) and all(
isinstance(item, Crew) for item in stage
):
self.stages.append(stage)
def build_pipeline(self) -> Pipeline:
return Pipeline(stages=self.stages)
return WrappedClass

View File

@@ -1,3 +0,0 @@
from crewai.routers.router import Router
__all__ = ["Router"]

View File

@@ -1,84 +0,0 @@
from copy import deepcopy
from typing import Any, Callable, Dict, Tuple
from pydantic import BaseModel, Field, PrivateAttr
class Route(BaseModel):
condition: Callable[[Dict[str, Any]], bool]
pipeline: Any
class Router(BaseModel):
routes: Dict[str, Route] = Field(
default_factory=dict,
description="Dictionary of route names to (condition, pipeline) tuples",
)
default: Any = Field(..., description="Default pipeline if no conditions are met")
_route_types: Dict[str, type] = PrivateAttr(default_factory=dict)
class Config:
arbitrary_types_allowed = True
def __init__(self, routes: Dict[str, Route], default: Any, **data):
super().__init__(routes=routes, default=default, **data)
self._check_copyable(default)
for name, route in routes.items():
self._check_copyable(route.pipeline)
self._route_types[name] = type(route.pipeline)
@staticmethod
def _check_copyable(obj: Any) -> None:
if not hasattr(obj, "copy") or not callable(getattr(obj, "copy")):
raise ValueError(f"Object of type {type(obj)} must have a 'copy' method")
def add_route(
self,
name: str,
condition: Callable[[Dict[str, Any]], bool],
pipeline: Any,
) -> "Router":
"""
Add a named route with its condition and corresponding pipeline to the router.
Args:
name: A unique name for this route
condition: A function that takes a dictionary input and returns a boolean
pipeline: The Pipeline to execute if the condition is met
Returns:
The Router instance for method chaining
"""
self._check_copyable(pipeline)
self.routes[name] = Route(condition=condition, pipeline=pipeline)
self._route_types[name] = type(pipeline)
return self
def route(self, input_data: Dict[str, Any]) -> Tuple[Any, str]:
"""
Evaluate the input against the conditions and return the appropriate pipeline.
Args:
input_data: The input dictionary to be evaluated
Returns:
A tuple containing the next Pipeline to be executed and the name of the route taken
"""
for name, route in self.routes.items():
if route.condition(input_data):
return route.pipeline, name
return self.default, "default"
def copy(self) -> "Router":
"""Create a deep copy of the Router."""
new_routes = {
name: Route(
condition=deepcopy(route.condition),
pipeline=route.pipeline.copy(),
)
for name, route in self.routes.items()
}
new_default = self.default.copy()
return Router(routes=new_routes, default=new_default)

View File

@@ -22,7 +22,8 @@
"sumamrize_instruction": "Summarize the following text, make sure to include all the important information: {group}",
"summary": "This is a summary of our conversation so far:\n{merged_summary}",
"manager_request": "Your best answer to your coworker asking you this, accounting for the context shared.",
"formatted_task_instructions": "Ensure your final answer contains only the content in the following format: {output_format}\n\nEnsure the final output does not include any code block markers like ```json or ```python."
"formatted_task_instructions": "Ensure your final answer contains only the content in the following format: {output_format}\n\nEnsure the final output does not include any code block markers like ```json or ```python.",
"human_feedback_classification": "Determine if the following feedback indicates that the user is satisfied or if further changes are needed. Respond with 'True' if further changes are needed, or 'False' if the user is satisfied. **Important** Do not include any additional commentary outside of your 'True' or 'False' response.\n\nFeedback: \"{feedback}\""
},
"errors": {
"force_final_answer_error": "You can't keep going, this was the best you could do.\n {formatted_answer.text}",

View File

@@ -1,3 +1,5 @@
TRAINING_DATA_FILE = "training_data.pkl"
TRAINED_AGENTS_DATA_FILE = "trained_agents_data.pkl"
DEFAULT_SCORE_THRESHOLD = 0.35
KNOWLEDGE_DIRECTORY = "knowledge"
MAX_LLM_RETRY = 3

View File

@@ -3,19 +3,20 @@
import os
from unittest import mock
from unittest.mock import patch
import pytest
from crewai import Agent, Crew, Task
from crewai.agents.cache import CacheHandler
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.agents.parser import AgentAction, CrewAgentParser, OutputParserException
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.llm import LLM
from crewai.tools import tool
from crewai.tools.tool_calling import InstructorToolCalling
from crewai.tools.tool_usage import ToolUsage
from crewai.tools.tool_usage_events import ToolUsageFinished
from crewai.utilities import RPMController
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.utilities.events import Emitter
@@ -985,8 +986,7 @@ def test_agent_definition_based_on_dict():
# test for human input
@pytest.mark.vcr(filter_headers=["authorization"])
def test_agent_human_input():
from unittest.mock import patch
# Agent configuration
config = {
"role": "test role",
"goal": "test goal",
@@ -995,6 +995,7 @@ def test_agent_human_input():
agent = Agent(**config)
# Task configuration with human input enabled
task = Task(
agent=agent,
description="Say the word: Hi",
@@ -1002,11 +1003,26 @@ def test_agent_human_input():
human_input=True,
)
with patch.object(CrewAgentExecutor, "_ask_human_input") as mock_human_input:
mock_human_input.return_value = "Don't say hi, say Hello instead!"
# Side effect function for _ask_human_input to simulate multiple feedback iterations
feedback_responses = iter(
[
"Don't say hi, say Hello instead!", # First feedback
"looks good", # Second feedback to exit loop
]
)
def ask_human_input_side_effect(*args, **kwargs):
return next(feedback_responses)
with patch.object(
CrewAgentExecutor, "_ask_human_input", side_effect=ask_human_input_side_effect
) as mock_human_input:
# Execute the task
output = agent.execute_task(task)
mock_human_input.assert_called_once()
assert output == "Hello"
# Assertions to ensure the agent behaves correctly
assert mock_human_input.call_count == 2 # Should have asked for feedback twice
assert output.strip().lower() == "hello" # Final output should be 'Hello'
def test_interpolate_inputs():

View File

@@ -9,7 +9,8 @@ interactions:
is the expect criteria for your final answer: The word: Hi\nyou MUST return
the actual complete content as the final answer, not a summary.\n\nBegin! This
is VERY important to you, use the tools available and give your best Final Answer,
your job depends on it!\n\nThought:"}], "model": "gpt-4o"}'
your job depends on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop": ["\nObservation:"],
"stream": false}'
headers:
accept:
- application/json
@@ -18,16 +19,13 @@ interactions:
connection:
- keep-alive
content-length:
- '774'
- '824'
content-type:
- application/json
cookie:
- __cf_bm=rb61BZH2ejzD5YPmLaEJqI7km71QqyNJGTVdNxBq6qk-1727213194-1.0.1.1-pJ49onmgX9IugEMuYQMralzD7oj_6W.CHbSu4Su1z3NyjTGYg.rhgJZWng8feFYah._oSnoYlkTjpK1Wd2C9FA;
_cfuvid=lbRdAddVWV6W3f5Dm9SaOPWDUOxqtZBSPr_fTW26nEA-1727213194587-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.47.0
- OpenAI/Python 1.52.1
x-stainless-arch:
- arm64
x-stainless-async:
@@ -37,29 +35,34 @@ interactions:
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.47.0
- 1.52.1
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.11.7
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"id\": \"chatcmpl-AB7WMYMmqACvaemh26N6a62wxlxvx\",\n \"object\":
\"chat.completion\",\n \"created\": 1727213882,\n \"model\": \"gpt-4o-2024-05-13\",\n
content: "{\n \"id\": \"chatcmpl-AaqIIsTxhvf75xvuu7gQScIlRSKbW\",\n \"object\":
\"chat.completion\",\n \"created\": 1733344190,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Thought: I now can give a great answer\\nFinal
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: Hi\",\n \"refusal\": null\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
158,\n \"completion_tokens\": 14,\n \"total_tokens\": 172,\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0\n }\n },\n \"system_fingerprint\": \"fp_e375328146\"\n}\n"
158,\n \"completion_tokens\": 12,\n \"total_tokens\": 170,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"system_fingerprint\":
\"fp_0705bf87c0\"\n}\n"
headers:
CF-Cache-Status:
- DYNAMIC
CF-RAY:
- 8c85eb4f58751cf3-GRU
- 8ece8cfc3b1f4532-ATL
Connection:
- keep-alive
Content-Encoding:
@@ -67,7 +70,118 @@ interactions:
Content-Type:
- application/json
Date:
- Tue, 24 Sep 2024 21:38:03 GMT
- Wed, 04 Dec 2024 20:29:50 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=QJZZjZ6eqnVamqUkw.Bx0mj7oBi3a_vGEH1VODcUxlg-1733344190-1.0.1.1-xyN0ekA9xIrSwEhRBmTiWJ3Pt72UYLU5owKfkz5yihVmMTfsr_Qz.ssGPJ5cuft066v1xVjb4zOSTdFmesMSKg;
path=/; expires=Wed, 04-Dec-24 20:59:50 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=eCIkP8GVPvpkg19eOhCquWFHm.RTQBQy4yHLGGEAH5c-1733344190334-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '313'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999816'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_9fd9a8ee688045dcf7ac5f6fdf689372
http_version: HTTP/1.1
status_code: 200
- request:
body: '{"messages": [{"role": "system", "content": "Determine if the following
feedback indicates that the user is satisfied or if further changes are needed.
Respond with ''True'' if further changes are needed, or ''False'' if the user
is satisfied. **Important** Do not include any additional commentary outside
of your ''True'' or ''False'' response.\n\nFeedback: \"Don''t say hi, say Hello
instead!\""}], "model": "gpt-4o-mini", "stop": ["\nObservation:"], "stream":
false}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '461'
content-type:
- application/json
cookie:
- __cf_bm=QJZZjZ6eqnVamqUkw.Bx0mj7oBi3a_vGEH1VODcUxlg-1733344190-1.0.1.1-xyN0ekA9xIrSwEhRBmTiWJ3Pt72UYLU5owKfkz5yihVmMTfsr_Qz.ssGPJ5cuft066v1xVjb4zOSTdFmesMSKg;
_cfuvid=eCIkP8GVPvpkg19eOhCquWFHm.RTQBQy4yHLGGEAH5c-1733344190334-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.52.1
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.52.1
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"id\": \"chatcmpl-AaqIIaQlLyoyPmk909PvAIfA2TmJL\",\n \"object\":
\"chat.completion\",\n \"created\": 1733344190,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"True\",\n \"refusal\": null\n
\ },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n
\ ],\n \"usage\": {\n \"prompt_tokens\": 78,\n \"completion_tokens\":
1,\n \"total_tokens\": 79,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n
\ \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"system_fingerprint\":
\"fp_0705bf87c0\"\n}\n"
headers:
CF-Cache-Status:
- DYNAMIC
CF-RAY:
- 8ece8d060b5e4532-ATL
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Wed, 04 Dec 2024 20:29:50 GMT
Server:
- cloudflare
Transfer-Encoding:
@@ -76,28 +190,30 @@ interactions:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '262'
- '375'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-ratelimit-limit-requests:
- '10000'
- '30000'
x-ratelimit-limit-tokens:
- '30000000'
- '150000000'
x-ratelimit-remaining-requests:
- '9999'
- '29999'
x-ratelimit-remaining-tokens:
- '29999817'
- '149999898'
x-ratelimit-reset-requests:
- 6ms
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_69b1deae1cc3cbf488cee975cd3b04df
- req_be7cb475e0859a82c37ee3f2871ea5ea
http_version: HTTP/1.1
status_code: 200
- request:
@@ -110,8 +226,10 @@ interactions:
is the expect criteria for your final answer: The word: Hi\nyou MUST return
the actual complete content as the final answer, not a summary.\n\nBegin! This
is VERY important to you, use the tools available and give your best Final Answer,
your job depends on it!\n\nThought:"}, {"role": "user", "content": "Feedback:
Don''t say hi, say Hello instead!"}], "model": "gpt-4o"}'
your job depends on it!\n\nThought:"}, {"role": "assistant", "content": "I now
can give a great answer \nFinal Answer: Hi"}, {"role": "user", "content": "Feedback:
Don''t say hi, say Hello instead!"}], "model": "gpt-4o-mini", "stop": ["\nObservation:"],
"stream": false}'
headers:
accept:
- application/json
@@ -120,16 +238,16 @@ interactions:
connection:
- keep-alive
content-length:
- '849'
- '986'
content-type:
- application/json
cookie:
- __cf_bm=rb61BZH2ejzD5YPmLaEJqI7km71QqyNJGTVdNxBq6qk-1727213194-1.0.1.1-pJ49onmgX9IugEMuYQMralzD7oj_6W.CHbSu4Su1z3NyjTGYg.rhgJZWng8feFYah._oSnoYlkTjpK1Wd2C9FA;
_cfuvid=lbRdAddVWV6W3f5Dm9SaOPWDUOxqtZBSPr_fTW26nEA-1727213194587-0.0.1.1-604800000
- __cf_bm=QJZZjZ6eqnVamqUkw.Bx0mj7oBi3a_vGEH1VODcUxlg-1733344190-1.0.1.1-xyN0ekA9xIrSwEhRBmTiWJ3Pt72UYLU5owKfkz5yihVmMTfsr_Qz.ssGPJ5cuft066v1xVjb4zOSTdFmesMSKg;
_cfuvid=eCIkP8GVPvpkg19eOhCquWFHm.RTQBQy4yHLGGEAH5c-1733344190334-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.47.0
- OpenAI/Python 1.52.1
x-stainless-arch:
- arm64
x-stainless-async:
@@ -139,29 +257,34 @@ interactions:
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.47.0
- 1.52.1
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.11.7
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"id\": \"chatcmpl-AB7WNec1Ohw0pEU91kuCTuts2hXWM\",\n \"object\":
\"chat.completion\",\n \"created\": 1727213883,\n \"model\": \"gpt-4o-2024-05-13\",\n
content: "{\n \"id\": \"chatcmpl-AaqIJAAxpVfUOdrsgYKHwfRlHv4RS\",\n \"object\":
\"chat.completion\",\n \"created\": 1733344191,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Thought: I now can give a great answer\\nFinal
Answer: Hello\",\n \"refusal\": null\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
172,\n \"completion_tokens\": 14,\n \"total_tokens\": 186,\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0\n }\n },\n \"system_fingerprint\": \"fp_e375328146\"\n}\n"
\"assistant\",\n \"content\": \"Thought: I now can give a great answer
\ \\nFinal Answer: Hello\",\n \"refusal\": null\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
188,\n \"completion_tokens\": 14,\n \"total_tokens\": 202,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"system_fingerprint\":
\"fp_0705bf87c0\"\n}\n"
headers:
CF-Cache-Status:
- DYNAMIC
CF-RAY:
- 8c85eb52cd7c1cf3-GRU
- 8ece8d090fc34532-ATL
Connection:
- keep-alive
Content-Encoding:
@@ -169,7 +292,7 @@ interactions:
Content-Type:
- application/json
Date:
- Tue, 24 Sep 2024 21:38:03 GMT
- Wed, 04 Dec 2024 20:29:51 GMT
Server:
- cloudflare
Transfer-Encoding:
@@ -178,28 +301,134 @@ interactions:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '261'
- '484'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-ratelimit-limit-requests:
- '10000'
- '30000'
x-ratelimit-limit-tokens:
- '30000000'
- '150000000'
x-ratelimit-remaining-requests:
- '9999'
- '29999'
x-ratelimit-remaining-tokens:
- '29999806'
- '149999793'
x-ratelimit-reset-requests:
- 6ms
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_11a316792b5f54af94cce0c702aec290
- req_5bf4a565ad6c2567a1ed204ecac89134
http_version: HTTP/1.1
status_code: 200
- request:
body: '{"messages": [{"role": "system", "content": "Determine if the following
feedback indicates that the user is satisfied or if further changes are needed.
Respond with ''True'' if further changes are needed, or ''False'' if the user
is satisfied. **Important** Do not include any additional commentary outside
of your ''True'' or ''False'' response.\n\nFeedback: \"looks good\""}], "model":
"gpt-4o-mini", "stop": ["\nObservation:"], "stream": false}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '439'
content-type:
- application/json
cookie:
- __cf_bm=QJZZjZ6eqnVamqUkw.Bx0mj7oBi3a_vGEH1VODcUxlg-1733344190-1.0.1.1-xyN0ekA9xIrSwEhRBmTiWJ3Pt72UYLU5owKfkz5yihVmMTfsr_Qz.ssGPJ5cuft066v1xVjb4zOSTdFmesMSKg;
_cfuvid=eCIkP8GVPvpkg19eOhCquWFHm.RTQBQy4yHLGGEAH5c-1733344190334-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.52.1
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.52.1
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"id\": \"chatcmpl-AaqIJqyG8vl9mxj2qDPZgaxyNLLIq\",\n \"object\":
\"chat.completion\",\n \"created\": 1733344191,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"False\",\n \"refusal\": null\n
\ },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n
\ ],\n \"usage\": {\n \"prompt_tokens\": 73,\n \"completion_tokens\":
1,\n \"total_tokens\": 74,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n
\ \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"system_fingerprint\":
\"fp_0705bf87c0\"\n}\n"
headers:
CF-Cache-Status:
- DYNAMIC
CF-RAY:
- 8ece8d0cfdeb4532-ATL
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Wed, 04 Dec 2024 20:29:51 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '341'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999902'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_5554bade8ceda00cf364b76a51b708ff
http_version: HTTP/1.1
status_code: 200
version: 1

View File

@@ -1,921 +0,0 @@
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.pipeline.pipeline import Pipeline
from crewai.pipeline.pipeline_kickoff_result import PipelineKickoffResult
from crewai.process import Process
from crewai.routers.router import Route, Router
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
from crewai.types.usage_metrics import UsageMetrics
from pydantic import BaseModel, ValidationError
DEFAULT_TOKEN_USAGE = UsageMetrics(
total_tokens=100, prompt_tokens=50, completion_tokens=50, successful_requests=3
)
@pytest.fixture
def mock_crew_factory():
def _create_mock_crew(name: str, output_json_dict=None, pydantic_output=None):
MockCrewClass = type("MockCrew", (MagicMock, Crew), {})
class MockCrew(MockCrewClass):
def __deepcopy__(self):
result = MockCrewClass()
result.kickoff_async = self.kickoff_async
result.name = self.name
return result
def copy(
self,
):
return self
crew = MockCrew()
crew.name = name
crew.knowledge = None
task_output = TaskOutput(
description="Test task", raw="Task output", agent="Test Agent"
)
crew_output = CrewOutput(
raw="Test output",
tasks_output=[task_output],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict=output_json_dict if output_json_dict else None,
pydantic=pydantic_output,
)
async def kickoff_async(inputs=None):
return crew_output
# Create an AsyncMock for kickoff_async
crew.kickoff_async = AsyncMock(side_effect=kickoff_async)
# Mock the synchronous kickoff method
crew.kickoff = MagicMock(return_value=crew_output)
# Add more attributes that Procedure might be expecting
crew.verbose = False
crew.output_log_file = None
crew.max_rpm = None
crew.memory = False
crew.process = Process.sequential
crew.config = None
crew.cache = True
crew.embedder = None
# Add non-empty agents and tasks
mock_agent = MagicMock(spec=Agent)
mock_task = MagicMock(spec=Task)
mock_task.agent = mock_agent
mock_task.async_execution = False
mock_task.context = None
crew.agents = [mock_agent]
crew.tasks = [mock_task]
return crew
return _create_mock_crew
@pytest.fixture
def mock_router_factory(mock_crew_factory):
def _create_mock_router():
crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"output": "crew1"})
crew2 = mock_crew_factory(name="Crew 2", output_json_dict={"output": "crew2"})
crew3 = mock_crew_factory(name="Crew 3", output_json_dict={"output": "crew3"})
MockRouterClass = type("MockRouter", (MagicMock, Router), {})
class MockRouter(MockRouterClass):
def __deepcopy__(self, memo):
result = MockRouterClass()
result.route = self.route
return result
mock_router = MockRouter()
mock_router.route = MagicMock(
side_effect=lambda x: (
(
Pipeline(stages=[crew1])
if x.get("score", 0) > 80
else (
Pipeline(stages=[crew2])
if x.get("score", 0) > 50
else Pipeline(stages=[crew3])
)
),
(
"route1"
if x.get("score", 0) > 80
else "route2"
if x.get("score", 0) > 50
else "default"
),
)
)
return mock_router
return _create_mock_router
def test_pipeline_initialization(mock_crew_factory):
"""
Test that a Pipeline is correctly initialized with the given stages.
"""
crew1 = mock_crew_factory(name="Crew 1")
crew2 = mock_crew_factory(name="Crew 2")
pipeline = Pipeline(stages=[crew1, crew2])
assert len(pipeline.stages) == 2
assert pipeline.stages[0] == crew1
assert pipeline.stages[1] == crew2
@pytest.mark.asyncio
async def test_pipeline_with_empty_input(mock_crew_factory):
"""
Ensure the pipeline handles an empty input list correctly.
"""
crew = mock_crew_factory(name="Test Crew")
pipeline = Pipeline(stages=[crew])
input_data = []
pipeline_results = await pipeline.kickoff(input_data)
assert (
len(pipeline_results) == 0
), "Pipeline should return empty results for empty input"
agent = Agent(
role="Test Role",
goal="Test Goal",
backstory="Test Backstory",
allow_delegation=False,
verbose=False,
)
task = Task(
description="Return: Test output",
expected_output="Test output",
agent=agent,
async_execution=False,
context=None,
)
@pytest.mark.asyncio
async def test_pipeline_process_streams_single_input():
"""
Test that Pipeline.process_streams() correctly processes a single input
and returns the expected CrewOutput.
"""
crew_name = "Test Crew"
mock_crew = Crew(
agents=[agent],
tasks=[task],
process=Process.sequential,
)
mock_crew.name = crew_name
pipeline = Pipeline(stages=[mock_crew])
input_data = [{"key": "value"}]
with patch.object(Crew, "kickoff_async") as mock_kickoff:
task_output = TaskOutput(
description="Test task", raw="Task output", agent="Test Agent"
)
mock_kickoff.return_value = CrewOutput(
raw="Test output",
tasks_output=[task_output],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict=None,
pydantic=None,
)
pipeline_results = await pipeline.kickoff(input_data)
mock_crew.kickoff_async.assert_called_once_with(inputs={"key": "value"})
for pipeline_result in pipeline_results:
assert isinstance(pipeline_result, PipelineKickoffResult)
assert pipeline_result.raw == "Test output"
assert len(pipeline_result.crews_outputs) == 1
assert pipeline_result.token_usage == {crew_name: DEFAULT_TOKEN_USAGE}
assert pipeline_result.trace == [input_data[0], "Test Crew"]
@pytest.mark.asyncio
async def test_pipeline_result_ordering():
"""
Ensure that results are returned in the same order as the inputs, especially with parallel processing.
"""
crew1 = Crew(
name="Crew 1",
agents=[agent],
tasks=[task],
)
crew2 = Crew(
name="Crew 2",
agents=[agent],
tasks=[task],
)
crew3 = Crew(
name="Crew 3",
agents=[agent],
tasks=[task],
)
pipeline = Pipeline(
stages=[crew1, [crew2, crew3]]
) # Parallel stage to test ordering
input_data = [{"id": 1}, {"id": 2}, {"id": 3}]
def create_crew_output(crew_name):
return CrewOutput(
raw=f"Test output from {crew_name}",
tasks_output=[
TaskOutput(
description="Test task",
raw=f"Task output from {crew_name}",
agent="Test Agent",
)
],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict={"output": crew_name.lower().replace(" ", "")},
pydantic=None,
)
with patch.object(Crew, "kickoff_async") as mock_kickoff:
mock_kickoff.side_effect = [
create_crew_output("Crew 1"),
create_crew_output("Crew 2"),
create_crew_output("Crew 3"),
] * 3
pipeline_results = await pipeline.kickoff(input_data)
mock_kickoff.call_count = 3
assert (
len(pipeline_results) == 6
), "Should have 2 results for each input due to the parallel final stage"
# Group results by their original input id
grouped_results = {}
for result in pipeline_results:
input_id = result.trace[0]["id"]
if input_id not in grouped_results:
grouped_results[input_id] = []
grouped_results[input_id].append(result)
# Check that we have the correct number of groups and results per group
assert len(grouped_results) == 3, "Should have results for each of the 3 inputs"
for input_id, results in grouped_results.items():
assert (
len(results) == 2
), f"Each input should have 2 results, but input {input_id} has {len(results)}"
# Check the ordering and content of the results
for input_id in range(1, 4):
group = grouped_results[input_id]
assert group[0].trace == [
{"id": input_id},
"Crew 1",
"Crew 2",
], f"Unexpected trace for first result of input {input_id}"
assert group[1].trace == [
{"id": input_id},
"Crew 1",
"Crew 3",
], f"Unexpected trace for second result of input {input_id}"
class TestPydanticOutput(BaseModel):
key: str
value: int
@pytest.mark.asyncio
@pytest.mark.vcr(filter_headers=["authorization"])
async def test_pipeline_process_streams_single_input_pydantic_output():
crew_name = "Test Crew"
task = Task(
description="Return: Key:value",
expected_output="Key:Value",
agent=agent,
async_execution=False,
context=None,
output_pydantic=TestPydanticOutput,
)
mock_crew = Crew(
name=crew_name,
agents=[agent],
tasks=[task],
)
pipeline = Pipeline(stages=[mock_crew])
input_data = [{"key": "value"}]
with patch.object(Crew, "kickoff_async") as mock_kickoff:
mock_crew_output = CrewOutput(
raw="Test output",
tasks_output=[
TaskOutput(
description="Return: Key:value", raw="Key:Value", agent="Test Agent"
)
],
token_usage=UsageMetrics(
total_tokens=171,
prompt_tokens=154,
completion_tokens=17,
successful_requests=1,
),
pydantic=TestPydanticOutput(key="test", value=42),
)
mock_kickoff.return_value = mock_crew_output
pipeline_results = await pipeline.kickoff(input_data)
assert len(pipeline_results) == 1
pipeline_result = pipeline_results[0]
assert isinstance(pipeline_result, PipelineKickoffResult)
assert pipeline_result.raw == "Test output"
assert len(pipeline_result.crews_outputs) == 1
assert pipeline_result.token_usage == {
crew_name: UsageMetrics(
total_tokens=171,
prompt_tokens=154,
completion_tokens=17,
successful_requests=1,
)
}
assert pipeline_result.trace == [input_data[0], "Test Crew"]
assert isinstance(pipeline_result.pydantic, TestPydanticOutput)
assert pipeline_result.pydantic.key == "test"
assert pipeline_result.pydantic.value == 42
assert pipeline_result.json_dict is None
@pytest.mark.asyncio
async def test_pipeline_preserves_original_input(mock_crew_factory):
crew_name = "Test Crew"
mock_crew = mock_crew_factory(
name=crew_name,
output_json_dict={"new_key": "new_value"},
)
pipeline = Pipeline(stages=[mock_crew])
# Create a deep copy of the input data to ensure we're not comparing references
original_input_data = [{"key": "value", "nested": {"a": 1}}]
input_data = json.loads(json.dumps(original_input_data))
await pipeline.kickoff(input_data)
# Assert that the original input hasn't been modified
assert (
input_data == original_input_data
), "The original input data should not be modified"
# Ensure that even nested structures haven't been modified
assert (
input_data[0]["nested"] == original_input_data[0]["nested"]
), "Nested structures should not be modified"
# Verify that adding new keys to the crew output doesn't affect the original input
assert (
"new_key" not in input_data[0]
), "New keys from crew output should not be added to the original input"
@pytest.mark.asyncio
async def test_pipeline_process_streams_multiple_inputs():
"""
Test that Pipeline.process_streams() correctly processes multiple inputs
and returns the expected CrewOutputs.
"""
mock_crew = Crew(name="Test Crew", tasks=[task], agents=[agent])
pipeline = Pipeline(stages=[mock_crew])
input_data = [{"key1": "value1"}, {"key2": "value2"}]
with patch.object(Crew, "kickoff_async") as mock_kickoff:
mock_kickoff.return_value = CrewOutput(
raw="Test output",
tasks_output=[
TaskOutput(
description="Test task", raw="Task output", agent="Test Agent"
)
],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict=None,
pydantic=None,
)
pipeline_results = await pipeline.kickoff(input_data)
assert mock_kickoff.call_count == 2
assert len(pipeline_results) == 2
for pipeline_result in pipeline_results:
assert all(
isinstance(crew_output, CrewOutput)
for crew_output in pipeline_result.crews_outputs
)
@pytest.mark.asyncio
async def test_pipeline_with_parallel_stages():
"""
Test that Pipeline correctly handles parallel stages.
"""
crew1 = Crew(name="Crew 1", tasks=[task], agents=[agent])
crew2 = Crew(name="Crew 2", tasks=[task], agents=[agent])
crew3 = Crew(name="Crew 3", tasks=[task], agents=[agent])
pipeline = Pipeline(stages=[crew1, [crew2, crew3]])
input_data = [{"initial": "data"}]
with patch.object(Crew, "kickoff_async") as mock_kickoff:
mock_kickoff.return_value = CrewOutput(
raw="Test output",
tasks_output=[
TaskOutput(
description="Test task", raw="Task output", agent="Test Agent"
)
],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict=None,
pydantic=None,
)
pipeline_result = await pipeline.kickoff(input_data)
mock_kickoff.assert_called_with(inputs={"initial": "data"})
assert len(pipeline_result) == 2
pipeline_result_1, pipeline_result_2 = pipeline_result
pipeline_result_1.trace = [
"Crew 1",
"Crew 2",
]
pipeline_result_2.trace = [
"Crew 1",
"Crew 3",
]
expected_token_usage = {
"Crew 1": DEFAULT_TOKEN_USAGE,
"Crew 2": DEFAULT_TOKEN_USAGE,
"Crew 3": DEFAULT_TOKEN_USAGE,
}
assert pipeline_result_1.token_usage == expected_token_usage
assert pipeline_result_2.token_usage == expected_token_usage
@pytest.mark.asyncio
async def test_pipeline_with_parallel_stages_end_in_single_stage(mock_crew_factory):
"""
Test that Pipeline correctly handles parallel stages.
"""
crew1 = mock_crew_factory(name="Crew 1")
crew2 = mock_crew_factory(name="Crew 2")
crew3 = mock_crew_factory(name="Crew 3")
crew4 = mock_crew_factory(name="Crew 4")
pipeline = Pipeline(stages=[crew1, [crew2, crew3], crew4])
input_data = [{"initial": "data"}]
pipeline_result = await pipeline.kickoff(input_data)
crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
assert len(pipeline_result) == 1
pipeline_result_1 = pipeline_result[0]
pipeline_result_1.trace = [
input_data[0],
"Crew 1",
["Crew 2", "Crew 3"],
"Crew 4",
]
expected_token_usage = {
"Crew 1": DEFAULT_TOKEN_USAGE,
"Crew 2": DEFAULT_TOKEN_USAGE,
"Crew 3": DEFAULT_TOKEN_USAGE,
"Crew 4": DEFAULT_TOKEN_USAGE,
}
assert pipeline_result_1.token_usage == expected_token_usage
def test_pipeline_rshift_operator(mock_crew_factory):
"""
Test that the >> operator correctly creates a Pipeline from Crews and lists of Crews.
"""
crew1 = mock_crew_factory(name="Crew 1")
crew2 = mock_crew_factory(name="Crew 2")
crew3 = mock_crew_factory(name="Crew 3")
# Test single crew addition
pipeline = Pipeline(stages=[]) >> crew1
assert len(pipeline.stages) == 1
assert pipeline.stages[0] == crew1
# Test adding a list of crews
pipeline = Pipeline(stages=[crew1])
pipeline = pipeline >> [crew2, crew3]
assert len(pipeline.stages) == 2
assert pipeline.stages[1] == [crew2, crew3]
# Test error case: trying to shift with non-Crew object
with pytest.raises(TypeError):
pipeline >> "not a crew"
@pytest.mark.asyncio
@pytest.mark.vcr(filter_headers=["authorization"])
async def test_pipeline_parallel_crews_to_parallel_crews():
"""
Test that feeding parallel crews to parallel crews works correctly.
"""
crew1 = Crew(name="Crew 1", tasks=[task], agents=[agent])
crew2 = Crew(name="Crew 2", tasks=[task], agents=[agent])
crew3 = Crew(name="Crew 3", tasks=[task], agents=[agent])
crew4 = Crew(name="Crew 4", tasks=[task], agents=[agent])
# output_json_dict={"output1": "crew1"}
pipeline = Pipeline(stages=[[crew1, crew2], [crew3, crew4]])
input_data = [{"input": "test"}]
def create_crew_output(crew_name):
return CrewOutput(
raw=f"Test output from {crew_name}",
tasks_output=[
TaskOutput(
description="Test task",
raw=f"Task output from {crew_name}",
agent="Test Agent",
)
],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict={"output": crew_name.lower().replace(" ", "")},
pydantic=None,
)
with patch.object(Crew, "kickoff_async") as mock_kickoff:
mock_kickoff.side_effect = [
create_crew_output(crew_name)
for crew_name in ["Crew 1", "Crew 2", "Crew 3", "Crew 4"]
]
pipeline_results = await pipeline.kickoff(input_data)
assert len(pipeline_results) == 2, "Should have 2 results for final parallel stage"
pipeline_result_1, pipeline_result_2 = pipeline_results
# Check the outputs
assert pipeline_result_1.json_dict == {"output": "crew3"}
assert pipeline_result_2.json_dict == {"output": "crew4"}
# Check the traces
expected_traces = [
[{"input": "test"}, ["Crew 1", "Crew 2"], "Crew 3"],
[{"input": "test"}, ["Crew 1", "Crew 2"], "Crew 4"],
]
for result, expected_trace in zip(pipeline_results, expected_traces):
assert result.trace == expected_trace, f"Unexpected trace: {result.trace}"
def test_pipeline_double_nesting_not_allowed(mock_crew_factory):
"""
Test that double nesting in pipeline stages is not allowed.
"""
crew1 = mock_crew_factory(name="Crew 1")
crew2 = mock_crew_factory(name="Crew 2")
crew3 = mock_crew_factory(name="Crew 3")
crew4 = mock_crew_factory(name="Crew 4")
with pytest.raises(ValidationError) as exc_info:
Pipeline(stages=[crew1, [[crew2, crew3], crew4]])
error_msg = str(exc_info.value)
assert (
"Double nesting is not allowed in pipeline stages" in error_msg
), f"Unexpected error message: {error_msg}"
def test_pipeline_invalid_crew(mock_crew_factory):
"""
Test that non-Crew objects are not allowed in pipeline stages.
"""
crew1 = mock_crew_factory(name="Crew 1")
not_a_crew = "This is not a crew"
with pytest.raises(ValidationError) as exc_info:
Pipeline(stages=[crew1, not_a_crew])
error_msg = str(exc_info.value)
assert (
"Expected Crew instance, Router instance, or list of Crews, got <class 'str'>"
in error_msg
), f"Unexpected error message: {error_msg}"
"""
TODO: Figure out what is the proper output for a pipeline with multiple stages
Options:
- Should the final output only include the last stage's output?
- Should the final output include the accumulation of previous stages' outputs?
"""
@pytest.mark.asyncio
async def test_pipeline_data_accumulation(mock_crew_factory):
crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"key1": "value1"})
crew2 = mock_crew_factory(name="Crew 2", output_json_dict={"key2": "value2"})
pipeline = Pipeline(stages=[crew1, crew2])
input_data = [{"initial": "data"}]
results = await pipeline.kickoff(input_data)
# Check that crew1 was called with only the initial input
crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
# Check that crew2 was called with the combined input from the initial data and crew1's output
crew2.kickoff_async.assert_called_once_with(
inputs={"initial": "data", "key1": "value1"}
)
# Check the final output
assert len(results) == 1
final_result = results[0]
assert final_result.json_dict == {"key2": "value2"}
# Check that the trace includes all stages
assert final_result.trace == [{"initial": "data"}, "Crew 1", "Crew 2"]
# Check that crews_outputs contain the correct information
assert len(final_result.crews_outputs) == 2
assert final_result.crews_outputs[0].json_dict == {"key1": "value1"}
assert final_result.crews_outputs[1].json_dict == {"key2": "value2"}
@pytest.mark.asyncio
@pytest.mark.vcr(filter_headers=["authorization"])
async def test_pipeline_with_router():
crew1 = Crew(name="Crew 1", tasks=[task], agents=[agent])
crew2 = Crew(name="Crew 2", tasks=[task], agents=[agent])
crew3 = Crew(name="Crew 3", tasks=[task], agents=[agent])
routes = {
"route1": Route(
condition=lambda x: x.get("score", 0) > 80,
pipeline=Pipeline(stages=[crew1]),
),
"route2": Route(
condition=lambda x: 50 < x.get("score", 0) <= 80,
pipeline=Pipeline(stages=[crew2]),
),
}
router = Router(
routes=routes,
default=Pipeline(stages=[crew3]),
)
# Test high score route
pipeline = Pipeline(stages=[router])
with patch.object(Crew, "kickoff_async") as mock_kickoff:
mock_kickoff.return_value = CrewOutput(
raw="Test output from Crew 1",
tasks_output=[],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict={"output": "crew1"},
pydantic=None,
)
result_high = await pipeline.kickoff([{"score": 90}])
assert len(result_high) == 1
assert result_high[0].json_dict is not None
assert result_high[0].json_dict["output"] == "crew1"
assert result_high[0].trace == [
{"score": 90},
{"route_taken": "route1"},
"Crew 1",
]
with patch.object(Crew, "kickoff_async") as mock_kickoff:
mock_kickoff.return_value = CrewOutput(
raw="Test output from Crew 2",
tasks_output=[],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict={"output": "crew2"},
pydantic=None,
)
# Test medium score route
pipeline = Pipeline(stages=[router])
result_medium = await pipeline.kickoff([{"score": 60}])
assert len(result_medium) == 1
assert result_medium[0].json_dict is not None
assert result_medium[0].json_dict["output"] == "crew2"
assert result_medium[0].trace == [
{"score": 60},
{"route_taken": "route2"},
"Crew 2",
]
with patch.object(Crew, "kickoff_async") as mock_kickoff:
mock_kickoff.return_value = CrewOutput(
raw="Test output from Crew 3",
tasks_output=[],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict={"output": "crew3"},
pydantic=None,
)
# Test low score route
pipeline = Pipeline(stages=[router])
result_low = await pipeline.kickoff([{"score": 30}])
assert len(result_low) == 1
assert result_low[0].json_dict is not None
assert result_low[0].json_dict["output"] == "crew3"
assert result_low[0].trace == [
{"score": 30},
{"route_taken": "default"},
"Crew 3",
]
@pytest.mark.asyncio
@pytest.mark.vcr(filter_headers=["authorization"])
async def test_router_with_multiple_inputs():
crew1 = Crew(name="Crew 1", tasks=[task], agents=[agent])
crew2 = Crew(name="Crew 2", tasks=[task], agents=[agent])
crew3 = Crew(name="Crew 3", tasks=[task], agents=[agent])
router = Router(
routes={
"route1": Route(
condition=lambda x: x.get("score", 0) > 80,
pipeline=Pipeline(stages=[crew1]),
),
"route2": Route(
condition=lambda x: 50 < x.get("score", 0) <= 80,
pipeline=Pipeline(stages=[crew2]),
),
},
default=Pipeline(stages=[crew3]),
)
pipeline = Pipeline(stages=[router])
inputs = [{"score": 90}, {"score": 60}, {"score": 30}]
with patch.object(Crew, "kickoff_async") as mock_kickoff:
mock_kickoff.side_effect = [
CrewOutput(
raw="Test output from Crew 1",
tasks_output=[],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict={"output": "crew1"},
pydantic=None,
),
CrewOutput(
raw="Test output from Crew 2",
tasks_output=[],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict={"output": "crew2"},
pydantic=None,
),
CrewOutput(
raw="Test output from Crew 3",
tasks_output=[],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict={"output": "crew3"},
pydantic=None,
),
]
results = await pipeline.kickoff(inputs)
assert len(results) == 3
assert results[0].json_dict is not None
assert results[0].json_dict["output"] == "crew1"
assert results[1].json_dict is not None
assert results[1].json_dict["output"] == "crew2"
assert results[2].json_dict is not None
assert results[2].json_dict["output"] == "crew3"
assert results[0].trace[1]["route_taken"] == "route1"
assert results[1].trace[1]["route_taken"] == "route2"
assert results[2].trace[1]["route_taken"] == "default"
@pytest.mark.asyncio
@pytest.mark.vcr(filter_headers=["authorization"])
async def test_pipeline_with_multiple_routers():
crew1 = Crew(name="Crew 1", tasks=[task], agents=[agent])
crew2 = Crew(name="Crew 2", tasks=[task], agents=[agent])
router1 = Router(
routes={
"route1": Route(
condition=lambda x: x.get("score", 0) > 80,
pipeline=Pipeline(stages=[crew1]),
),
},
default=Pipeline(stages=[crew2]),
)
router2 = Router(
routes={
"route2": Route(
condition=lambda x: 50 < x.get("score", 0) <= 80,
pipeline=Pipeline(stages=[crew2]),
),
},
default=Pipeline(stages=[crew2]),
)
final_crew = Crew(name="Final Crew", tasks=[task], agents=[agent])
pipeline = Pipeline(stages=[router1, router2, final_crew])
with patch.object(Crew, "kickoff_async") as mock_kickoff:
mock_kickoff.side_effect = [
CrewOutput(
raw="Test output from Crew 1",
tasks_output=[],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict={"output": "crew1"},
pydantic=None,
),
CrewOutput(
raw="Test output from Crew 2",
tasks_output=[],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict={"output": "crew2"},
pydantic=None,
),
CrewOutput(
raw="Test output from Final Crew",
tasks_output=[],
token_usage=DEFAULT_TOKEN_USAGE,
json_dict={"output": "final"},
pydantic=None,
),
]
result = await pipeline.kickoff([{"score": 75}])
assert len(result) == 1
assert result[0].json_dict is not None
assert result[0].json_dict["output"] == "final"
assert (
len(result[0].trace) == 6
) # Input, Router1, Crew2, Router2, Crew2, Final Crew
assert result[0].trace[1]["route_taken"] == "default"
assert result[0].trace[3]["route_taken"] == "route2"
@pytest.mark.asyncio
async def test_router_default_route(mock_crew_factory):
default_crew = mock_crew_factory(
name="Default Crew", output_json_dict={"output": "default"}
)
router = Router(
routes={
"route1": Route(
condition=lambda x: False,
pipeline=Pipeline(stages=[mock_crew_factory(name="Never Used")]),
),
},
default=Pipeline(stages=[default_crew]),
)
pipeline = Pipeline(stages=[router])
result = await pipeline.kickoff([{"score": 100}])
assert len(result) == 1
assert result[0].json_dict is not None
assert result[0].json_dict["output"] == "default"
assert result[0].trace[1]["route_taken"] == "default"
@pytest.mark.asyncio
@pytest.mark.vcr(filter_headers=["authorization"])
async def test_router_with_empty_input():
crew1 = Crew(name="Crew 1", tasks=[task], agents=[agent])
crew2 = Crew(name="Crew 2", tasks=[task], agents=[agent])
crew3 = Crew(name="Crew 3", tasks=[task], agents=[agent])
router = Router(
routes={
"route1": Route(
condition=lambda x: x.get("score", 0) > 80,
pipeline=Pipeline(stages=[crew1]),
),
"route2": Route(
condition=lambda x: 50 < x.get("score", 0) <= 80,
pipeline=Pipeline(stages=[crew2]),
),
},
default=Pipeline(stages=[crew3]),
)
pipeline = Pipeline(stages=[router])
result = await pipeline.kickoff([{}])
assert len(result) == 1
assert result[0].trace[1]["route_taken"] == "default"

2
uv.lock generated
View File

@@ -608,7 +608,7 @@ wheels = [
[[package]]
name = "crewai"
version = "0.83.0"
version = "0.85.0"
source = { editable = "." }
dependencies = [
{ name = "appdirs" },