Compare commits

..

33 Commits

Author SHA1 Message Date
Eduardo Chiarotti
c400adc887 docs: fix name of tool 2024-07-02 12:53:44 -03:00
Eduardo Chiarotti
d000a73245 docs: add CodeinterpreterTool to docs and update docs 2024-07-02 12:52:29 -03:00
Eduardo Chiarotti
4da587196e docs: remove training docs from README 2024-07-02 12:46:32 -03:00
João Moura
6b9a1d4040 adding link to docs 2024-07-01 18:41:31 -07:00
João Moura
508fbd49e9 preparing new version 2024-07-01 18:28:11 -07:00
João Moura
e18a6c6bb8 updatign tools 2024-07-01 15:25:29 -07:00
João Moura
16237ef393 rollback update to new version 2024-07-01 15:25:10 -07:00
João Moura
5332d02f36 preparing new version 2024-07-01 15:12:22 -07:00
João Moura
7258120a0d preparing new version 2024-07-01 15:10:13 -07:00
João Moura
8b7bc69ba1 preparing new version 2024-07-01 08:41:13 -07:00
João Moura
5a807eb93f preparing new version 2024-07-01 06:08:46 -07:00
João Moura
130682c93b preparing new version 2024-07-01 05:48:47 -07:00
João Moura
02e29e4681 new docs 2024-07-01 05:32:22 -07:00
João Moura
6943eb4463 small formatting details 2024-07-01 05:32:22 -07:00
João Moura
939a18a4d2 Updating docs 2024-07-01 05:32:22 -07:00
João Moura
ccbe415315 updating docs 2024-07-01 05:32:22 -07:00
João Moura
511af98dea small refractoring for new version 2024-07-01 05:32:22 -07:00
gpu7
a9d94112f5 bugfix in python script sample code (#787)
Add the line:

process = Process.sequential
2024-07-01 00:23:06 -03:00
JoePro
1bca6029fe Update LLM-Connections.md (#796)
Revised to utilize Ollama from langchain.llms instead as the functionality from the other method simply doesn't work when delegating.

Co-authored-by: João Moura <joaomdmoura@gmail.com>
2024-07-01 00:22:38 -03:00
Eelke van den Bos
c027aa8bf6 Set manager verbosity to crew verbosity by default (#797)
Fixes #793
2024-07-01 00:20:39 -03:00
finecwg
ce7d86e0df Update tool_usage.py (#828)
fixed error for some cases with Pandas DataFrame:

ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
2024-07-01 00:19:36 -03:00
Bruno Tanabe
5dfaf866c9 fix: Fix grammar error in documentation 'Crew Attributes' (#836)
Correction of grammar error in the CrewAI documentation, on the page 'https://docs.crewai.com/core-concepts/Crews/' it says 'ustom' instead of 'Custom'.
2024-07-01 00:16:06 -03:00
Gui Vieira
5b66e87621 Improve telemetry (#818)
* Improve telemetry

* Minor adjustments

* Try to fix typing error

* Try to fix typing error [2]
2024-06-28 20:05:47 -03:00
João Moura
851dd0f84f preparing new version 2024-06-27 11:04:08 -07:00
Eduardo Chiarotti
2188358f13 docs: add docs for training (#824) 2024-06-27 14:56:32 -03:00
Lorenze Jay
10997dd175 Lorenzejay/byoa (#776)
* better spacing

* works with llama index

* works on langchain custom just need delegation to work

* cleanup for custom_agent class

* works with different argument expectations for agent_executor

* cleanup for hierarchial process, better agent_executor args handler and added to the crew agent doc page

* removed code examples for langchain + llama index, added to docs instead

* added key output if return is not a str for and added some tests

* added hinting for CustomAgent class

* removed pass as it was not needed

* closer just need to figuire ou agentTools

* running agents - llamaindex and langchain with base agent

* some cleanup on baseAgent

* minimum for agent to run for base class and ensure it works with hierarchical process

* cleanup for original agent to take on BaseAgent class

* Agent takes on langchainagent and cleanup across

* token handling working for usage_metrics to continue working

* installed llama-index, updated docs and added better name

* fixed some type errors

* base agent holds token_process

* heirarchail process uses proper tools and no longer relies on hasattr for token_processes

* removal of test_custom_agent_executions

* this fixes copying agents

* leveraging an executor class for trigger llamaindex agent

* llama index now has ask_human

* executor mixins added

* added output converter base class

* type listed

* cleanup for output conversions and tokenprocess eliminated redundancy

* properly handling tokens

* simplified token calc handling

* original agent with base agent builder structure setup

* better docs

* no more llama-index dep

* cleaner docs

* test fixes

* poetry reverts and better docs

* base_agent_tools set for third party agents

* updated task and test fix
2024-06-27 14:56:08 -03:00
Eduardo Chiarotti
da9cc5f097 fix: fix trainig_data error (#820)
* fix: fix trainig_data error

* fix: fix lack crew on agent

* fix: fix lack crew on agent executor
2024-06-27 12:58:20 -03:00
Eduardo Chiarotti
c005ec3f78 fix: fix tests (#814) 2024-06-27 05:45:23 -03:00
Eduardo Chiarotti
6018fe5872 feat: add CodeInterpreterTool to run when enable code execution (#804)
* feat: add CodeInterpreterTool to run when enable code execution is allowed on agent

* feat: change to allow_code_execution

* feat: add readme for CodeInterpreterTool
2024-06-27 02:25:39 -03:00
Nuraly
bf0e70999e Update Agents.md (#816)
Made a space to ensure that Header formatting is displayed correctly on the website
2024-06-27 02:23:18 -03:00
Eduardo Chiarotti
175d5b3dd6 feat: Add Train feature for Crews (#686)
* feat: add training logic to agent and crew

* feat: add training logic to agent executor

* feat: add input parameter  to cli command

* feat: add utilities for the training logic

* feat: polish code, logic and add private variables

* feat: add docstring and type hinting to executor

* feat: add constant file, add constant to code

* feat: fix name of training handler function

* feat: remove unused var

* feat: change file handler file name

* feat: Add training handler file, class and change on the code

* feat: fix name error from file

* fix: change import to adapt to logic

* feat: add training handler test

* feat: add tests for file and training_handler

* feat: add test for task evaluator function

* feat: change text to fit in-screen

* feat: add test for train function

* feat: add test for agent training_handler function

* feat: add test for agent._use_trained_data
2024-06-27 02:22:34 -03:00
Bruno Tanabe
9e61b8325b fix: Fix grammar error in documentation in PDF Search Tool (#819)
Correction of grammar error in the CrewAI documentation, on the page 'https://docs.crewai.com/tools/PDFSearchTool/' it says 'Optinal' instead of 'Optional'.
2024-06-27 00:41:22 -03:00
João Moura
c4d76cde8f updating docs 2024-06-22 19:49:50 -03:00
55 changed files with 411498 additions and 6785 deletions

4
.gitignore vendored
View File

@@ -11,4 +11,6 @@ chroma.sqlite3
old_en.json
db/
test.py
rc-tests/*
rc-tests/*
*.pkl
temp/*

View File

@@ -127,6 +127,7 @@ crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2],
verbose=2, # You can set it to 1 or 2 to different logging levels
process = Process.sequential
)
# Get your crew to work!
@@ -195,6 +196,7 @@ Please refer to the [Connect crewAI to LLMs](https://docs.crewai.com/how-to/LLM-
**CrewAI's Advantage**: CrewAI is built with production in mind. It offers the flexibility of Autogen's conversational agents and the structured process approach of ChatDev, but without the rigidity. CrewAI's processes are designed to be dynamic and adaptable, fitting seamlessly into both development and production workflows.
## Contribution
CrewAI is open-source and we welcome contributions. If you're looking to contribute, please:

View File

@@ -34,6 +34,7 @@ description: What are crewAI Agents and how to use them.
| **System Template** *(optional)* | Specifies the system format for the agent. Default is `None`. |
| **Prompt Template** *(optional)* | Specifies the prompt format for the agent. Default is `None`. |
| **Response Template** *(optional)* | Specifies the response format for the agent. Default is `None`. |
## Creating an Agent
!!! note "Agent Interaction"
@@ -96,5 +97,53 @@ agent = Agent(
)
```
## Bring your Third Party Agents
!!! note "Extend your Third Party Agents like LlamaIndex, Langchain, Autogen or fully custom agents using the the crewai's BaseAgent class."
BaseAgent includes attributes and methods required to integrate with your crews to run and delegate tasks to other agents within your own crew.
CrewAI is a universal multi agent framework that allows for all agents to work together to automate tasks and solve problems.
```py
from crewai import Agent, Task, Crew
from custom_agent import CustomAgent # You need to build and extend your own agent logic with the CrewAI BaseAgent class then import it here.
from langchain.agents import load_tools
langchain_tools = load_tools(["google-serper"], llm=llm)
agent1 = CustomAgent(
role="backstory agent",
goal="who is {input}?",
backstory="agent backstory",
verbose=True,
)
task1 = Task(
expected_output="a short biography of {input}",
description="a short biography of {input}",
agent=agent1,
)
agent2 = Agent(
role="bio agent",
goal="summarize the short bio for {input} and if needed do more research",
backstory="agent backstory",
verbose=True,
)
task2 = Task(
description="a tldr summary of the short biography",
expected_output="5 bullet point summary of the biography",
agent=agent2,
context=[task1],
)
my_crew = Crew(agents=[agent1, agent2], tasks=[task1, task2])
crew = my_crew.kickoff(inputs={"input": "Mark Twain"})
```
## Conclusion
Agents are the building blocks of the CrewAI framework. By understanding how to define and interact with agents, you can create sophisticated AI systems that leverage the power of collaborative intelligence.
Agents are the building blocks of the CrewAI framework. By understanding how to define and interact with agents, you can create sophisticated AI systems that leverage the power of collaborative intelligence.

View File

@@ -28,7 +28,7 @@ A crew in crewAI represents a collaborative group of agents working together to
| **Task Callback** *(optional)* | A function that is called after the completion of each task. Useful for monitoring or additional operations post-task execution. |
| **Share Crew** *(optional)* | Whether you want to share the complete crew information and execution with the crewAI team to make the library better, and allow us to train models. |
| **Output Log File** *(optional)* | Whether you want to have a file with the complete crew output and execution. You can set it using True and it will default to the folder you are currently in and it will be called logs.txt or passing a string with the full path and name of the file. |
| **Manager Agent** *(optional)* | `manager` sets a ustom agent that will be used as a manager. |
| **Manager Agent** *(optional)* | `manager` sets a custom agent that will be used as a manager. |
| **Manager Callbacks** *(optional)* | `manager_callbacks` takes a list of callback handlers to be executed by the manager agent when a hierarchical process is used. |
| **Prompt File** *(optional)* | Path to the prompt JSON file to be used for the crew. |
@@ -123,7 +123,7 @@ result = my_crew.kickoff()
print(result)
```
### Kicking Off a Crew
### Different wayt to Kicking Off a Crew
Once your crew is assembled, initiate the workflow with the appropriate kickoff method. CrewAI provides several methods for better control over the kickoff process: `kickoff()`, `kickoff_for_each()`, `kickoff_async()`, and `kickoff_for_each_async()`.
@@ -138,18 +138,21 @@ result = my_crew.kickoff()
print(result)
# Example of using kickoff_for_each
results = my_crew.kickoff_for_each()
inputs_array = [{'topic': 'AI in healthcare'}, {'topic': 'AI in finance'}]
results = my_crew.kickoff_for_each(inputs=inputs_array)
for result in results:
print(result)
# Example of using kickoff_async
async_result = my_crew.kickoff_async()
inputs = {'topic': 'AI in healthcare'}
async_result = my_crew.kickoff_async(inputs=inputs)
print(async_result)
# Example of using kickoff_for_each_async
async_results = my_crew.kickoff_for_each_async()
inputs_array = [{'topic': 'AI in healthcare'}, {'topic': 'AI in finance'}]
async_results = my_crew.kickoff_for_each_async(inputs=inputs_array)
for async_result in async_results:
print(async_result)
```
These methods provide flexibility in how you manage and execute tasks within your crew, allowing for both synchronous and asynchronous workflows tailored to your needs
These methods provide flexibility in how you manage and execute tasks within your crew, allowing for both synchronous and asynchronous workflows tailored to your needs

View File

@@ -0,0 +1,33 @@
---
title: crewAI Train
description: Learn how to train your crewAI agents by giving them feedback early on and get consistent results.
---
## Introduction
The training feature in CrewAI allows you to train your AI agents using the command-line interface (CLI). By running the command `crewai train -n <n_iterations>`, you can specify the number of iterations for the training process.
During training, CrewAI utilizes techniques to optimize the performance of your agents along with human feedback. This helps the agents improve their understanding, decision-making, and problem-solving abilities.
To use the training feature, follow these steps:
1. Open your terminal or command prompt.
2. Navigate to the directory where your CrewAI project is located.
3. Run the following command:
```shell
crewai train -n <n_iterations>
```
Replace `<n_iterations>` with the desired number of training iterations. This determines how many times the agents will go through the training process.
### Key Points to Note:
- **Positive Integer Requirement:** Ensure that the number of iterations (`n_iterations`) is a positive integer. The code will raise a `ValueError` if this condition is not met.
- **Error Handling:** The code handles subprocess errors and unexpected exceptions, providing error messages to the user.
It is important to note that the training process may take some time, depending on the complexity of your agents and will also require your feedback on each iteration.
Once the training is complete, your agents will be equipped with enhanced capabilities and knowledge, ready to tackle complex tasks and provide more consistent and valuable insights.
Remember to regularly update and retrain your agents to ensure they stay up-to-date with the latest information and advancements in the field.
Happy training with CrewAI!

View File

@@ -0,0 +1,76 @@
---
title: Coding Agents
description: Learn how to enable your crewAI Agents to write and execute code, and explore advanced features for enhanced functionality.
---
## Introduction
crewAI Agents now have the powerful ability to write and execute code, significantly enhancing their problem-solving capabilities. This feature is particularly useful for tasks that require computational or programmatic solutions.
## Enabling Code Execution
To enable code execution for an agent, set the `allow_code_execution` parameter to `True` when creating the agent. Here's an example:
```python
from crewai import Agent
coding_agent = Agent(
role="Senior Python Developer",
goal="Craft well-designed and thought-out code",
backstory="You are a senior Python developer with extensive experience in software architecture and best practices.",
allow_code_execution=True
)
```
## Important Considerations
1. **Model Selection**: It is strongly recommended to use more capable models like Claude 3.5 Sonnet and GPT-4 when enabling code execution. These models have a better understanding of programming concepts and are more likely to generate correct and efficient code.
2. **Error Handling**: The code execution feature includes error handling. If executed code raises an exception, the agent will receive the error message and can attempt to correct the code or provide alternative solutions.
3. **Dependencies**: To use the code execution feature, you need to install the `crewai_tools` package. If not installed, the agent will log an info message: "Coding tools not available. Install crewai_tools."
## Code Execution Process
When an agent with code execution enabled encounters a task requiring programming:
1. The agent analyzes the task and determines that code execution is necessary.
2. It formulates the Python code needed to solve the problem.
3. The code is sent to the internal code execution tool (`CodeInterpreterTool`).
4. The tool executes the code in a controlled environment and returns the result.
5. The agent interprets the result and incorporates it into its response or uses it for further problem-solving.
## Example Usage
Here's a detailed example of creating an agent with code execution capabilities and using it in a task:
```python
from crewai import Agent, Task, Crew
# Create an agent with code execution enabled
coding_agent = Agent(
role="Python Data Analyst",
goal="Analyze data and provide insights using Python",
backstory="You are an experienced data analyst with strong Python skills.",
allow_code_execution=True
)
# Create a task that requires code execution
data_analysis_task = Task(
description="Analyze the given dataset and calculate the average age of participants.",
agent=coding_agent
)
# Create a crew and add the task
analysis_crew = Crew(
agents=[coding_agent],
tasks=[data_analysis_task]
)
# Execute the crew
result = analysis_crew.kickoff()
print(result)
```
In this example, the `coding_agent` can write and execute Python code to perform data analysis tasks.

View File

@@ -1,11 +1,10 @@
---
title: Assembling and Activating Your CrewAI Team
description: A comprehensive guide to creating a dynamic CrewAI team for your projects, with updated functionalities including verbose mode, memory capabilities, asynchronous execution, output customization, language model configuration, and more.
description: A comprehensive guide to creating a dynamic CrewAI team for your projects, with updated functionalities including verbose mode, memory capabilities, asynchronous execution, output customization, language model configuration, code execution, integration with third-party agents, and improved task management.
---
## Introduction
Embark on your CrewAI journey by setting up your environment and initiating your AI crew with the latest features. This guide ensures a smooth start, incorporating all recent updates for an enhanced experience.
Embark on your CrewAI journey by setting up your environment and initiating your AI crew with the latest features. This guide ensures a smooth start, incorporating all recent updates for an enhanced experience, including code execution capabilities, integration with third-party agents, and advanced task management.
## Step 0: Installation
Install CrewAI and any necessary packages for your project. CrewAI is compatible with Python >=3.10,<=3.13.
@@ -16,46 +15,51 @@ pip install 'crewai[tools]'
```
## Step 1: Assemble Your Agents
Define your agents with distinct roles, backstories, and enhanced capabilities like verbose mode, memory usage, and the ability to set specific agents as managers. These elements add depth and guide their task execution and interaction within the crew.
Define your agents with distinct roles, backstories, and enhanced capabilities. The Agent class now supports a wide range of attributes for fine-tuned control over agent behavior and interactions, including code execution and integration with third-party agents.
```python
import os
os.environ["SERPER_API_KEY"] = "Your Key" # serper.dev API key
os.environ["OPENAI_API_KEY"] = "Your Key"
from langchain.llms import OpenAI
from crewai import Agent
from crewai_tools import SerperDevTool, BrowserbaseTool, ExaSearchTool
os.environ["OPENAI_API_KEY"] = "Your OpenAI Key"
os.environ["SERPER_API_KEY"] = "Your Serper Key"
search_tool = SerperDevTool()
browser_tool = BrowserbaseTool()
exa_search_tool = ExaSearchTool()
# Creating a senior researcher agent with memory and verbose mode
# Creating a senior researcher agent with advanced configurations
researcher = Agent(
role='Senior Researcher',
goal='Uncover groundbreaking technologies in {topic}',
verbose=True,
memory=True,
backstory=(
"Driven by curiosity, you're at the forefront of"
"innovation, eager to explore and share knowledge that could change"
"the world."
),
tools=[search_tool, browser_tool],
role='Senior Researcher',
goal='Uncover groundbreaking technologies in {topic}',
backstory=("Driven by curiosity, you're at the forefront of innovation, "
"eager to explore and share knowledge that could change the world."),
memory=True,
verbose=True,
allow_delegation=False,
tools=[search_tool, browser_tool],
allow_code_execution=False, # New attribute for enabling code execution
max_iter=15, # Maximum number of iterations for task execution
max_rpm=100, # Maximum requests per minute
max_execution_time=3600, # Maximum execution time in seconds
system_template="Your custom system template here", # Custom system template
prompt_template="Your custom prompt template here", # Custom prompt template
response_template="Your custom response template here", # Custom response template
)
# Creating a writer agent with custom tools and delegation capability
# Creating a writer agent with custom tools and specific configurations
writer = Agent(
role='Writer',
goal='Narrate compelling tech stories about {topic}',
verbose=True,
memory=True,
backstory=(
"With a flair for simplifying complex topics, you craft"
"engaging narratives that captivate and educate, bringing new"
"discoveries to light in an accessible manner."
),
tools=[exa_search_tool],
allow_delegation=False
role='Writer',
goal='Narrate compelling tech stories about {topic}',
backstory=("With a flair for simplifying complex topics, you craft engaging "
"narratives that captivate and educate, bringing new discoveries to light."),
verbose=True,
allow_delegation=False,
memory=True,
tools=[exa_search_tool],
function_calling_llm=OpenAI(model_name="gpt-3.5-turbo"), # Separate LLM for function calling
)
# Setting a specific manager agent
@@ -64,73 +68,16 @@ manager = Agent(
goal='Ensure the smooth operation and coordination of the team',
verbose=True,
backstory=(
"As a seasoned project manager, you excel in organizing"
"As a seasoned project manager, you excel in organizing "
"tasks, managing timelines, and ensuring the team stays on track."
)
)
```
## Step 2: Define the Tasks
Detail the specific objectives for your agents, including new features for asynchronous execution and output customization. These tasks ensure a targeted approach to their roles.
```python
from crewai import Task
# Research task
research_task = Task(
description=(
"Identify the next big trend in {topic}."
"Focus on identifying pros and cons and the overall narrative."
"Your final report should clearly articulate the key points,"
"its market opportunities, and potential risks."
),
expected_output='A comprehensive 3 paragraphs long report on the latest AI trends.',
tools=[search_tool],
agent=researcher,
callback="research_callback", # Example of task callback
human_input=True
)
# Writing task with language model configuration
write_task = Task(
description=(
"Compose an insightful article on {topic}."
"Focus on the latest trends and how it's impacting the industry."
"This article should be easy to understand, engaging, and positive."
),
expected_output='A 4 paragraph article on {topic} advancements formatted as markdown.',
tools=[exa_search_tool],
agent=writer,
output_file='new-blog-post.md', # Example of output customization
allow_code_execution=True, # Enable code execution for the manager
)
```
## Step 3: Form the Crew
Combine your agents into a crew, setting the workflow process they'll follow to accomplish the tasks. Now with options to configure language models for enhanced interaction and additional configurations for optimizing performance, such as creating directories when saving files.
### New Agent Attributes and Features
```python
from crewai import Crew, Process
# Forming the tech-focused crew with some enhanced configurations
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task],
process=Process.sequential, # Optional: Sequential task execution is default
memory=True,
cache=True,
max_rpm=100,
manager_agent=manager
)
```
## Step 4: Kick It Off
Initiate the process with your enhanced crew ready. Observe as your agents collaborate, leveraging their new capabilities for a successful project outcome. Input variables will be interpolated into the agents and tasks for a personalized approach.
```python
# Starting the task execution process with enhanced feedback
result = crew.kickoff(inputs={'topic': 'AI in healthcare'})
print(result)
```
## Conclusion
Building and activating a crew in CrewAI has evolved with new functionalities. By incorporating verbose mode, memory capabilities, asynchronous task execution, output customization, language model configuration, and enhanced crew configurations, your AI team is more equipped than ever to tackle challenges efficiently. The depth of agent backstories and the precision of their objectives enrich collaboration, leading to successful project outcomes. This guide aims to provide you with a clear and detailed understanding of setting up and utilizing the CrewAI framework to its full potential.
1. `allow_code_execution`: Enable or disable code execution capabilities for the agent (default is False).
2. `max_execution_time`: Set a maximum execution time (in seconds) for the agent to complete a task.
3. `function_calling_llm`: Specify a separate language model for function calling.
4

View File

@@ -0,0 +1,40 @@
---
title: Kickoff Async
description: Kickoff a Crew Asynchronously
---
## Introduction
CrewAI provides the ability to kickoff a crew asynchronously, allowing you to start the crew execution in a non-blocking manner. This feature is particularly useful when you want to run multiple crews concurrently or when you need to perform other tasks while the crew is executing.
## Asynchronous Crew Execution
To kickoff a crew asynchronously, use the `kickoff_async()` method. This method initiates the crew execution in a separate thread, allowing the main thread to continue executing other tasks.
Here's an example of how to kickoff a crew asynchronously:
```python
from crewai import Crew, Agent, Task
# Create an agent with code execution enabled
coding_agent = Agent(
role="Python Data Analyst",
goal="Analyze data and provide insights using Python",
backstory="You are an experienced data analyst with strong Python skills.",
allow_code_execution=True
)
# Create a task that requires code execution
data_analysis_task = Task(
description="Analyze the given dataset and calculate the average age of participants. Ages: {ages}",
agent=coding_agent
)
# Create a crew and add the task
analysis_crew = Crew(
agents=[coding_agent],
tasks=[data_analysis_task]
)
# Execute the crew
result = analysis_crew.kickoff_async(inputs={"ages": [25, 30, 35, 40, 45]})
```

View File

@@ -0,0 +1,45 @@
---
title: Kickoff For Each
description: Kickoff a Crew for a List
---
## Introduction
CrewAI provides the ability to kickoff a crew for each item in a list, allowing you to execute the crew for each item in the list. This feature is particularly useful when you need to perform the same set of tasks for multiple items.
## Kicking Off a Crew for Each Item
To kickoff a crew for each item in a list, use the `kickoff_for_each()` method. This method executes the crew for each item in the list, allowing you to process multiple items efficiently.
Here's an example of how to kickoff a crew for each item in a list:
```python
from crewai import Crew, Agent, Task
# Create an agent with code execution enabled
coding_agent = Agent(
role="Python Data Analyst",
goal="Analyze data and provide insights using Python",
backstory="You are an experienced data analyst with strong Python skills.",
allow_code_execution=True
)
# Create a task that requires code execution
data_analysis_task = Task(
description="Analyze the given dataset and calculate the average age of participants. Ages: {ages}",
agent=coding_agent
)
# Create a crew and add the task
analysis_crew = Crew(
agents=[coding_agent],
tasks=[data_analysis_task]
)
datasets = [
{ "ages": [25, 30, 35, 40, 45] },
{ "ages": [20, 25, 30, 35, 40] },
{ "ages": [30, 35, 40, 45, 50] }
]
# Execute the crew
result = analysis_crew.kickoff_for_each(inputs=datasets)
```

View File

@@ -1,16 +1,18 @@
---
title: Connect CrewAI to LLMs
description: Comprehensive guide on integrating CrewAI with various Large Language Models (LLMs), including detailed class attributes and methods.
description: Comprehensive guide on integrating CrewAI with various Large Language Models (LLMs), including detailed class attributes, methods, and configuration options.
---
## Connect CrewAI to LLMs
!!! note "Default LLM"
By default, CrewAI uses OpenAI's GPT-4 model for language processing. You can configure your agents to use a different model or API. This guide shows how to connect your agents to various LLMs through environment variables and direct instantiation.
By default, CrewAI uses OpenAI's GPT-4 model (specifically, the model specified by the OPENAI_MODEL_NAME environment variable, defaulting to "gpt-4o") for language processing. You can configure your agents to use a different model or API as described in this guide.
CrewAI offers flexibility in connecting to various LLMs, including local models via [Ollama](https://ollama.ai) and different APIs like Azure. It's compatible with all [LangChain LLM](https://python.langchain.com/docs/integrations/llms/) components, enabling diverse integrations for tailored AI solutions.
## CrewAI Agent Overview
The `Agent` class is the cornerstone for implementing AI solutions in CrewAI. Here's an updated overview reflecting the latest codebase changes:
The `Agent` class is the cornerstone for implementing AI solutions in CrewAI. Here's a comprehensive overview of the Agent class attributes and methods:
- **Attributes**:
- `role`: Defines the agent's role within the solution.
@@ -50,54 +52,24 @@ Ollama is preferred for local LLM integration, offering customization and privac
### Setting Up Ollama
- **Environment Variables Configuration**: To integrate Ollama, set the following environment variables:
```sh
OPENAI_API_BASE='http://localhost:11434/v1'
OPENAI_MODEL_NAME='openhermes' # Adjust based on available model
OPENAI_API_BASE='http://localhost:11434'
OPENAI_MODEL_NAME='llama2' # Adjust based on available model
OPENAI_API_KEY=''
```
## Ollama Integration (ex. for using Llama 2 locally)
1. [Download Ollama](https://ollama.com/download).
2. After setting up the Ollama, Pull the Llama2 by typing following lines into the terminal ```ollama pull llama2```.
3. Create a ModelFile similar the one below in your project directory.
1. [Download Ollama](https://ollama.com/download).
2. After setting up the Ollama, Pull the Llama2 by typing following lines into the terminal ```ollama pull llama2```.
3. Enjoy your free Llama2 model that powered up by excellent agents from crewai.
```
FROM llama2
# Set parameters
PARAMETER temperature 0.8
PARAMETER stop Result
# Sets a custom system message to specify the behavior of the chat assistant
# Leaving it blank for now.
SYSTEM """"""
```
4. Create a script to get the base model, which in our case is llama2, and create a model on top of that with ModelFile above. PS: this will be ".sh" file.
```
#!/bin/zsh
# variables
model_name="llama2"
custom_model_name="crewai-llama2"
#get the base model
ollama pull $model_name
#create the model file
ollama create $custom_model_name -f ./Llama2ModelFile
```
5. Go into the directory where the script file and ModelFile is located and run the script.
6. Enjoy your free Llama2 model that is powered up by excellent agents from CrewAI.
```python
from crewai import Agent, Task, Crew
from langchain_openai import ChatOpenAI
from langchain.llms import Ollama
import os
os.environ["OPENAI_API_KEY"] = "NA"
llm = ChatOpenAI(
model = "crewai-llama2",
base_url = "http://localhost:11434/v1")
llm = Ollama(
model = "llama2",
base_url = "http://localhost:11434")
general_agent = Agent(role = "Math Professor",
goal = """Provide the solution to the students that are asking mathematical questions and give them the answer.""",

View File

@@ -1,44 +1,89 @@
---
title: CrewAI Agent Monitoring with Langtrace
description: How to monitor cost, latency, and performance of CrewAI Agents using Langtrace.
description: How to monitor cost, latency, and performance of CrewAI Agents using Langtrace, an external observability tool.
---
# Langtrace Overview
Langtrace is an open-source tool that helps you set up observability and evaluations for LLMs, LLM frameworks, and VectorDB. With Langtrace, you can get deep visibility into the cost, latency, and performance of your CrewAI Agents. Additionally, you can log the hyperparameters and monitor for any performance regressions and set up a process to continuously improve your Agents.
Langtrace is an open-source, external tool that helps you set up observability and evaluations for Large Language Models (LLMs), LLM frameworks, and Vector Databases. While not built directly into CrewAI, Langtrace can be used alongside CrewAI to gain deep visibility into the cost, latency, and performance of your CrewAI Agents. This integration allows you to log hyperparameters, monitor performance regressions, and establish a process for continuous improvement of your Agents.
## Setup Instructions
1. Sign up for [Langtrace](https://langtrace.ai/) by going to [https://langtrace.ai/signup](https://langtrace.ai/signup).
1. Sign up for [Langtrace](https://langtrace.ai/) by visiting [https://langtrace.ai/signup](https://langtrace.ai/signup).
2. Create a project and generate an API key.
3. Install Langtrace in your code using the following commands.
**Note**: For detailed instructions on integrating Langtrace, you can check out the official docs from [here](https://docs.langtrace.ai/supported-integrations/llm-frameworks/crewai).
3. Install Langtrace in your CrewAI project using the following commands:
```
```bash
# Install the SDK
pip install langtrace-python-sdk
# Import it into your project
from langtrace_python_sdk import langtrace # Must precede any llm module imports
langtrace.init(api_key = '<LANGTRACE_API_KEY>')
```
### Features
- **LLM Token and Cost tracking**
- **Trace graph showing detailed execution steps with latency and logs**
- **Dataset curation using manual annotation**
- **Prompt versioning and management**
- **Prompt Playground with comparison views between models**
- **Testing and Evaluations**
## Using Langtrace with CrewAI
![Langtrace Cost and Usage Tracking](..%2Fassets%2Fcrewai-langtrace-stats.png)
![Langtrace Span Graph and Logs Dashboard](..%2Fassets%2Fcrewai-langtrace-spans.png)
To integrate Langtrace with your CrewAI project, follow these steps:
#### Extra links
1. Import and initialize Langtrace at the beginning of your script, before any CrewAI imports:
<a href="https://x.com/langtrace_ai">🐦 Twitter</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://discord.com/invite/EaSATwtr4t">📢 Discord</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://langtrace.ai/">🖇 Website</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://docs.langtrace.ai/introduction">📙 Documentation</a>
```python
from langtrace_python_sdk import langtrace
langtrace.init(api_key='<LANGTRACE_API_KEY>')
# Now import CrewAI modules
from crewai import Agent, Task, Crew
```
2. Create your CrewAI agents and tasks as usual.
3. Use Langtrace's tracking functions to monitor your CrewAI operations. For example:
```python
with langtrace.trace("CrewAI Task Execution"):
result = crew.kickoff()
```
### Features and Their Application to CrewAI
1. **LLM Token and Cost Tracking**
- Monitor the token usage and associated costs for each CrewAI agent interaction.
- Example:
```python
with langtrace.trace("Agent Interaction"):
agent_response = agent.execute(task)
```
2. **Trace Graph for Execution Steps**
- Visualize the execution flow of your CrewAI tasks, including latency and logs.
- Useful for identifying bottlenecks in your agent workflows.
3. **Dataset Curation with Manual Annotation**
- Create datasets from your CrewAI task outputs for future training or evaluation.
- Example:
```python
langtrace.log_dataset_item(task_input, agent_output, {"task_type": "research"})
```
4. **Prompt Versioning and Management**
- Keep track of different versions of prompts used in your CrewAI agents.
- Useful for A/B testing and optimizing agent performance.
5. **Prompt Playground with Model Comparisons**
- Test and compare different prompts and models for your CrewAI agents before deployment.
6. **Testing and Evaluations**
- Set up automated tests for your CrewAI agents and tasks.
- Example:
```python
langtrace.evaluate(agent_output, expected_output, "accuracy")
```
## Monitoring New CrewAI Features
CrewAI has introduced several new features that can be monitored using Langtrace:
1. **Code Execution**: Monitor the performance and output of code executed by agents.
```python
with langtrace.trace("Agent Code Execution"):
code_output = agent.execute_code(code_snippet)
```
2. **Third-party Agent Integration**: Track interactions with LlamaIndex, LangChain, and Autogen agents.

View File

@@ -15,7 +15,7 @@ The sequential process ensures tasks are executed one after the other, following
- **Easy Monitoring**: Facilitates easy tracking of task completion and project progress.
## Implementing the Sequential Process
Assemble your crew and define tasks in the order they need to be executed.
To use the sequential process, assemble your crew and define tasks in the order they need to be executed.
```python
from crewai import Crew, Process, Agent, Task
@@ -48,6 +48,9 @@ report_crew = Crew(
tasks=[research_task, analysis_task, writing_task],
process=Process.sequential
)
# Execute the crew
result = report_crew.kickoff()
```
### Workflow in Action
@@ -55,5 +58,29 @@ report_crew = Crew(
2. **Subsequent Tasks**: Agents pick up their tasks based on the process type, with outcomes of preceding tasks or manager directives guiding their execution.
3. **Completion**: The process concludes once the final task is executed, leading to project completion.
## Conclusion
The sequential and hierarchical processes in CrewAI offer clear, adaptable paths for task execution. They are well-suited for projects requiring logical progression and dynamic decision-making, ensuring each step is completed effectively, thereby facilitating a cohesive final product.
## Advanced Features
### Task Delegation
In sequential processes, if an agent has `allow_delegation` set to `True`, they can delegate tasks to other agents in the crew. This feature is automatically set up when there are multiple agents in the crew.
### Asynchronous Execution
Tasks can be executed asynchronously, allowing for parallel processing when appropriate. To create an asynchronous task, set `async_execution=True` when defining the task.
### Memory and Caching
CrewAI supports both memory and caching features:
- **Memory**: Enable by setting `memory=True` when creating the Crew. This allows agents to retain information across tasks.
- **Caching**: By default, caching is enabled. Set `cache=False` to disable it.
### Callbacks
You can set callbacks at both the task and step level:
- `task_callback`: Executed after each task completion.
- `step_callback`: Executed after each step in an agent's execution.
### Usage Metrics
CrewAI tracks token usage across all tasks and agents. You can access these metrics after execution.
## Best Practices for Sequential Processes
1. **Order Matters**: Arrange tasks in a logical sequence where each task builds upon the previous one.
2. **Clear Task Descriptions**: Provide detailed descriptions for each task to guide the agents effectively.
3. **Appropriate Agent Selection**: Match agents' skills and roles to the requirements of each task.
4. **Use Context**: Leverage the context from previous tasks to inform subsequent ones

View File

@@ -1,12 +1,12 @@
---
title: Ability to Set a Specific Agent as Manager in CrewAI
description: Introducing the ability to set a specific agent as a manager instead of having CrewAI create one automatically.
title: Setting a Specific Agent as Manager in CrewAI
description: Learn how to set a custom agent as the manager in CrewAI, providing more control over task management and coordination.
---
# Ability to Set a Specific Agent as Manager in CrewAI
# Setting a Specific Agent as Manager in CrewAI
CrewAI now allows users to set a specific agent as the manager of the crew, providing more control over the management and coordination of tasks. This feature enables the customization of the managerial role to better fit the project's requirements.
CrewAI allows users to set a specific agent as the manager of the crew, providing more control over the management and coordination of tasks. This feature enables the customization of the managerial role to better fit your project's requirements.
## Using the `manager_agent` Attribute
@@ -23,46 +23,65 @@ from crewai import Agent, Task, Crew, Process
# Define your agents
researcher = Agent(
role="Researcher",
goal="Make the best research and analysis on content about AI and AI agents",
backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and is now working on doing research and analysis for a new customer.",
goal="Conduct thorough research and analysis on AI and AI agents",
backstory="You're an expert researcher, specialized in technology, software engineering, AI, and startups. You work as a freelancer and are currently researching for a new client.",
allow_delegation=False,
)
writer = Agent(
role="Senior Writer",
goal="Write the best content about AI and AI agents.",
backstory="You're a senior writer, specialized in technology, software engineering, AI and startups. You work as a freelancer and are now working on writing content for a new customer.",
goal="Create compelling content about AI and AI agents",
backstory="You're a senior writer, specialized in technology, software engineering, AI, and startups. You work as a freelancer and are currently writing content for a new client.",
allow_delegation=False,
)
# Define your task
task = Task(
description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
expected_output="5 bullet points with a paragraph for each idea.",
description="Generate a list of 5 interesting ideas for an article, then write one captivating paragraph for each idea that showcases the potential of a full article on this topic. Return the list of ideas with their paragraphs and your notes.",
expected_output="5 bullet points, each with a paragraph and accompanying notes.",
)
# Define the manager agent
manager = Agent(
role="Manager",
goal="Manage the crew and ensure the tasks are completed efficiently.",
backstory="You're an experienced manager, skilled in overseeing complex projects and guiding teams to success. Your role is to coordinate the efforts of the crew members, ensuring that each task is completed on time and to the highest standard.",
allow_delegation=False,
role="Project Manager",
goal="Efficiently manage the crew and ensure high-quality task completion",
backstory="You're an experienced project manager, skilled in overseeing complex projects and guiding teams to success. Your role is to coordinate the efforts of the crew members, ensuring that each task is completed on time and to the highest standard.",
allow_delegation=True,
)
# Instantiate your crew with a custom manager
crew = Crew(
agents=[researcher, writer],
process=Process.hierarchical,
manager_agent=manager,
tasks=[task],
manager_agent=manager,
process=Process.hierarchical,
)
# Get your crew to work!
crew.kickoff()
# Start the crew's work
result = crew.kickoff()
```
## Benefits of a Custom Manager Agent
- **Enhanced Control**: Allows for a more tailored management approach, fitting the specific needs of the project.
- **Improved Coordination**: Ensures that the tasks are efficiently coordinated and managed by an experienced agent.
- **Customizable Management**: Provides the flexibility to define managerial roles and responsibilities that align with the project's goals.
- **Enhanced Control**: Tailor the management approach to fit the specific needs of your project.
- **Improved Coordination**: Ensure efficient task coordination and management by an experienced agent.
- **Customizable Management**: Define managerial roles and responsibilities that align with your project's goals.
## Setting a Manager LLM
If you're using the hierarchical process and don't want to set a custom manager agent, you can specify the language model for the manager:
```python
from langchain_openai import ChatOpenAI
manager_llm = ChatOpenAI(model_name="gpt-4")
crew = Crew(
agents=[researcher, writer],
tasks=[task],
process=Process.hierarchical,
manager_llm=manager_llm
)
```
Note: Either `manager_agent` or `manager_llm` must be set when using the hierarchical process.

View File

@@ -33,6 +33,11 @@ Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By
Crews
</a>
</li>
<li>
<a href="./core-concepts/Training-Crew">
Training
</a>
</li>
<li>
<a href="./core-concepts/Memory">
Memory
@@ -78,16 +83,36 @@ Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By
Customizing Agents
</a>
</li>
<li>
<a href="./how-to/Coding-Agents">
Coding Agents
</a>
</li>
<li>
<a href="./how-to/Human-Input-on-Execution">
Human Input on Execution
</a>
</li>
<li>
<a href="./how-to/Kickoff-async">
Kickoff a Crew Asynchronously
</a>
</li>
<li>
<a href="./how-to/Kickoff-for-each">
Kickoff a Crew for a List
</a>
</li>
<li>
<a href="./how-to/AgentOps-Observability">
Agent Monitoring with AgentOps
</a>
</li>
<li>
<a href="./how-to/Langtrace-Observability">
Agent Monitoring with LangTrace
</a>
</li>
</ul>
</div>
<div style="width:30%">

View File

@@ -0,0 +1,41 @@
# CodeInterpreterTool
## Description
This tool is used to give the Agent the ability to run code (Python3) from the code generated by the Agent itself. The code is executed in a sandboxed environment, so it is safe to run any code.
It is incredible useful since it allows the Agent to generate code, run it in the same environment, get the result and use it to make decisions.
## Requirements
- Docker
## Installation
Install the crewai_tools package
```shell
pip install 'crewai[tools]'
```
## Example
Remember that when using this tool, the code must be generated by the Agent itself. The code must be a Python3 code. And it will take some time for the first time to run because it needs to build the Docker image.
```python
from crewai import Agent
from crewai_tools import CodeInterpreterTool
Agent(
...
tools=[CodeInterpreterTool()],
)
```
We also provide a simple way to use it directly from the Agent.
```python
from crewai import Agent
agent = Agent(
...
allow_code_execution=True,
)
```

View File

@@ -0,0 +1,72 @@
# ComposioTool Documentation
## Description
This tools is a wrapper around the composio toolset and gives your agent access to a wide variety of tools from the composio SDK.
## Installation
To incorporate this tool into your project, follow the installation instructions below:
```shell
pip install composio-core
pip install 'crewai[tools]'
```
after the installation is complete, either run `composio login` or export your composio API key as `COMPOSIO_API_KEY`.
## Example
The following example demonstrates how to initialize the tool and execute a github action:
1. Initialize toolset
```python
from composio import App
from crewai_tools import ComposioTool
from crewai import Agent, Task
tools = [ComposioTool.from_action(action=Action.GITHUB_ACTIVITY_STAR_REPO_FOR_AUTHENTICATED_USER)]
```
If you don't know what action you want to use, use `from_app` and `tags` filter to get relevant actions
```python
tools = ComposioTool.from_app(App.GITHUB, tags=["important"])
```
or use `use_case` to search relevant actions
```python
tools = ComposioTool.from_app(App.GITHUB, use_case="Star a github repository")
```
2. Define agent
```python
crewai_agent = Agent(
role="Github Agent",
goal="You take action on Github using Github APIs",
backstory=(
"You are AI agent that is responsible for taking actions on Github "
"on users behalf. You need to take action on Github using Github APIs"
),
verbose=True,
tools=tools,
)
```
3. Execute task
```python
task = Task(
description="Star a repo ComposioHQ/composio on GitHub",
agent=crewai_agent,
expected_output="if the star happened",
)
task.execute()
```
* More detailed list of tools can be found [here](https://app.composio.dev)

View File

@@ -29,7 +29,7 @@ tool = PDFSearchTool(pdf='path/to/your/document.pdf')
```
## Arguments
- `pdf`: **Optinal** The PDF path for the search. Can be provided at initialization or within the `run` method's arguments. If provided at initialization, the tool confines its search to the specified document.
- `pdf`: **Optional** The PDF path for the search. Can be provided at initialization or within the `run` method's arguments. If provided at initialization, the tool confines its search to the specified document.
## Custom model and embeddings

View File

@@ -126,6 +126,7 @@ nav:
- Processes: 'core-concepts/Processes.md'
- Crews: 'core-concepts/Crews.md'
- Collaboration: 'core-concepts/Collaboration.md'
- Training: 'core-concepts/Training-Crew.md'
- Memory: 'core-concepts/Memory.md'
- Using LangChain Tools: 'core-concepts/Using-LangChain-Tools.md'
- Using LlamaIndex Tools: 'core-concepts/Using-LlamaIndex-Tools.md'
@@ -138,12 +139,17 @@ nav:
- Create your own Manager Agent: 'how-to/Your-Own-Manager-Agent.md'
- Connecting to any LLM: 'how-to/LLM-Connections.md'
- Customizing Agents: 'how-to/Customizing-Agents.md'
- Coding Agents: 'how-to/Coding-Agents.md'
- Human Input on Execution: 'how-to/Human-Input-on-Execution.md'
- Kickoff a Crew Asynchronously: 'how-to/Kickoff-async.md'
- Kickoff a Crew for a List: 'how-to/Kickoff-for-each.md'
- Agent Monitoring with AgentOps: 'how-to/AgentOps-Observability.md'
- Agent Monitoring with LangTrace: 'how-to/Langtrace-Observability.md'
- Tools Docs:
- Google Serper Search: 'tools/SerperDevTool.md'
- Browserbase Web Loader: 'tools/BrowserbaseLoadTool.md'
- Composio Tools: 'tools/ComposioTool.md'
- Code Interpreter: 'tools/CodeInterpreterTool.md'
- Scrape Website: 'tools/ScrapeWebsiteTool.md'
- Directory Read: 'tools/DirectoryReadTool.md'
- Exa Serch Web Loader: 'tools/EXASearchTool.md'

1401
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "crewai"
version = "0.32.2"
version = "0.35.7"
description = "Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By fostering collaborative intelligence, CrewAI empowers agents to work together seamlessly, tackling complex tasks."
authors = ["Joao Moura <joao@crewai.com>"]
readme = "README.md"
@@ -14,19 +14,19 @@ Repository = "https://github.com/joaomdmoura/crewai"
[tool.poetry.dependencies]
python = ">=3.10,<=3.13"
pydantic = "^2.4.2"
langchain = "^0.1.10"
langchain = ">=0.1.4,<0.2.0"
openai = "^1.13.3"
opentelemetry-api = "^1.22.0"
opentelemetry-sdk = "^1.22.0"
opentelemetry-exporter-otlp-proto-http = "^1.22.0"
instructor = "1.3.3"
regex = "^2023.12.25"
crewai-tools = { version = "^0.3.0", optional = true }
crewai-tools = { version = "^0.4.5", optional = true }
click = "^8.1.7"
python-dotenv = "^1.0.0"
embedchain = "0.1.109"
appdirs = "^1.4.4"
jsonref = "^1.1.0"
embedchain = "^0.1.113"
[tool.poetry.extras]
tools = ["crewai-tools"]
@@ -43,7 +43,7 @@ mkdocs-material = { extras = ["imaging"], version = "^9.5.7" }
mkdocs-material-extensions = "^1.3.1"
pillow = "^10.2.0"
cairosvg = "^2.7.1"
crewai-tools = "^0.3.0"
crewai-tools = "^0.4.5"
[tool.poetry.group.test.dependencies]
pytest = "^8.0.0"
@@ -60,4 +60,4 @@ exclude = ["cli/templates/main.py", "cli/templates/crew.py"]
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
build-backend = "poetry.core.masonry.api"

View File

@@ -2,3 +2,5 @@ from crewai.agent import Agent
from crewai.crew import Crew
from crewai.process import Process
from crewai.task import Task
__all__ = ["Agent", "Crew", "Process", "Task"]

View File

@@ -1,7 +1,5 @@
from copy import deepcopy
import os
import uuid
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, List, Optional, Tuple
from langchain.agents.agent import RunnableAgent
from langchain.agents.tools import tool as LangChainTool
@@ -9,25 +7,20 @@ from langchain.tools.render import render_text_description
from langchain_core.agents import AgentAction
from langchain_core.callbacks import BaseCallbackHandler
from langchain_openai import ChatOpenAI
from pydantic import (
UUID4,
BaseModel,
ConfigDict,
Field,
InstanceOf,
PrivateAttr,
field_validator,
model_validator,
)
from pydantic_core import PydanticCustomError
from crewai.agents import CacheHandler, CrewAgentExecutor, CrewAgentParser, ToolsHandler
from pydantic import Field, InstanceOf, model_validator
from crewai.agents import CacheHandler, CrewAgentExecutor, CrewAgentParser
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.utilities import I18N, Logger, Prompts, RPMController
from crewai.utilities.token_counter_callback import TokenCalcHandler, TokenProcess
from crewai.tools.agent_tools import AgentTools
from crewai.utilities import Prompts, Converter
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.utilities.training_handler import CrewTrainingHandler
class Agent(BaseModel):
class Agent(BaseAgent):
"""Represents an agent in a system.
Each agent has a role, a goal, a backstory, and an optional language model (llm).
@@ -51,58 +44,10 @@ class Agent(BaseModel):
callbacks: A list of callback functions from the langchain library that are triggered during the agent's execution process
"""
__hash__ = object.__hash__ # type: ignore
_logger: Logger = PrivateAttr()
_rpm_controller: RPMController = PrivateAttr(default=None)
_request_within_rpm_limit: Any = PrivateAttr(default=None)
_token_process: TokenProcess = TokenProcess()
formatting_errors: int = 0
model_config = ConfigDict(arbitrary_types_allowed=True)
id: UUID4 = Field(
default_factory=uuid.uuid4,
frozen=True,
description="Unique identifier for the object, not set by user.",
)
role: str = Field(description="Role of the agent")
goal: str = Field(description="Objective of the agent")
backstory: str = Field(description="Backstory of the agent")
cache: bool = Field(
default=True,
description="Whether the agent should use a cache for tool usage.",
)
config: Optional[Dict[str, Any]] = Field(
description="Configuration for the agent",
default=None,
)
max_rpm: Optional[int] = Field(
default=None,
description="Maximum number of requests per minute for the agent execution to be respected.",
)
verbose: bool = Field(
default=False, description="Verbose mode for the Agent Execution"
)
allow_delegation: bool = Field(
default=True, description="Allow delegation of tasks to agents"
)
tools: Optional[List[Any]] = Field(
default_factory=list, description="Tools at agents disposal"
)
max_iter: Optional[int] = Field(
default=25, description="Maximum iterations for an agent to execute a task"
)
max_execution_time: Optional[int] = Field(
default=None,
description="Maximum execution time for an agent to execute a task",
)
agent_executor: InstanceOf[CrewAgentExecutor] = Field(
default=None, description="An instance of the CrewAgentExecutor class."
)
crew: Any = Field(
default=None, description="Crew to which the agent belongs.")
tools_handler: InstanceOf[ToolsHandler] = Field(
default=None, description="An instance of the ToolsHandler class."
)
cache_handler: InstanceOf[CacheHandler] = Field(
default=None, description="An instance of the CacheHandler class."
)
@@ -110,8 +55,6 @@ class Agent(BaseModel):
default=None,
description="Callback to be executed after each step of the agent execution.",
)
i18n: I18N = Field(
default=I18N(), description="Internationalization settings.")
llm: Any = Field(
default_factory=lambda: ChatOpenAI(
model=os.environ.get("OPENAI_MODEL_NAME", "gpt-4o")
@@ -134,46 +77,19 @@ class Agent(BaseModel):
default=None, description="Response format for the agent."
)
_original_role: str | None = None
_original_goal: str | None = None
_original_backstory: str | None = None
allow_code_execution: Optional[bool] = Field(
default=False, description="Enable code execution for the agent."
)
def __init__(__pydantic_self__, **data):
config = data.pop("config", {})
super().__init__(**config, **data)
@field_validator("id", mode="before")
@classmethod
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
if v:
raise PydanticCustomError(
"may_not_set_field", "This field is not to be set by the user.", {}
)
@model_validator(mode="after")
def set_attributes_based_on_config(self) -> "Agent":
"""Set attributes based on the agent configuration."""
if self.config:
for key, value in self.config.items():
setattr(self, key, value)
return self
@model_validator(mode="after")
def set_private_attrs(self):
"""Set private attributes."""
self._logger = Logger(self.verbose)
if self.max_rpm and not self._rpm_controller:
self._rpm_controller = RPMController(
max_rpm=self.max_rpm, logger=self._logger
)
return self
@model_validator(mode="after")
def set_agent_executor(self) -> "Agent":
"""set agent executor is set."""
"""Ensure agent executor and token process is set."""
if hasattr(self.llm, "model_name"):
token_handler = TokenCalcHandler(
self.llm.model_name, self._token_process)
token_handler = TokenCalcHandler(self.llm.model_name, self._token_process)
# Ensure self.llm.callbacks is a list
if not isinstance(self.llm.callbacks, list):
@@ -230,16 +146,19 @@ class Agent(BaseModel):
tools = tools or self.tools
# type: ignore # Argument 1 to "_parse_tools" of "Agent" has incompatible type "list[Any] | None"; expected "list[Any]"
parsed_tools = self._parse_tools(tools)
parsed_tools = self._parse_tools(tools or [])
self.create_agent_executor(tools=tools)
self.agent_executor.tools = parsed_tools
self.agent_executor.task = task
self.agent_executor.tools_description = render_text_description(
parsed_tools)
self.agent_executor.tools_description = render_text_description(parsed_tools)
self.agent_executor.tools_names = self.__tools_names(parsed_tools)
if self.crew and self.crew._train:
task_prompt = self._training_handler(task_prompt=task_prompt)
else:
task_prompt = self._use_trained_data(task_prompt=task_prompt)
result = self.agent_executor.invoke(
{
"input": task_prompt,
@@ -247,33 +166,22 @@ class Agent(BaseModel):
"tools": self.agent_executor.tools_description,
}
)["output"]
if self.max_rpm:
self._rpm_controller.stop_rpm_counter()
return result
def set_cache_handler(self, cache_handler: CacheHandler) -> None:
"""Set the cache handler for the agent.
Args:
cache_handler: An instance of the CacheHandler class.
"""
self.tools_handler = ToolsHandler()
if self.cache:
self.cache_handler = cache_handler
self.tools_handler.cache = cache_handler
self.create_agent_executor()
def set_rpm_controller(self, rpm_controller: RPMController) -> None:
"""Set the rpm controller for the agent.
Args:
rpm_controller: An instance of the RPMController class.
"""
if not self._rpm_controller:
self._rpm_controller = rpm_controller
self.create_agent_executor()
def format_log_to_str(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
observation_prefix: str = "Observation: ",
llm_prefix: str = "",
) -> str:
"""Construct the scratchpad that lets the agent continue its thought process."""
thoughts = ""
for action, observation in intermediate_steps:
thoughts += action.log
thoughts += f"\n{observation_prefix}{observation}\n{llm_prefix}"
return thoughts
def create_agent_executor(self, tools=None) -> None:
"""Create an agent executor for the agent.
@@ -329,77 +237,42 @@ class Agent(BaseModel):
)
stop_words = [self.i18n.slice("observation")]
if self.response_template:
stop_words.append(
self.response_template.split("{{ .Response }}")[1].strip()
)
bind = self.llm.bind(stop=stop_words)
inner_agent = agent_args | execution_prompt | bind | CrewAgentParser(
agent=self)
inner_agent = agent_args | execution_prompt | bind | CrewAgentParser(agent=self)
self.agent_executor = CrewAgentExecutor(
agent=RunnableAgent(runnable=inner_agent), **executor_args
)
def interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
"""Interpolate inputs into the agent description and backstory."""
if self._original_role is None:
self._original_role = self.role
if self._original_goal is None:
self._original_goal = self.goal
if self._original_backstory is None:
self._original_backstory = self.backstory
def get_delegation_tools(self, agents: List[BaseAgent]):
agent_tools = AgentTools(agents=agents)
tools = agent_tools.tools()
return tools
if inputs:
self.role = self._original_role.format(**inputs)
self.goal = self._original_goal.format(**inputs)
self.backstory = self._original_backstory.format(**inputs)
def get_code_execution_tools(self):
try:
from crewai_tools import CodeInterpreterTool
def increment_formatting_errors(self) -> None:
"""Count the formatting errors of the agent."""
self.formatting_errors += 1
return [CodeInterpreterTool()]
except ModuleNotFoundError:
self._logger.log(
"info", "Coding tools not available. Install crewai_tools. "
)
def format_log_to_str(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
observation_prefix: str = "Observation: ",
llm_prefix: str = "",
) -> str:
"""Construct the scratchpad that lets the agent continue its thought process."""
thoughts = ""
for action, observation in intermediate_steps:
thoughts += action.log
thoughts += f"\n{observation_prefix}{observation}\n{llm_prefix}"
return thoughts
def copy(self):
"""Create a deep copy of the Agent."""
exclude = {
"id",
"_logger",
"_rpm_controller",
"_request_within_rpm_limit",
"_token_process",
"agent_executor",
"tools",
"tools_handler",
"cache_handler",
}
def get_output_converter(self, llm, text, model, instructions):
return Converter(llm=llm, text=text, model=model, instructions=instructions)
copied_data = self.model_dump(exclude=exclude)
copied_data = {k: v for k, v in copied_data.items() if v is not None}
copied_agent = Agent(**copied_data)
copied_agent.tools = deepcopy(self.tools)
return copied_agent
# type: ignore # Function "langchain_core.tools.tool" is not valid as a type
def _parse_tools(self, tools: List[Any]) -> List[LangChainTool]:
"""Parse tools to be used for the task."""
# tentatively try to import from crewai_tools import BaseTool as CrewAITool
tools_list = []
try:
# tentatively try to import from crewai_tools import BaseTool as CrewAITool
from crewai_tools import BaseTool as CrewAITool
for tool in tools:
@@ -408,10 +281,35 @@ class Agent(BaseModel):
else:
tools_list.append(tool)
except ModuleNotFoundError:
tools_list = []
for tool in tools:
tools_list.append(tool)
return tools_list
def _training_handler(self, task_prompt: str) -> str:
"""Handle training data for the agent task prompt to improve output on Training."""
if data := CrewTrainingHandler(TRAINING_DATA_FILE).load():
agent_id = str(self.id)
if data.get(agent_id):
human_feedbacks = [
i["human_feedback"] for i in data.get(agent_id, {}).values()
]
task_prompt += "You MUST follow these feedbacks: \n " + "\n - ".join(
human_feedbacks
)
return task_prompt
def _use_trained_data(self, task_prompt: str) -> str:
"""Use trained data for the agent task prompt to improve output."""
if data := CrewTrainingHandler(TRAINED_AGENTS_DATA_FILE).load():
if trained_data_output := data.get(self.role):
task_prompt += "You MUST follow these feedbacks: \n " + "\n - ".join(
trained_data_output["suggestions"]
)
return task_prompt
@staticmethod
def __tools_names(tools) -> str:
return ", ".join([t.name for t in tools])

View File

@@ -0,0 +1,256 @@
from copy import deepcopy
import uuid
from typing import Any, Dict, List, Optional
from abc import ABC, abstractmethod
from pydantic import (
UUID4,
BaseModel,
Field,
InstanceOf,
field_validator,
model_validator,
ConfigDict,
PrivateAttr,
)
from pydantic_core import PydanticCustomError
from crewai.utilities import I18N, RPMController, Logger
from crewai.agents import CacheHandler, ToolsHandler
from crewai.utilities.token_counter_callback import TokenProcess
class BaseAgent(ABC, BaseModel):
"""Abstract Base Class for all third party agents compatible with CrewAI.
Attributes:
id (UUID4): Unique identifier for the agent.
role (str): Role of the agent.
goal (str): Objective of the agent.
backstory (str): Backstory of the agent.
cache (bool): Whether the agent should use a cache for tool usage.
config (Optional[Dict[str, Any]]): Configuration for the agent.
verbose (bool): Verbose mode for the Agent Execution.
max_rpm (Optional[int]): Maximum number of requests per minute for the agent execution.
allow_delegation (bool): Allow delegation of tasks to agents.
tools (Optional[List[Any]]): Tools at the agent's disposal.
max_iter (Optional[int]): Maximum iterations for an agent to execute a task.
agent_executor (InstanceOf): An instance of the CrewAgentExecutor class.
llm (Any): Language model that will run the agent.
crew (Any): Crew to which the agent belongs.
i18n (I18N): Internationalization settings.
cache_handler (InstanceOf[CacheHandler]): An instance of the CacheHandler class.
tools_handler (InstanceOf[ToolsHandler]): An instance of the ToolsHandler class.
Methods:
execute_task(task: Any, context: Optional[str] = None, tools: Optional[List[Any]] = None) -> str:
Abstract method to execute a task.
create_agent_executor(tools=None) -> None:
Abstract method to create an agent executor.
_parse_tools(tools: List[Any]) -> List[Any]:
Abstract method to parse tools.
get_delegation_tools(agents: List["BaseAgent"]):
Abstract method to set the agents task tools for handling delegation and question asking to other agents in crew.
get_output_converter(llm, model, instructions):
Abstract method to get the converter class for the agent to create json/pydantic outputs.
interpolate_inputs(inputs: Dict[str, Any]) -> None:
Interpolate inputs into the agent description and backstory.
set_cache_handler(cache_handler: CacheHandler) -> None:
Set the cache handler for the agent.
increment_formatting_errors() -> None:
Increment formatting errors.
copy() -> "BaseAgent":
Create a copy of the agent.
set_rpm_controller(rpm_controller: RPMController) -> None:
Set the rpm controller for the agent.
set_private_attrs() -> "BaseAgent":
Set private attributes.
"""
__hash__ = object.__hash__ # type: ignore
_logger: Logger = PrivateAttr()
_rpm_controller: RPMController = PrivateAttr(default=None)
_request_within_rpm_limit: Any = PrivateAttr(default=None)
formatting_errors: int = 0
model_config = ConfigDict(arbitrary_types_allowed=True)
id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
role: str = Field(description="Role of the agent")
goal: str = Field(description="Objective of the agent")
backstory: str = Field(description="Backstory of the agent")
cache: bool = Field(
default=True, description="Whether the agent should use a cache for tool usage."
)
config: Optional[Dict[str, Any]] = Field(
description="Configuration for the agent", default=None
)
verbose: bool = Field(
default=False, description="Verbose mode for the Agent Execution"
)
max_rpm: Optional[int] = Field(
default=None,
description="Maximum number of requests per minute for the agent execution to be respected.",
)
allow_delegation: bool = Field(
default=True, description="Allow delegation of tasks to agents"
)
tools: Optional[List[Any]] = Field(
default_factory=list, description="Tools at agents' disposal"
)
max_iter: Optional[int] = Field(
default=25, description="Maximum iterations for an agent to execute a task"
)
agent_executor: InstanceOf = Field(
default=None, description="An instance of the CrewAgentExecutor class."
)
llm: Any = Field(
default=None, description="Language model that will run the agent."
)
crew: Any = Field(default=None, description="Crew to which the agent belongs.")
i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
cache_handler: InstanceOf[CacheHandler] = Field(
default=None, description="An instance of the CacheHandler class."
)
tools_handler: InstanceOf[ToolsHandler] = Field(
default=None, description="An instance of the ToolsHandler class."
)
_original_role: str | None = None
_original_goal: str | None = None
_original_backstory: str | None = None
_token_process: TokenProcess = TokenProcess()
def __init__(__pydantic_self__, **data):
config = data.pop("config", {})
super().__init__(**config, **data)
@model_validator(mode="after")
def set_config_attributes(self):
if self.config:
for key, value in self.config.items():
setattr(self, key, value)
return self
@field_validator("id", mode="before")
@classmethod
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
if v:
raise PydanticCustomError(
"may_not_set_field", "This field is not to be set by the user.", {}
)
@model_validator(mode="after")
def set_attributes_based_on_config(self) -> "BaseAgent":
"""Set attributes based on the agent configuration."""
if self.config:
for key, value in self.config.items():
setattr(self, key, value)
return self
@model_validator(mode="after")
def set_private_attrs(self):
"""Set private attributes."""
self._logger = Logger(self.verbose)
if self.max_rpm and not self._rpm_controller:
self._rpm_controller = RPMController(
max_rpm=self.max_rpm, logger=self._logger
)
if not self._token_process:
self._token_process = TokenProcess()
return self
@abstractmethod
def execute_task(
self,
task: Any,
context: Optional[str] = None,
tools: Optional[List[Any]] = None,
) -> str:
pass
@abstractmethod
def create_agent_executor(self, tools=None) -> None:
pass
@abstractmethod
def _parse_tools(self, tools: List[Any]) -> List[Any]:
pass
@abstractmethod
def get_delegation_tools(self, agents: List["BaseAgent"]):
"""Set the task tools that init BaseAgenTools class."""
pass
@abstractmethod
def get_output_converter(
self, llm: Any, text: str, model: type[BaseModel] | None, instructions: str
):
"""Get the converter class for the agent to create json/pydantic outputs."""
pass
def interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
"""Interpolate inputs into the agent description and backstory."""
if self._original_role is None:
self._original_role = self.role
if self._original_goal is None:
self._original_goal = self.goal
if self._original_backstory is None:
self._original_backstory = self.backstory
if inputs:
self.role = self._original_role.format(**inputs)
self.goal = self._original_goal.format(**inputs)
self.backstory = self._original_backstory.format(**inputs)
def set_cache_handler(self, cache_handler: CacheHandler) -> None:
"""Set the cache handler for the agent.
Args:
cache_handler: An instance of the CacheHandler class.
"""
self.tools_handler = ToolsHandler()
if self.cache:
self.cache_handler = cache_handler
self.tools_handler.cache = cache_handler
self.create_agent_executor()
def increment_formatting_errors(self) -> None:
self.formatting_errors += 1
def copy(self):
exclude = {
"id",
"_logger",
"_rpm_controller",
"_request_within_rpm_limit",
"token_process",
"agent_executor",
"tools",
"tools_handler",
"cache_handler",
"crew",
"llm",
}
copied_data = self.model_dump(exclude=exclude, exclude_unset=True)
copied_agent = self.__class__(**copied_data)
# Copy mutable attributes separately
copied_agent.tools = deepcopy(self.tools)
copied_agent.config = deepcopy(self.config)
# Preserve original values for interpolation
copied_agent._original_role = self._original_role
copied_agent._original_goal = self._original_goal
copied_agent._original_backstory = self._original_backstory
return copied_agent
def set_rpm_controller(self, rpm_controller: RPMController) -> None:
"""Set the rpm controller for the agent.
Args:
rpm_controller: An instance of the RPMController class.
"""
if not self._rpm_controller:
self._rpm_controller = rpm_controller
self.create_agent_executor()

View File

@@ -0,0 +1,65 @@
import time
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.utilities.converter import ConverterError
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
class CrewAgentExecutorMixin:
def _should_force_answer(self) -> bool:
return (
self.iterations == self.force_answer_max_iterations
) and not self.have_forced_answer
def _create_short_term_memory(self, output) -> None:
if (
self.crew
and self.crew.memory
and "Action: Delegate work to coworker" not in output.log
):
memory = ShortTermMemoryItem(
data=output.log,
agent=self.crew_agent.role,
metadata={
"observation": self.task.description,
},
)
self.crew._short_term_memory.save(memory)
def _create_long_term_memory(self, output) -> None:
if self.crew and self.crew.memory:
ltm_agent = TaskEvaluator(self.crew_agent)
evaluation = ltm_agent.evaluate(self.task, output.log)
if isinstance(evaluation, ConverterError):
return
long_term_memory = LongTermMemoryItem(
task=self.task.description,
agent=self.crew_agent.role,
quality=evaluation.quality,
datetime=str(time.time()),
expected_output=self.task.expected_output,
metadata={
"suggestions": evaluation.suggestions,
"quality": evaluation.quality,
},
)
self.crew._long_term_memory.save(long_term_memory)
for entity in evaluation.entities:
entity_memory = EntityMemoryItem(
name=entity.name,
type=entity.type,
description=entity.description,
relationships="\n".join([f"- {r}" for r in entity.relationships]),
)
self.crew._entity_memory.save(entity_memory)
def _ask_human_input(self, final_answer: dict) -> str:
"""Get human input."""
return input(
self._i18n.slice("getting_input").format(final_answer=final_answer)
)

View File

@@ -0,0 +1,81 @@
from abc import ABC, abstractmethod
from typing import List, Optional, Union
from pydantic import BaseModel, Field
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.task import Task
from crewai.utilities import I18N
class BaseAgentTools(BaseModel, ABC):
"""Default tools around agent delegation"""
agents: List[BaseAgent] = Field(description="List of agents in this crew.")
i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
@abstractmethod
def tools(self):
pass
def _get_coworker(self, coworker: Optional[str], **kwargs) -> Optional[str]:
coworker = coworker or kwargs.get("co_worker") or kwargs.get("coworker")
if coworker:
is_list = coworker.startswith("[") and coworker.endswith("]")
if is_list:
coworker = coworker[1:-1].split(",")[0]
return coworker
def delegate_work(
self, task: str, context: str, coworker: Optional[str] = None, **kwargs
):
"""Useful to delegate a specific task to a coworker passing all necessary context and names."""
coworker = self._get_coworker(coworker, **kwargs)
return self._execute(coworker, task, context)
def ask_question(
self, question: str, context: str, coworker: Optional[str] = None, **kwargs
):
"""Useful to ask a question, opinion or take from a coworker passing all necessary context and names."""
coworker = self._get_coworker(coworker, **kwargs)
return self._execute(coworker, question, context)
def _execute(self, agent: Union[str, None], task: str, context: Union[str, None]):
"""Execute the command."""
try:
if agent is None:
agent = ""
# It is important to remove the quotes from the agent name.
# The reason we have to do this is because less-powerful LLM's
# have difficulty producing valid JSON.
# As a result, we end up with invalid JSON that is truncated like this:
# {"task": "....", "coworker": "....
# when it should look like this:
# {"task": "....", "coworker": "...."}
agent_name = agent.casefold().replace('"', "").replace("\n", "")
agent = [
available_agent
for available_agent in self.agents
if available_agent.role.casefold().replace("\n", "") == agent_name
]
except Exception as _:
return self.i18n.errors("agent_tool_unexsiting_coworker").format(
coworkers="\n".join(
[f"- {agent.role.casefold()}" for agent in self.agents]
)
)
if not agent:
return self.i18n.errors("agent_tool_unexsiting_coworker").format(
coworkers="\n".join(
[f"- {agent.role.casefold()}" for agent in self.agents]
)
)
agent = agent[0]
task = Task(
description=task,
agent=agent,
expected_output="Your best answer to your coworker asking you this, accounting for the context shared.",
)
return agent.execute_task(task, context)

View File

@@ -0,0 +1,48 @@
from abc import ABC, abstractmethod
from typing import Any, Optional
from pydantic import BaseModel, Field, PrivateAttr
class OutputConverter(BaseModel, ABC):
"""
Abstract base class for converting task results into structured formats.
This class provides a framework for converting unstructured text into
either Pydantic models or JSON, tailored for specific agent requirements.
It uses a language model to interpret and structure the input text based
on given instructions.
Attributes:
text (str): The input text to be converted.
llm (Any): The language model used for conversion.
model (Any): The target model for structuring the output.
instructions (str): Specific instructions for the conversion process.
max_attempts (int): Maximum number of conversion attempts (default: 3).
"""
_is_gpt: bool = PrivateAttr(default=True)
text: str = Field(description="Text to be converted.")
llm: Any = Field(description="The language model to be used to convert the text.")
model: Any = Field(description="The model to be used to convert the text.")
instructions: str = Field(description="Conversion instructions to the LLM.")
max_attemps: Optional[int] = Field(
description="Max number of attemps to try to get the output formated.",
default=3,
)
@abstractmethod
def to_pydantic(self, current_attempt=1):
"""Convert text to pydantic."""
pass
@abstractmethod
def to_json(self, current_attempt=1):
"""Convert text to json."""
pass
@abstractmethod
def _is_gpt(self, llm):
"""Return if llm provided is of gpt from openai."""
pass

View File

@@ -0,0 +1,27 @@
from typing import Any, Dict
class TokenProcess:
total_tokens: int = 0
prompt_tokens: int = 0
completion_tokens: int = 0
successful_requests: int = 0
def sum_prompt_tokens(self, tokens: int):
self.prompt_tokens = self.prompt_tokens + tokens
self.total_tokens = self.total_tokens + tokens
def sum_completion_tokens(self, tokens: int):
self.completion_tokens = self.completion_tokens + tokens
self.total_tokens = self.total_tokens + tokens
def sum_successful_requests(self, requests: int):
self.successful_requests = self.successful_requests + requests
def get_summary(self) -> Dict[str, Any]:
return {
"total_tokens": self.total_tokens,
"prompt_tokens": self.prompt_tokens,
"completion_tokens": self.completion_tokens,
"successful_requests": self.successful_requests,
}

View File

@@ -7,22 +7,20 @@ from langchain.agents.agent import ExceptionTool
from langchain.callbacks.manager import CallbackManagerForChainRun
from langchain_core.agents import AgentAction, AgentFinish, AgentStep
from langchain_core.exceptions import OutputParserException
from langchain_core.pydantic_v1 import root_validator
from langchain_core.tools import BaseTool
from langchain_core.utils.input import get_color_mapping
from pydantic import InstanceOf
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
from crewai.agents.tools_handler import ToolsHandler
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.utilities import I18N
from crewai.utilities.converter import ConverterError
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.training_handler import CrewTrainingHandler
class CrewAgentExecutor(AgentExecutor):
class CrewAgentExecutor(AgentExecutor, CrewAgentExecutorMixin):
_i18n: I18N = I18N()
should_ask_for_human_input: bool = False
llm: Any = None
@@ -44,61 +42,6 @@ class CrewAgentExecutor(AgentExecutor):
prompt_template: Optional[str] = None
response_template: Optional[str] = None
@root_validator()
def set_force_answer_max_iterations(cls, values: Dict) -> Dict:
values["force_answer_max_iterations"] = values["max_iterations"] - 2
return values
def _should_force_answer(self) -> bool:
return (
self.iterations == self.force_answer_max_iterations
) and not self.have_forced_answer
def _create_short_term_memory(self, output) -> None:
if (
self.crew
and self.crew.memory
and "Action: Delegate work to coworker" not in output.log
):
memory = ShortTermMemoryItem(
data=output.log,
agent=self.crew_agent.role,
metadata={
"observation": self.task.description,
},
)
self.crew._short_term_memory.save(memory)
def _create_long_term_memory(self, output) -> None:
if self.crew and self.crew.memory:
ltm_agent = TaskEvaluator(self.crew_agent)
evaluation = ltm_agent.evaluate(self.task, output.log)
if isinstance(evaluation, ConverterError):
return
long_term_memory = LongTermMemoryItem(
task=self.task.description,
agent=self.crew_agent.role,
quality=evaluation.quality,
datetime=str(time.time()),
expected_output=self.task.expected_output,
metadata={
"suggestions": evaluation.suggestions,
"quality": evaluation.quality,
},
)
self.crew._long_term_memory.save(long_term_memory)
for entity in evaluation.entities:
entity_memory = EntityMemoryItem(
name=entity.name,
type=entity.type,
description=entity.description,
relationships="\n".join([f"- {r}" for r in entity.relationships]),
)
self.crew._entity_memory.save(entity_memory)
def _call(
self,
inputs: Dict[str, str],
@@ -246,12 +189,17 @@ class CrewAgentExecutor(AgentExecutor):
# If the tool chosen is the finishing tool, then we end and return.
if isinstance(output, AgentFinish):
if self.should_ask_for_human_input:
human_feedback = self._ask_human_input(output.return_values["output"])
if self.crew and self.crew._train:
self._handle_crew_training_output(output, human_feedback)
# Making sure we only ask for it once, so disabling for the next thought loop
self.should_ask_for_human_input = False
human_feedback = self._ask_human_input(output.return_values["output"])
action = AgentAction(
tool="Human Input", tool_input=human_feedback, log=output.log
)
yield AgentStep(
action=action,
observation=self._i18n.slice("human_feedback").format(
@@ -261,6 +209,9 @@ class CrewAgentExecutor(AgentExecutor):
return
else:
if self.crew and self.crew._train:
self._handle_crew_training_output(output)
yield output
return
@@ -300,8 +251,30 @@ class CrewAgentExecutor(AgentExecutor):
)
yield AgentStep(action=agent_action, observation=observation)
def _ask_human_input(self, final_answer: dict) -> str:
"""Get human input."""
return input(
self._i18n.slice("getting_input").format(final_answer=final_answer)
)
def _handle_crew_training_output(
self, output: AgentFinish, human_feedback: str | None = None
) -> None:
"""Function to handle the process of the training data."""
agent_id = str(self.crew_agent.id)
if (
CrewTrainingHandler(TRAINING_DATA_FILE).load()
and not self.should_ask_for_human_input
):
training_data = CrewTrainingHandler(TRAINING_DATA_FILE).load()
if training_data.get(agent_id):
training_data[agent_id][self.crew._train_iteration][
"improved_output"
] = output.return_values["output"]
CrewTrainingHandler(TRAINING_DATA_FILE).save(training_data)
if self.should_ask_for_human_input and human_feedback is not None:
training_data = {
"initial_output": output.return_values["output"],
"human_feedback": human_feedback,
"agent": agent_id,
"agent_role": self.crew_agent.role,
}
CrewTrainingHandler(TRAINING_DATA_FILE).append(
self.crew._train_iteration, agent_id, training_data
)

View File

@@ -15,8 +15,9 @@ def train():
"""
Train the crew for a given number of iterations.
"""
inputs = {"topic": "AI LLMs"}
try:
{{crew_name}}Crew().crew().train(n_iterations=int(sys.argv[1]))
{{crew_name}}Crew().crew().train(n_iterations=int(sys.argv[1]), inputs=inputs)
except Exception as e:
raise Exception(f"An error occurred while training the crew: {e}")

View File

@@ -6,7 +6,7 @@ authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = ">=3.10,<=3.13"
crewai = { extras = ["tools"], version = "^0.32.2" }
crewai = { extras = ["tools"], version = "^0.35.7" }
[tool.poetry.scripts]
{{folder_name}} = "{{folder_name}}.main:run"

View File

@@ -18,6 +18,7 @@ from pydantic import (
from pydantic_core import PydanticCustomError
from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.cache import CacheHandler
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
@@ -27,6 +28,8 @@ from crewai.task import Task
from crewai.telemetry import Telemetry
from crewai.tools.agent_tools import AgentTools
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.training_handler import CrewTrainingHandler
class Crew(BaseModel):
@@ -63,11 +66,13 @@ class Crew(BaseModel):
_short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr()
_long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr()
_entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr()
_train: Optional[bool] = PrivateAttr(default=False)
_train_iteration: Optional[int] = PrivateAttr()
cache: bool = Field(default=True)
model_config = ConfigDict(arbitrary_types_allowed=True)
tasks: List[Task] = Field(default_factory=list)
agents: List[Agent] = Field(default_factory=list)
agents: List[BaseAgent] = Field(default_factory=list)
process: Process = Field(default=Process.sequential)
verbose: Union[int, bool] = Field(default=0)
memory: bool = Field(
@@ -89,7 +94,7 @@ class Crew(BaseModel):
manager_llm: Optional[Any] = Field(
description="Language model that will run the agent.", default=None
)
manager_agent: Optional[Any] = Field(
manager_agent: Optional[BaseAgent] = Field(
description="Custom agent that will be used as manager.", default=None
)
manager_callbacks: Optional[List[InstanceOf[BaseCallbackHandler]]] = Field(
@@ -242,12 +247,41 @@ class Crew(BaseModel):
del task_config["agent"]
return Task(**task_config, agent=task_agent)
def _setup_for_training(self) -> None:
"""Sets up the crew for training."""
self._train = True
for task in self.tasks:
task.human_input = True
for agent in self.agents:
agent.allow_delegation = False
def train(self, n_iterations: int, inputs: Optional[Dict[str, Any]] = {}) -> None:
"""Trains the crew for a given number of iterations."""
self._setup_for_training()
for n_iteration in range(n_iterations):
self._train_iteration = n_iteration
self.kickoff(inputs=inputs)
training_data = CrewTrainingHandler("training_data.pkl").load()
for agent in self.agents:
result = TaskEvaluator(agent).evaluate_training_data(
training_data=training_data, agent_id=str(agent.id)
)
CrewTrainingHandler("trained_agents_data.pkl").save_trained_data(
agent_id=str(agent.role), trained_data=result.model_dump()
)
def kickoff(
self,
inputs: Optional[Dict[str, Any]] = {},
) -> Union[str, Dict[str, Any]]:
"""Starts the crew to work on its assigned tasks."""
self._execution_span = self._telemetry.crew_execution_span(self)
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
# type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
self._interpolate_inputs(inputs)
self._set_tasks_callbacks()
@@ -255,12 +289,21 @@ class Crew(BaseModel):
i18n = I18N(prompt_file=self.prompt_file)
for agent in self.agents:
# type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
agent.i18n = i18n
agent.crew = self
if not agent.function_calling_llm:
# type: ignore[attr-defined] # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
agent.crew = self # type: ignore[attr-defined]
# TODO: Create an AgentFunctionCalling protocol for future refactoring
if (
hasattr(agent, "function_calling_llm")
and not agent.function_calling_llm
):
agent.function_calling_llm = self.function_calling_llm
if not agent.step_callback:
if hasattr(agent, "allow_code_execution") and agent.allow_code_execution:
agent.tools += agent.get_code_execution_tools()
if hasattr(agent, "step_callback") and not agent.step_callback:
agent.step_callback = self.step_callback
agent.create_agent_executor()
@@ -278,10 +321,10 @@ class Crew(BaseModel):
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
)
metrics = metrics + [
agent._token_process.get_summary() for agent in self.agents
]
self.usage_metrics = {
key: sum([m[key] for m in metrics if m is not None]) for key in metrics[0]
}
@@ -328,20 +371,17 @@ class Crew(BaseModel):
return results
def train(self, n_iterations: int) -> None:
# TODO: Implement training
pass
def _run_sequential_process(self) -> Union[str, Dict[str, Any]]:
def _run_sequential_process(self) -> str:
"""Executes tasks sequentially and returns the final output."""
task_output = ""
token_usage = []
for task in self.tasks:
if task.agent.allow_delegation: # type: ignore # Item "None" of "Agent | None" has no attribute "allow_delegation"
agents_for_delegation = [
agent for agent in self.agents if agent != task.agent
]
if len(self.agents) > 1 and len(agents_for_delegation) > 0:
task.tools += AgentTools(agents=agents_for_delegation).tools()
task.tools += task.agent.get_delegation_tools(agents_for_delegation)
role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"== Working Agent: {role}", color="bold_purple")
@@ -353,7 +393,6 @@ class Crew(BaseModel):
self._file_handler.log(
agent=role, task=task.description, status="started"
)
output = task.execute(context=task_output)
if not task.async_execution:
@@ -361,15 +400,18 @@ class Crew(BaseModel):
role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"== [{role}] Task output: {task_output}\n\n")
token_summ = task.agent._token_process.get_summary()
token_usage.append(token_summ)
if self.output_log_file:
self._file_handler.log(agent=role, task=task_output, status="completed")
token_usage_formatted = self.aggregate_token_usage(token_usage)
self._finish_execution(task_output)
# type: ignore # Item "None" of "Agent | None" has no attribute "_token_process"
token_usage = task.agent._token_process.get_summary()
# type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str")
return self._format_output(task_output, token_usage)
return self._format_output(task_output, token_usage_formatted)
def _run_hierarchical_process(self) -> Union[str, Dict[str, Any]]:
"""Creates and assigns a manager agent to make sure the crew completes the tasks."""
@@ -380,7 +422,7 @@ class Crew(BaseModel):
manager = self.manager_agent
if len(manager.tools) > 0:
raise Exception("Manager agent should not have tools")
manager.tools = AgentTools(agents=self.agents).tools()
manager.tools = self.manager_agent.get_delegation_tools(self.agents)
else:
manager = Agent(
role=i18n.retrieve("hierarchical_manager_agent", "role"),
@@ -388,10 +430,11 @@ class Crew(BaseModel):
backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"),
tools=AgentTools(agents=self.agents).tools(),
llm=self.manager_llm,
verbose=True,
verbose=self.verbose,
)
task_output = ""
token_usage = []
for task in self.tasks:
self._logger.log("debug", f"Working Agent: {manager.role}")
self._logger.log("info", f"Starting Task: {task.description}")
@@ -406,17 +449,23 @@ class Crew(BaseModel):
)
self._logger.log("debug", f"[{manager.role}] Task output: {task_output}")
if hasattr(task, "agent._token_process"):
token_summ = task.agent._token_process.get_summary()
token_usage.append(token_summ)
if self.output_log_file:
self._file_handler.log(
agent=manager.role, task=task_output, status="completed"
)
self._finish_execution(task_output)
# type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str")
manager_token_usage = manager._token_process.get_summary()
token_usage.append(manager_token_usage)
token_usage_formatted = self.aggregate_token_usage(token_usage)
return self._format_output(
task_output, manager_token_usage
task_output, token_usage_formatted
), manager_token_usage
def copy(self):
@@ -465,10 +514,11 @@ class Crew(BaseModel):
for task in self.tasks
]
# type: ignore # "interpolate_inputs" of "Agent" does not return a value (it only ever returns None)
[agent.interpolate_inputs(inputs) for agent in self.agents]
for agent in self.agents:
agent.interpolate_inputs(inputs)
def _format_output(
self, output: str, token_usage: Optional[Dict[str, Any]]
self, output: str, token_usage: Optional[Dict[str, Any]] = None
) -> Union[str, Dict[str, Any]]:
"""
Formats the output of the crew execution.
@@ -490,3 +540,9 @@ class Crew(BaseModel):
def __repr__(self):
return f"Crew(id={self.id}, process={self.process}, number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)})"
def aggregate_token_usage(self, token_usage_list: List[Dict[str, Any]]):
return {
key: sum([m[key] for m in token_usage_list if m is not None])
for key in token_usage_list[0]
}

View File

@@ -1,17 +1,19 @@
from copy import deepcopy
import os
import re
import threading
import uuid
from copy import deepcopy
from typing import Any, Dict, List, Optional, Type
from langchain_openai import ChatOpenAI
from opentelemetry.trace import Span
from pydantic import UUID4, BaseModel, Field, field_validator, model_validator
from pydantic_core import PydanticCustomError
from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tasks.task_output import TaskOutput
from crewai.utilities import I18N, Converter, ConverterError, Printer
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities import I18N, ConverterError, Printer
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
@@ -42,7 +44,6 @@ class Task(BaseModel):
tools_errors: int = 0
delegations: int = 0
i18n: I18N = I18N()
thread: Optional[threading.Thread] = None
prompt_context: Optional[str] = None
description: str = Field(description="Description of the actual task.")
expected_output: str = Field(
@@ -55,7 +56,7 @@ class Task(BaseModel):
callback: Optional[Any] = Field(
description="Callback to be executed after the task is completed.", default=None
)
agent: Optional[Agent] = Field(
agent: Optional[BaseAgent] = Field(
description="Agent responsible for execution the task.", default=None
)
context: Optional[List["Task"]] = Field(
@@ -95,8 +96,11 @@ class Task(BaseModel):
default=False,
)
_telemetry: Telemetry
_execution_span: Span | None = None
_original_description: str | None = None
_original_expected_output: str | None = None
_thread: threading.Thread | None = None
def __init__(__pydantic_self__, **data):
config = data.pop("config", {})
@@ -118,6 +122,12 @@ class Task(BaseModel):
return value[1:]
return value
@model_validator(mode="after")
def set_private_attrs(self) -> "Task":
"""Set private attributes."""
self._telemetry = Telemetry()
return self
@model_validator(mode="after")
def set_attributes_based_on_config(self) -> "Task":
"""Set attributes based on the agent configuration."""
@@ -145,9 +155,21 @@ class Task(BaseModel):
)
return self
def wait_for_completion(self) -> str | BaseModel:
"""Wait for asynchronous task completion and return the output."""
assert self.async_execution, "Task is not set to be executed asynchronously."
if self._thread:
self._thread.join()
self._thread = None
assert self.output, "Task output is not set."
return self.output.exported_output
def execute( # type: ignore # Missing return statement
self,
agent: Agent | None = None,
agent: BaseAgent | None = None,
context: Optional[str] = None,
tools: Optional[List[Any]] = None,
) -> str:
@@ -157,6 +179,8 @@ class Task(BaseModel):
Output of the task.
"""
self._execution_span = self._telemetry.task_started(self)
agent = agent or self.agent
if not agent:
raise Exception(
@@ -168,8 +192,8 @@ class Task(BaseModel):
context = []
for task in self.context:
if task.async_execution:
task.thread.join() # type: ignore # Item "None" of "Thread | None" has no attribute "join"
if task and task.output:
task.wait_for_completion()
if task.output:
# type: ignore # Item "str" of "str | None" has no attribute "append"
context.append(task.output.raw_output)
# type: ignore # Argument 1 to "join" of "str" has incompatible type "str | None"; expected "Iterable[str]"
@@ -179,10 +203,10 @@ class Task(BaseModel):
tools = tools or self.tools
if self.async_execution:
self.thread = threading.Thread(
self._thread = threading.Thread(
target=self._execute, args=(agent, self, context, tools)
)
self.thread.start()
self._thread.start()
else:
result = self._execute(
task=self,
@@ -198,9 +222,9 @@ class Task(BaseModel):
context=context,
tools=tools,
)
exported_output = self._export_output(result)
# type: ignore # the responses are usually str but need to figure out a more elegant solution here
self.output = TaskOutput(
description=self.description,
exported_output=exported_output,
@@ -211,6 +235,10 @@ class Task(BaseModel):
if self.callback:
self.callback(self.output)
if self._execution_span:
self._telemetry.task_ended(self._execution_span, self)
self._execution_span = None
return exported_output
def prompt(self) -> str:
@@ -262,7 +290,7 @@ class Task(BaseModel):
[task.copy() for task in self.context] if self.context else None
)
cloned_agent = self.agent.copy() if self.agent else None
cloned_tools = deepcopy(self.tools) if self.tools else None
cloned_tools = deepcopy(self.tools) if self.tools else []
copied_task = Task(
**copied_data,
@@ -302,14 +330,13 @@ class Task(BaseModel):
pass
# type: ignore # Item "None" of "Agent | None" has no attribute "function_calling_llm"
llm = self.agent.function_calling_llm or self.agent.llm
llm = getattr(self.agent, "function_calling_llm", None) or self.agent.llm
if not self._is_gpt(llm):
# type: ignore # Argument "model" to "PydanticSchemaParser" has incompatible type "type[BaseModel] | None"; expected "type[BaseModel]"
model_schema = PydanticSchemaParser(model=model).get_schema()
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
converter = Converter(
converter = self.agent.get_output_converter(
llm=llm, text=result, model=model, instructions=instructions
)

View File

@@ -1,8 +1,10 @@
from __future__ import annotations
import asyncio
import json
import os
import platform
from typing import Any
from typing import TYPE_CHECKING, Any
import pkg_resources
from opentelemetry import trace
@@ -10,7 +12,11 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExport
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import Status, StatusCode
from opentelemetry.trace import Span, Status, StatusCode
if TYPE_CHECKING:
from crewai.crew import Crew
from crewai.task import Task
class Telemetry:
@@ -88,9 +94,6 @@ class Telemetry:
self._add_attribute(span, "python_version", platform.python_version())
self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute(span, "crew_process", crew.process)
self._add_attribute(
span, "crew_language", crew.prompt_file if crew.i18n else "None"
)
self._add_attribute(span, "crew_memory", crew.memory)
self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks))
self._add_attribute(span, "crew_number_of_agents", len(crew.agents))
@@ -102,6 +105,8 @@ class Telemetry:
{
"id": str(agent.id),
"role": agent.role,
"goal": agent.goal,
"backstory": agent.backstory,
"verbose?": agent.verbose,
"max_iter": agent.max_iter,
"max_rpm": agent.max_rpm,
@@ -123,8 +128,16 @@ class Telemetry:
[
{
"id": str(task.id),
"description": task.description,
"expected_output": task.expected_output,
"async_execution?": task.async_execution,
"human_input?": task.human_input,
"agent_role": task.agent.role if task.agent else "None",
"context": (
[task.description for task in task.context]
if task.context
else None
),
"tools_names": [
tool.name.casefold() for tool in task.tools
],
@@ -143,6 +156,38 @@ class Telemetry:
except Exception:
pass
def task_started(self, task: Task) -> Span | None:
"""Records task started in a crew."""
if self.ready:
try:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Task Execution")
self._add_attribute(span, "task_id", str(task.id))
self._add_attribute(span, "formatted_description", task.description)
self._add_attribute(
span, "formatted_expected_output", task.expected_output
)
return span
except Exception:
pass
return None
def task_ended(self, span: Span, task: Task):
"""Records task execution in a crew."""
if self.ready:
try:
self._add_attribute(
span, "output", task.output.raw_output if task.output else ""
)
span.set_status(Status(StatusCode.OK))
span.end()
except Exception:
pass
def tool_repeated_usage(self, llm: Any, tool_name: str, attempts: int):
"""Records the repeated usage 'error' of a tool by an agent."""
if self.ready:
@@ -207,7 +252,7 @@ class Telemetry:
except Exception:
pass
def crew_execution_span(self, crew):
def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None):
"""Records the complete execution of a crew.
This is only collected if the user has opted-in to share the crew.
"""
@@ -221,6 +266,7 @@ class Telemetry:
pkg_resources.get_distribution("crewai").version,
)
self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute(span, "inputs", json.dumps(inputs))
self._add_attribute(
span,
"crew_agents",
@@ -238,7 +284,7 @@ class Telemetry:
"llm": json.dumps(self._safe_llm_attributes(agent.llm)),
"delegation_enabled?": agent.allow_delegation,
"tools_names": [
tool.name.casefold() for tool in agent.tools
tool.name.casefold() for tool in agent.tools or []
],
}
for agent in crew.agents
@@ -253,16 +299,17 @@ class Telemetry:
{
"id": str(task.id),
"description": task.description,
"expected_output": task.expected_output,
"async_execution?": task.async_execution,
"output": task.expected_output,
"human_input?": task.human_input,
"agent_role": task.agent.role if task.agent else "None",
"context": (
[task.description for task in task.context]
if task.context
else "None"
else None
),
"tools_names": [
tool.name.casefold() for tool in task.tools
tool.name.casefold() for tool in task.tools or []
],
}
for task in crew.tasks

View File

@@ -1,106 +1,25 @@
from typing import List, Union
from langchain.tools import StructuredTool
from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.task import Task
from crewai.utilities import I18N
from crewai.agents.agent_builder.utilities.base_agent_tool import BaseAgentTools
class AgentTools(BaseModel):
class AgentTools(BaseAgentTools):
"""Default tools around agent delegation"""
agents: List[Agent] = Field(description="List of agents in this crew.")
i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
def tools(self):
coworkers = f"[{', '.join([f'{agent.role}' for agent in self.agents])}]"
tools = [
StructuredTool.from_function(
func=self.delegate_work,
name="Delegate work to coworker",
description=self.i18n.tools("delegate_work").format(
coworkers=f"[{', '.join([f'{agent.role}' for agent in self.agents])}]"
coworkers=coworkers
),
),
StructuredTool.from_function(
func=self.ask_question,
name="Ask question to coworker",
description=self.i18n.tools("ask_question").format(
coworkers=f"[{', '.join([f'{agent.role}' for agent in self.agents])}]"
),
description=self.i18n.tools("ask_question").format(coworkers=coworkers),
),
]
return tools
def delegate_work(
self,
task: str,
context: Union[str, None] = None,
coworker: Union[str, None] = None,
**kwargs,
):
"""Useful to delegate a specific task to a coworker passing all necessary context and names."""
coworker = coworker or kwargs.get("co_worker") or kwargs.get("coworker")
if coworker:
is_list = coworker.startswith("[") and coworker.endswith("]")
if is_list:
coworker = coworker[1:-1].split(",")[0]
return self._execute(coworker, task, context)
def ask_question(
self,
question: str,
context: Union[str, None] = None,
coworker: Union[str, None] = None,
**kwargs,
):
"""Useful to ask a question, opinion or take from a coworker passing all necessary context and names."""
coworker = coworker or kwargs.get("co_worker") or kwargs.get("coworker")
if coworker:
is_list = coworker.startswith("[") and coworker.endswith("]")
if is_list:
coworker = coworker[1:-1].split(",")[0]
return self._execute(coworker, question, context)
def _execute(self, agent: Union[str, None], task: str, context: Union[str, None]):
"""Execute the command."""
try:
if agent is None:
agent = ""
# It is important to remove the quotes from the agent name.
# The reason we have to do this is because less-powerful LLM's
# have difficulty producing valid JSON.
# As a result, we end up with invalid JSON that is truncated like this:
# {"task": "....", "coworker": "....
# when it should look like this:
# {"task": "....", "coworker": "...."}
agent_name = agent.casefold().replace('"', "").replace("\n", "")
agent = [
available_agent
for available_agent in self.agents
if available_agent.role.casefold().replace("\n", "") == agent_name
]
except Exception as _:
return self.i18n.errors("agent_tool_unexsiting_coworker").format(
coworkers="\n".join(
[f"- {agent.role.casefold()}" for agent in self.agents]
)
)
if not agent:
return self.i18n.errors("agent_tool_unexsiting_coworker").format(
coworkers="\n".join(
[f"- {agent.role.casefold()}" for agent in self.agents]
)
)
agent = agent[0]
task = Task(
description=task,
agent=agent,
expected_output="Your best answer to your coworker asking you this, accounting for the context shared.",
)
return agent.execute_task(task, context)

View File

@@ -98,7 +98,7 @@ class ToolUsage:
tool_string: str,
tool: BaseTool,
calling: Union[ToolCalling, InstructorToolCalling],
) -> None: # TODO: Fix this return type
) -> str: # TODO: Fix this return type --> finecwg : I updated return type to str
if self._check_tool_repeated_usage(calling=calling): # type: ignore # _check_tool_repeated_usage of "ToolUsage" does not return a value (it only ever returns None)
try:
result = self._i18n.errors("task_repeated_usage").format(
@@ -123,7 +123,7 @@ class ToolUsage:
tool=calling.tool_name, input=calling.arguments
)
if not result:
if result is None: #! finecwg: if not result --> if result is None
try:
if calling.tool_name in [
"Delegate work to coworker",

View File

@@ -1,9 +1,22 @@
from .converter import Converter, ConverterError
from .file_handler import FileHandler
from .i18n import I18N
from .instructor import Instructor
from .logger import Logger
from .parser import YamlParser
from .printer import Printer
from .prompts import Prompts
from .rpm_controller import RPMController
from .fileHandler import FileHandler
from .parser import YamlParser
__all__ = [
"Converter",
"ConverterError",
"FileHandler",
"I18N",
"Instructor",
"Logger",
"Printer",
"Prompts",
"RPMController",
"YamlParser",
]

View File

@@ -0,0 +1,2 @@
TRAINING_DATA_FILE = "training_data.pkl"
TRAINED_AGENTS_DATA_FILE = "trained_agents_data.pkl"

View File

@@ -1,9 +1,11 @@
import json
from typing import Any, Optional
from langchain.schema import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field, PrivateAttr, model_validator
from pydantic import model_validator
from crewai.agents.agent_builder.utilities.base_output_converter_base import (
OutputConverter,
)
class ConverterError(Exception):
@@ -14,19 +16,9 @@ class ConverterError(Exception):
self.message = message
class Converter(BaseModel):
class Converter(OutputConverter):
"""Class that converts text into either pydantic or json."""
_is_gpt: bool = PrivateAttr(default=True)
text: str = Field(description="Text to be converted.")
llm: Any = Field(description="The language model to be used to convert the text.")
model: Any = Field(description="The model to be used to convert the text.")
instructions: str = Field(description="Conversion instructions to the LLM.")
max_attemps: Optional[int] = Field(
description="Max number of attemps to try to get the output formated.",
default=3,
)
@model_validator(mode="after")
def check_llm_provider(self):
if not self._is_gpt(self.llm):

View File

@@ -26,6 +26,18 @@ class TaskEvaluation(BaseModel):
)
class TrainingTaskEvaluation(BaseModel):
suggestions: List[str] = Field(
description="Based on the Human Feedbacks and the comparison between Initial Outputs and Improved outputs provide action items based on human_feedback for future tasks."
)
quality: float = Field(
description="A score from 0 to 10 evaluating on completion, quality, and overall performance from the improved output to the initial output based on the human feedback."
)
final_summary: str = Field(
description="A step by step action items to improve the next Agent based on the human-feedback and improved output."
)
class TaskEvaluator:
def __init__(self, original_agent):
self.llm = original_agent.llm
@@ -59,3 +71,49 @@ class TaskEvaluator:
def _is_gpt(self, llm) -> bool:
return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None
def evaluate_training_data(
self, training_data: dict, agent_id: str
) -> TrainingTaskEvaluation:
"""
Evaluate the training data based on the llm output, human feedback, and improved output.
Parameters:
- training_data (dict): The training data to be evaluated.
- agent_id (str): The ID of the agent.
"""
output_training_data = training_data[agent_id]
final_aggregated_data = ""
for _, data in output_training_data.items():
final_aggregated_data += (
f"Initial Output:\n{data['initial_output']}\n\n"
f"Human Feedback:\n{data['human_feedback']}\n\n"
f"Improved Output:\n{data['improved_output']}\n\n"
)
evaluation_query = (
"Assess the quality of the training data based on the llm output, human feedback , and llm output improved result.\n\n"
f"{final_aggregated_data}"
"Please provide:\n"
"- Based on the Human Feedbacks and the comparison between Initial Outputs and Improved outputs provide action items based on human_feedback for future tasks\n"
"- A score from 0 to 10 evaluating on completion, quality, and overall performance from the improved output to the initial output based on the human feedback\n"
)
instructions = "I'm gonna convert this raw text into valid JSON."
if not self._is_gpt(self.llm):
model_schema = PydanticSchemaParser(
model=TrainingTaskEvaluation
).get_schema()
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
converter = Converter(
llm=self.llm,
text=evaluation_query,
model=TrainingTaskEvaluation,
instructions=instructions,
)
pydantic_result = converter.to_pydantic()
return pydantic_result

View File

@@ -1,20 +0,0 @@
import os
from datetime import datetime
class FileHandler:
"""take care of file operations, currently it only logs messages to a file"""
def __init__(self, file_path):
if isinstance(file_path, bool):
self._path = os.path.join(os.curdir, "logs.txt")
elif isinstance(file_path, str):
self._path = file_path
else:
raise ValueError("file_path must be either a boolean or a string.")
def log(self, **kwargs):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = f"{now}: ".join([f"{key}={value}" for key, value in kwargs.items()])
with open(self._path, "a", encoding = 'utf-8') as file:
file.write(message + "\n")

View File

@@ -0,0 +1,69 @@
import os
import pickle
from datetime import datetime
class FileHandler:
"""take care of file operations, currently it only logs messages to a file"""
def __init__(self, file_path):
if isinstance(file_path, bool):
self._path = os.path.join(os.curdir, "logs.txt")
elif isinstance(file_path, str):
self._path = file_path
else:
raise ValueError("file_path must be either a boolean or a string.")
def log(self, **kwargs):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = f"{now}: ".join([f"{key}={value}" for key, value in kwargs.items()])
with open(self._path, "a", encoding="utf-8") as file:
file.write(message + "\n")
class PickleHandler:
def __init__(self, file_name: str) -> None:
"""
Initialize the PickleHandler with the name of the file where data will be stored.
The file will be saved in the current directory.
Parameters:
- file_name (str): The name of the file for saving and loading data.
"""
self.file_path = os.path.join(os.getcwd(), file_name)
self._initialize_file()
def _initialize_file(self) -> None:
"""
Initialize the file with an empty dictionary if it does not exist or is empty.
"""
if not os.path.exists(self.file_path) or os.path.getsize(self.file_path) == 0:
self.save({}) # Save an empty dictionary to initialize the file
def save(self, data) -> None:
"""
Save the data to the specified file using pickle.
Parameters:
- data (object): The data to be saved.
"""
with open(self.file_path, "wb") as file:
pickle.dump(data, file)
def load(self) -> dict:
"""
Load the data from the specified file using pickle.
Returns:
- dict: The data loaded from the file.
"""
if not os.path.exists(self.file_path) or os.path.getsize(self.file_path) == 0:
return {} # Return an empty dictionary if the file does not exist or is empty
with open(self.file_path, "rb") as file:
try:
return pickle.load(file)
except EOFError:
return {} # Return an empty dictionary if the file is empty or corrupted
except Exception:
raise # Raise any other exceptions that occur during loading

View File

@@ -4,31 +4,7 @@ import tiktoken
from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult
class TokenProcess:
total_tokens: int = 0
prompt_tokens: int = 0
completion_tokens: int = 0
successful_requests: int = 0
def sum_prompt_tokens(self, tokens: int):
self.prompt_tokens = self.prompt_tokens + tokens
self.total_tokens = self.total_tokens + tokens
def sum_completion_tokens(self, tokens: int):
self.completion_tokens = self.completion_tokens + tokens
self.total_tokens = self.total_tokens + tokens
def sum_successful_requests(self, requests: int):
self.successful_requests = self.successful_requests + requests
def get_summary(self) -> Dict[str, Any]:
return {
"total_tokens": self.total_tokens,
"prompt_tokens": self.prompt_tokens,
"completion_tokens": self.completion_tokens,
"successful_requests": self.successful_requests,
}
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
class TokenCalcHandler(BaseCallbackHandler):

View File

@@ -0,0 +1,31 @@
from crewai.utilities.file_handler import PickleHandler
class CrewTrainingHandler(PickleHandler):
def save_trained_data(self, agent_id: str, trained_data: dict) -> None:
"""
Save the trained data for a specific agent.
Parameters:
- agent_id (str): The ID of the agent.
- trained_data (dict): The trained data to be saved.
"""
data = self.load()
data[agent_id] = trained_data
self.save(data)
def append(self, train_iteration: int, agent_id: str, new_data) -> None:
"""
Append new data to the existing pickle file.
Parameters:
- new_data (object): The new data to be appended.
"""
data = self.load()
if agent_id in data:
data[agent_id][train_iteration] = new_data
else:
data[agent_id] = {train_iteration: new_data}
self.save(data)

View File

@@ -1,5 +1,6 @@
"""Test Agent creation and execution basic functionality."""
from unittest import mock
from unittest.mock import patch
import pytest
@@ -11,6 +12,7 @@ from crewai import Agent, Crew, Task
from crewai.agents.cache import CacheHandler
from crewai.agents.executor import CrewAgentExecutor
from crewai.agents.parser import CrewAgentParser
from crewai.tools.tool_calling import InstructorToolCalling
from crewai.tools.tool_usage import ToolUsage
from crewai.utilities import RPMController
@@ -842,3 +844,54 @@ Thought:
"""
)
@patch("crewai.agent.CrewTrainingHandler")
def test_agent_training_handler(crew_training_handler):
task_prompt = "What is 1 + 1?"
agent = Agent(
role="test role",
goal="test goal",
backstory="test backstory",
verbose=True,
)
crew_training_handler().load.return_value = {
f"{str(agent.id)}": {"0": {"human_feedback": "good"}}
}
result = agent._training_handler(task_prompt=task_prompt)
assert result == "What is 1 + 1?You MUST follow these feedbacks: \n good"
crew_training_handler.assert_has_calls(
[mock.call(), mock.call("training_data.pkl"), mock.call().load()]
)
@patch("crewai.agent.CrewTrainingHandler")
def test_agent_use_trained_data(crew_training_handler):
task_prompt = "What is 1 + 1?"
agent = Agent(
role="researcher",
goal="test goal",
backstory="test backstory",
verbose=True,
)
crew_training_handler().load.return_value = {
agent.role: {
"suggestions": [
"The result of the math operatio must be right.",
"Result must be better than 1.",
]
}
}
result = agent._use_trained_data(task_prompt=task_prompt)
assert (
result == "What is 1 + 1?You MUST follow these feedbacks: \n "
"The result of the math operatio must be right.\n - Result must be better than 1."
)
crew_training_handler.assert_has_calls(
[mock.call(), mock.call("trained_agents_data.pkl"), mock.call().load()]
)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,8 @@
"""Test Agent creation and execution basic functionality."""
import json
from unittest import mock
from unittest.mock import patch
import pydantic_core
import pytest
@@ -155,9 +157,9 @@ def test_hierarchical_process():
manager_llm=ChatOpenAI(temperature=0, model="gpt-4"),
tasks=[task],
)
result = crew.kickoff()
assert (
crew.kickoff()
result
== "1. 'Demystifying AI: An in-depth exploration of Artificial Intelligence for the layperson' - In this piece, we will unravel the enigma of AI, simplifying its complexities into digestible information for the everyday individual. By using relatable examples and analogies, we will journey through the neural networks and machine learning algorithms that define AI, without the jargon and convoluted explanations that often accompany such topics.\n\n2. 'The Role of AI in Startups: A Game Changer?' - Startups today are harnessing the power of AI to revolutionize their businesses. This article will delve into how AI, as an innovative force, is shaping the startup ecosystem, transforming everything from customer service to product development. We'll explore real-life case studies of startups that have leveraged AI to accelerate their growth and disrupt their respective industries.\n\n3. 'AI and Ethics: Navigating the Complex Landscape' - AI brings with it not just technological advancements, but ethical dilemmas as well. This article will engage readers in a thought-provoking discussion on the ethical implications of AI, exploring issues like bias in algorithms, privacy concerns, job displacement, and the moral responsibility of AI developers. We will also discuss potential solutions and frameworks to address these challenges.\n\n4. 'Unveiling the AI Agents: The Future of Customer Service' - AI agents are poised to reshape the customer service landscape, offering businesses the ability to provide round-the-clock support and personalized experiences. In this article, we'll dive deep into the world of AI agents, examining how they work, their benefits and limitations, and how they're set to redefine customer interactions in the digital age.\n\n5. 'From Science Fiction to Reality: AI in Everyday Life' - AI, once a concept limited to the realm of sci-fi, has now permeated our daily lives. This article will highlight the ubiquitous presence of AI, from voice assistants and recommendation algorithms, to autonomous vehicles and smart homes. We'll explore how AI, in its various forms, is transforming our everyday experiences, making the future seem a lot closer than we imagined."
)
@@ -381,14 +383,15 @@ def test_crew_full_ouput():
crew = Crew(agents=[agent], tasks=[task1, task2], full_output=True)
result = crew.kickoff()
assert result == {
"final_output": "Hello! It is a delight to receive your message. I trust this response finds you in good spirits. It's indeed a pleasure to connect with you too.",
"final_output": "Hello!",
"tasks_outputs": [task1.output, task2.output],
"usage_metrics": {
"completion_tokens": 109,
"prompt_tokens": 330,
"successful_requests": 2,
"total_tokens": 439,
"total_tokens": 517,
"prompt_tokens": 466,
"completion_tokens": 51,
"successful_requests": 3,
},
}
@@ -597,6 +600,30 @@ def test_task_with_no_arguments():
assert result == "75"
def test_code_execution_flag_adds_code_tool_upon_kickoff():
from crewai_tools import CodeInterpreterTool
programmer = Agent(
role="Programmer",
goal="Write code to solve problems.",
backstory="You're a programmer who loves to solve problems with code.",
allow_delegation=False,
allow_code_execution=True,
)
task = Task(
description="How much is 2 + 2?",
expected_output="The result of the sum as an integer.",
agent=programmer,
)
crew = Crew(agents=[programmer], tasks=[task])
crew.kickoff()
assert len(programmer.tools) == 1
assert programmer.tools[0].__class__ == CodeInterpreterTool
@pytest.mark.vcr(filter_headers=["authorization"])
def test_delegation_is_not_enabled_if_there_are_only_one_agent():
from unittest.mock import patch
@@ -614,7 +641,6 @@ def test_delegation_is_not_enabled_if_there_are_only_one_agent():
)
crew = Crew(agents=[researcher], tasks=[task])
with patch.object(Task, "execute") as execute:
execute.return_value = "ok"
crew.kickoff()
@@ -682,15 +708,15 @@ def test_agent_usage_metrics_are_captured_for_hierarchical_process():
agents=[agent],
tasks=[task],
process=Process.hierarchical,
manager_llm=ChatOpenAI(temperature=0, model="gpt-4"),
manager_llm=ChatOpenAI(temperature=0, model="gpt-4o"),
)
result = crew.kickoff()
assert result == '"Howdy!"'
print(crew.usage_metrics)
assert crew.usage_metrics == {
"total_tokens": 1659,
"prompt_tokens": 1376,
"total_tokens": 1616,
"prompt_tokens": 1333,
"completion_tokens": 283,
"successful_requests": 3,
}
@@ -1006,7 +1032,10 @@ def test_manager_agent_with_tools_raises_exception():
crew.kickoff()
def test_crew_train_success():
@patch("crewai.crew.Crew.kickoff")
@patch("crewai.crew.CrewTrainingHandler")
@patch("crewai.crew.TaskEvaluator")
def test_crew_train_success(task_evaluator, crew_training_handler, kickoff):
task = Task(
description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
expected_output="5 bullet points with a paragraph for each idea.",
@@ -1016,8 +1045,48 @@ def test_crew_train_success():
agents=[researcher, writer],
tasks=[task],
)
crew.train(n_iterations=2, inputs={"topic": "AI"})
task_evaluator.assert_has_calls(
[
mock.call(researcher),
mock.call().evaluate_training_data(
training_data=crew_training_handler().load(),
agent_id=str(researcher.id),
),
mock.call().evaluate_training_data().model_dump(),
mock.call(writer),
mock.call().evaluate_training_data(
training_data=crew_training_handler().load(),
agent_id=str(writer.id),
),
mock.call().evaluate_training_data().model_dump(),
]
)
crew.train(n_iterations=2)
crew_training_handler.assert_has_calls(
[
mock.call("training_data.pkl"),
mock.call().load(),
mock.call("trained_agents_data.pkl"),
mock.call().save_trained_data(
agent_id="Researcher",
trained_data=task_evaluator().evaluate_training_data().model_dump(),
),
mock.call("trained_agents_data.pkl"),
mock.call().save_trained_data(
agent_id="Senior Writer",
trained_data=task_evaluator().evaluate_training_data().model_dump(),
),
mock.call(),
mock.call().load(),
mock.call(),
mock.call().load(),
]
)
kickoff.assert_has_calls(
[mock.call(inputs={"topic": "AI"}), mock.call(inputs={"topic": "AI"})]
)
def test_crew_train_error():
@@ -1036,3 +1105,32 @@ def test_crew_train_error():
assert "train() missing 1 required positional argument: 'n_iterations'" in str(
e
)
def test__setup_for_training():
researcher.allow_delegation = True
writer.allow_delegation = True
agents = [researcher, writer]
task = Task(
description="Come up with a list of 5 interesting ideas to explore for an article",
expected_output="5 bullet points with a paragraph for each idea.",
)
crew = Crew(
agents=agents,
tasks=[task],
)
assert crew._train is False
assert task.human_input is False
for agent in agents:
assert agent.allow_delegation is True
crew._setup_for_training()
assert crew._train is True
assert task.human_input is True
for agent in agents:
assert agent.allow_delegation is False

View File

@@ -1,7 +1,6 @@
"""Test Agent creation and execution basic functionality."""
import json
from unittest.mock import MagicMock, patch
import pytest

View File

@@ -0,0 +1,64 @@
from unittest import mock
from unittest.mock import MagicMock, patch
from crewai.utilities.evaluators.task_evaluator import (
TaskEvaluator,
TrainingTaskEvaluation,
)
@patch("crewai.utilities.evaluators.task_evaluator.Converter")
def test_evaluate_training_data(converter_mock):
training_data = {
"agent_id": {
"data1": {
"initial_output": "Initial output 1",
"human_feedback": "Human feedback 1",
"improved_output": "Improved output 1",
},
"data2": {
"initial_output": "Initial output 2",
"human_feedback": "Human feedback 2",
"improved_output": "Improved output 2",
},
}
}
agent_id = "agent_id"
original_agent = MagicMock()
function_return_value = TrainingTaskEvaluation(
suggestions=[
"The initial output was already good, having a detailed explanation. However, the improved output "
"gave similar information but in a more professional manner using better vocabulary. For future tasks, "
"try to implement more elaborate language and precise terminology from the beginning."
],
quality=8.0,
final_summary="The agent responded well initially. However, the improved output showed that there is room "
"for enhancement in terms of language usage, precision, and professionalism. For future tasks, the agent "
"should focus more on these points from the start to increase performance.",
)
converter_mock.return_value.to_pydantic.return_value = function_return_value
result = TaskEvaluator(original_agent=original_agent).evaluate_training_data(
training_data, agent_id
)
assert result == function_return_value
converter_mock.assert_has_calls(
[
mock.call(
llm=original_agent.llm,
text="Assess the quality of the training data based on the llm output, human feedback , and llm "
"output improved result.\n\nInitial Output:\nInitial output 1\n\nHuman Feedback:\nHuman feedback "
"1\n\nImproved Output:\nImproved output 1\n\nInitial Output:\nInitial output 2\n\nHuman "
"Feedback:\nHuman feedback 2\n\nImproved Output:\nImproved output 2\n\nPlease provide:\n- "
"Based on the Human Feedbacks and the comparison between Initial Outputs and Improved outputs "
"provide action items based on human_feedback for future tasks\n- A score from 0 to 10 evaluating "
"on completion, quality, and overall performance from the improved output to the initial output "
"based on the human feedback\n",
model=TrainingTaskEvaluation,
instructions="I'm gonna convert this raw text into valid JSON.\n\nThe json should have the "
"following structure, with the following keys:\n- suggestions: List[str]\n- "
"quality: float\n- final_summary: str",
),
mock.call().to_pydantic(),
]
)

View File

@@ -0,0 +1,41 @@
import os
import unittest
import pytest
from crewai.utilities.file_handler import PickleHandler
class TestPickleHandler(unittest.TestCase):
def setUp(self):
self.file_name = "test_data.pkl"
self.file_path = os.path.join(os.getcwd(), self.file_name)
self.handler = PickleHandler(self.file_name)
def tearDown(self):
if os.path.exists(self.file_path):
os.remove(self.file_path)
def test_initialize_file(self):
assert os.path.exists(self.file_path) is True
assert os.path.getsize(self.file_path) >= 0
def test_save_and_load(self):
data = {"key": "value"}
self.handler.save(data)
loaded_data = self.handler.load()
assert loaded_data == data
def test_load_empty_file(self):
loaded_data = self.handler.load()
assert loaded_data == {}
def test_load_corrupted_file(self):
with open(self.file_path, "wb") as file:
file.write(b"corrupted data")
with pytest.raises(Exception) as exc:
self.handler.load()
assert str(exc.value) == "pickle data was truncated"
assert "<class '_pickle.UnpicklingError'>" == str(exc.type)

View File

@@ -0,0 +1,42 @@
import os
import unittest
from crewai.utilities.training_handler import CrewTrainingHandler
class TestCrewTrainingHandler(unittest.TestCase):
def setUp(self):
self.handler = CrewTrainingHandler("trained_data.pkl")
def tearDown(self):
os.remove("trained_data.pkl")
del self.handler
def test_save_trained_data(self):
agent_id = "agent1"
trained_data = {"param1": 1, "param2": 2}
self.handler.save_trained_data(agent_id, trained_data)
# Assert that the trained data is saved correctly
data = self.handler.load()
assert data[agent_id] == trained_data
def test_append_existing_agent(self):
train_iteration = 1
agent_id = "agent1"
new_data = {"param3": 3, "param4": 4}
self.handler.append(train_iteration, agent_id, new_data)
# Assert that the new data is appended correctly to the existing agent
data = self.handler.load()
assert data[agent_id][train_iteration] == new_data
def test_append_new_agent(self):
train_iteration = 1
agent_id = "agent2"
new_data = {"param5": 5, "param6": 6}
self.handler.append(train_iteration, agent_id, new_data)
# Assert that the new agent and data are appended correctly
data = self.handler.load()
assert data[agent_id][train_iteration] == new_data