Compare commits

...

27 Commits

Author SHA1 Message Date
Devin AI
db34516b18 fix: Fix type-checker issues with _finish_execution method
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 06:46:27 +00:00
Devin AI
6c3f4a6446 refactor: Improve agentops integration with better types and tests
- Add specific error handling for agentops initialization
- Add type hints for better code readability
- Improve API key validation
- Reorganize tests using pytest fixtures and classes
- Add documentation for set_private_attrs method

Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 06:43:26 +00:00
Devin AI
3cf2f982dd fix: Add pytest.ini and improve test initialization
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 06:41:36 +00:00
Devin AI
48604f567b fix: Improve agentops initialization with better validation and logging
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 06:27:56 +00:00
Devin AI
f724ed93b9 fix: Sort imports to fix linting issues
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 06:27:21 +00:00
Devin AI
abc71be5f6 fix: Sort imports to fix linting issues
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 06:25:41 +00:00
Devin AI
420f826e56 fix: Initialize agentops in Crew setup (#2102)
- Add proper agentops initialization in Crew's set_private_attrs
- Add test coverage for agentops initialization
- Add test coverage for Gemini LLM with agentops

Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 06:24:05 +00:00
Brandon Hancock (bhancock_ai)
47818f4f41 updating bedrock docs (#2088)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-10 12:48:12 -05:00
Brandon Hancock (bhancock_ai)
9b10fd47b0 incorporate Small update in memory.mdx, fixing Google AI parameters #2008 (#2087) 2025-02-10 12:17:41 -05:00
Brandon Hancock (bhancock_ai)
c408368267 fix linting issues in new tests (#2089)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-10 12:10:53 -05:00
Kevin King
90b3145e92 Updated excel_knowledge_source.py to account for excel files with multiple tabs. (#1921)
* Updated excel_knowledge_source.py to account for excel sheets that have multiple tabs. The old implementation contained a single df=pd.read_excel(excel_file_path), which only reads the first or most recently used excel sheet. The updated functionality reads all sheets in the excel workbook.

* updated load_content() function in excel_knowledge_source.py to reduce memory usage and provide better documentation

* accidentally didn't delete the old load_content() function in last commit - corrected this

* Added an override for the content field from the inherited BaseFileKnowledgeSource to account for the change in the load_content method to support excel files with multiple tabs/sheets. This change should ensure it passes the type check test, as it failed before since content was assigned a different type in BaseFileKnowledgeSource

* Now removed the commented out imports in _import_dependencies, as requested

* Updated excel_knowledge_source to fix linter errors and type errors. Changed inheritance from BaseFileKnowledgeSource to BaseKnowledgeSource because BaseFileKnowledgeSource's types conflicted (in particular the load_content function and the content class variable).

---------

Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-10 08:56:32 -08:00
Nicolas Lorin
fbd0e015d5 doc: use the corresponding source depending on filetype (#2038)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-09 20:25:33 -03:00
Bradley Goodyear
17e25fb842 Fix a typo in the Task Guardrails section (#2043)
Co-authored-by: João Moura <joaomdmoura@gmail.com>
2025-02-09 20:23:52 -03:00
devin-ai-integration[bot]
d6d98ee969 docs: fix long term memory class name in examples (#2049)
* docs: fix long term memory class name in examples

- Replace EnhanceLongTermMemory with LongTermMemory to match actual implementation
- Update code examples to show correct usage
- Fixes #2026

Co-Authored-By: Joe Moura <joao@crewai.com>

* docs: improve memory examples with imports, types and security

- Add proper import statements
- Add type hints for better readability
- Add descriptive comments for each memory type
- Add security considerations section
- Add configuration examples section
- Use environment variables for storage paths

Co-Authored-By: Joe Moura <joao@crewai.com>

* Update memory.mdx

* Update memory.mdx

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Joe Moura <joao@crewai.com>
Co-authored-by: João Moura <joaomdmoura@gmail.com>
2025-02-09 16:47:31 -03:00
devin-ai-integration[bot]
e0600e3bb9 fix: ensure proper message formatting for Anthropic models (#2063)
* fix: ensure proper message formatting for Anthropic models

- Add Anthropic-specific message formatting
- Add placeholder user message when required
- Add test case for Anthropic message formatting

Fixes #1869

Co-Authored-By: Joe Moura <joao@crewai.com>

* refactor: improve Anthropic model handling

- Add robust model detection with _is_anthropic_model
- Enhance message formatting with better edge cases
- Add type hints and improve documentation
- Improve test structure with fixtures
- Add edge case tests

Addresses review feedback on #2063

Co-Authored-By: Joe Moura <joao@crewai.com>

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Joe Moura <joao@crewai.com>
2025-02-09 16:35:52 -03:00
devin-ai-integration[bot]
a79d77dfd7 docs: document FileWriterTool as solution for file writing issues (#2039)
* docs: add FileWriterTool recommendation for file writing issues

- Add FileWriterTool recommendation in _save_file docstring
- Update error message to suggest using FileWriterTool for cross-platform compatibility
- Resolves #2015

Co-Authored-By: Joe Moura <joao@crewai.com>

* docs: enhance FileWriterTool documentation

- Add cross-platform compatibility details
- Highlight UTF-8 encoding support
- Emphasize Windows compatibility
- Add recommendation for users experiencing file writing issues

Part of #2015

Co-Authored-By: Joe Moura <joao@crewai.com>

* refactor: improve _save_file type hints and error messages

Co-Authored-By: Joe Moura <joao@crewai.com>

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Joe Moura <joao@crewai.com>
Co-authored-by: João Moura <joaomdmoura@gmail.com>
2025-02-09 16:21:56 -03:00
devin-ai-integration[bot]
56ec9bc224 fix: handle multiple task outputs correctly in conditional tasks (#1937)
* fix: handle multiple task outputs correctly in conditional tasks

- Fix IndexError in _handle_conditional_task by using first output
- Modify _execute_tasks to accumulate task outputs instead of resetting
- Update _create_crew_output to handle multiple outputs correctly
- Add tests for multiple tasks with conditional and multiple conditional tasks

Co-Authored-By: brandon@crewai.com <brandon@crewai.com>

* feat: validate at least one non-conditional task and refine task outputs

Co-Authored-By: brandon@crewai.com <brandon@crewai.com>

* Revert to single output in _create_crew_output; remove redundant empty task check

Co-Authored-By: brandon@crewai.com <brandon@crewai.com>

* Address PR feedback: use last output in conditional tasks, add validation test

Co-Authored-By: brandon@crewai.com <brandon@crewai.com>

* Address PR feedback: updated conditional tasks tests and indexing

Co-Authored-By: brandon@crewai.com <brandon@crewai.com>

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: brandon@crewai.com <brandon@crewai.com>
Co-authored-by: Brandon Hancock <brandon@brandonhancock.io>
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
Co-authored-by: João Moura <joaomdmoura@gmail.com>
2025-02-09 16:20:16 -03:00
João Moura
8eef02739a adding shoutout to enterprise 2025-02-09 12:55:33 -05:00
Brandon Hancock (bhancock_ai)
6f4ad532e6 Brandon/general cleanup (#2059)
* clean up. fix type safety. address memory config docs

* improve manager

* Include fix for o1 models not supporting system messages

* more broad with o1

* address fix: Typo in expected_output string #2045

* drop prints

* drop prints

* wip

* wip

* fix failing memory tests

* Fix memory provider issue

* clean up short term memory

* revert ltm

* drop

* clean up linting issues

* more linting
2025-02-07 17:00:41 -05:00
Brandon Hancock (bhancock_ai)
74a1de8550 clean up google docs (#2061) 2025-02-07 16:58:13 -05:00
Lorenze Jay
e529766391 Enhance embedding configuration with custom embedder support (#2060)
* Enhance embedding configuration with custom embedder support

- Add support for custom embedding functions in EmbeddingConfigurator
- Update type hints for embedder configuration
- Extend configuration options for various embedding providers
- Add optional embedder configuration to Memory class

* added docs

* Refine custom embedder configuration support

- Update custom embedder configuration method to handle custom embedding functions
- Modify type hints for embedder configuration
- Remove unused model_name parameter in custom embedder configuration
2025-02-07 16:49:46 -05:00
Brandon Hancock (bhancock_ai)
a7f5d574dc General Clean UP (#2042)
* clean up. fix type safety. address memory config docs

* improve manager

* Include fix for o1 models not supporting system messages

* more broad with o1

* address fix: Typo in expected_output string #2045

* drop prints

* drop prints

* wip

* wip

* fix failing memory tests

* Fix memory provider issue

* clean up short term memory

* revert ltm

* drop
2025-02-07 14:45:36 -05:00
Vidit Ostwal
0cc02d9492 Added support for logging in JSON format as well. (#1985)
* Added functionality to have json format as well for the logs

* Added additional comments, refactored logging functionality

* Fixed documentation to include the new parameter

* Fixed typo

* Added a Pydantic Error Check between output_log_file and save_as_json parameter

* Removed the save_to_json parameter, incorporated the functionality directly with output_log_file

* Fixed typo

* Sorted the imports using isort

---------

Co-authored-by: Vidit Ostwal <vidit.ostwal@piramal.com>
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-07 13:16:44 -05:00
Vidit Ostwal
fa26f6ebae Added reset memories function inside crew class (#2047)
* Added reset memories function inside crew class

* Fixed typos

* Refactored the code

* Refactor memory reset functionality in Crew class

- Improved error handling and logging for memory reset operations
- Added private methods to modularize memory reset logic
- Enhanced type hints and docstrings
- Updated CLI reset memories command to use new Crew method
- Added utility function to get crew instance in CLI utils

* fix linting issues

* knowledge: Add null check in reset method for storage

* cli: Update memory reset tests to use Crew's reset_memories method

* cli: Enhance memory reset command with improved error handling and validation

---------

Co-authored-by: Lorenze Jay <lorenzejaytech@gmail.com>
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-07 09:49:25 -08:00
Brandon Hancock (bhancock_ai)
f6c2982619 fix manager (#2056) 2025-02-07 10:58:38 -05:00
hyjbrave
5a8649a97f fix unstructured example flow (#2052) 2025-02-07 10:38:15 -05:00
Nicolas Lorin
e6100debac agent: improve knowledge naming (#2041) 2025-02-06 15:19:22 -05:00
32 changed files with 1462 additions and 340 deletions

View File

@@ -1,10 +1,18 @@
<div align="center">
![Logo of CrewAI, two people rowing on a boat](./docs/crewai_logo.png)
![Logo of CrewAI](./docs/crewai_logo.png)
# **CrewAI**
🤖 **CrewAI**: Production-grade framework for orchestrating sophisticated AI agent systems. From simple automations to complex real-world applications, CrewAI provides precise control and deep customization. By fostering collaborative intelligence through flexible, production-ready architecture, CrewAI empowers agents to work together seamlessly, tackling complex business challenges with predictable, consistent results.
**CrewAI**: Production-grade framework for orchestrating sophisticated AI agent systems. From simple automations to complex real-world applications, CrewAI provides precise control and deep customization. By fostering collaborative intelligence through flexible, production-ready architecture, CrewAI empowers agents to work together seamlessly, tackling complex business challenges with predictable, consistent results.
**CrewAI Enterprise**
Want to plan, build (+ no code), deploy, monitor and iterate your agents: [CrewAI Enterprise](https://www.crewai.com/enterprise). Designed for complex, real-world applications, our enterprise solution offers:
- **Seamless Integrations**
- **Scalable & Secure Deployment**
- **Actionable Insights**
- **24/7 Support**
<h3>
@@ -392,7 +400,7 @@ class AdvancedAnalysisFlow(Flow[MarketState]):
goal="Gather and validate supporting market data",
backstory="You excel at finding and correlating multiple data sources"
)
analysis_task = Task(
description="Analyze {sector} sector data for the past {timeframe}",
expected_output="Detailed market analysis with confidence score",
@@ -403,7 +411,7 @@ class AdvancedAnalysisFlow(Flow[MarketState]):
expected_output="Corroborating evidence and potential contradictions",
agent=researcher
)
# Demonstrate crew autonomy
analysis_crew = Crew(
agents=[analyst, researcher],

View File

@@ -23,14 +23,14 @@ A crew in crewAI represents a collaborative group of agents working together to
| **Language** _(optional)_ | `language` | Language used for the crew, defaults to English. |
| **Language File** _(optional)_ | `language_file` | Path to the language file to be used for the crew. |
| **Memory** _(optional)_ | `memory` | Utilized for storing execution memories (short-term, long-term, entity memory). |
| **Memory Config** _(optional)_ | `memory_config` | Configuration for the memory provider to be used by the crew. |
| **Cache** _(optional)_ | `cache` | Specifies whether to use a cache for storing the results of tools' execution. Defaults to `True`. |
| **Embedder** _(optional)_ | `embedder` | Configuration for the embedder to be used by the crew. Mostly used by memory for now. Default is `{"provider": "openai"}`. |
| **Full Output** _(optional)_ | `full_output` | Whether the crew should return the full output with all tasks outputs or just the final output. Defaults to `False`. |
| **Memory Config** _(optional)_ | `memory_config` | Configuration for the memory provider to be used by the crew. |
| **Cache** _(optional)_ | `cache` | Specifies whether to use a cache for storing the results of tools' execution. Defaults to `True`. |
| **Embedder** _(optional)_ | `embedder` | Configuration for the embedder to be used by the crew. Mostly used by memory for now. Default is `{"provider": "openai"}`. |
| **Full Output** _(optional)_ | `full_output` | Whether the crew should return the full output with all tasks outputs or just the final output. Defaults to `False`. |
| **Step Callback** _(optional)_ | `step_callback` | A function that is called after each step of every agent. This can be used to log the agent's actions or to perform other operations; it won't override the agent-specific `step_callback`. |
| **Task Callback** _(optional)_ | `task_callback` | A function that is called after the completion of each task. Useful for monitoring or additional operations post-task execution. |
| **Share Crew** _(optional)_ | `share_crew` | Whether you want to share the complete crew information and execution with the crewAI team to make the library better, and allow us to train models. |
| **Output Log File** _(optional)_ | `output_log_file` | Whether you want to have a file with the complete crew output and execution. You can set it using True and it will default to the folder you are currently in and it will be called logs.txt or passing a string with the full path and name of the file. |
| **Output Log File** _(optional)_ | `output_log_file` | Set to True to save logs as logs.txt in the current directory or provide a file path. Logs will be in JSON format if the filename ends in .json, otherwise .txt. Defaults to `None`. |
| **Manager Agent** _(optional)_ | `manager_agent` | `manager` sets a custom agent that will be used as a manager. |
| **Prompt File** _(optional)_ | `prompt_file` | Path to the prompt JSON file to be used for the crew. |
| **Planning** *(optional)* | `planning` | Adds planning ability to the Crew. When activated before each Crew iteration, all Crew data is sent to an AgentPlanner that will plan the tasks and this plan will be added to each task description. |
@@ -240,6 +240,23 @@ print(f"Tasks Output: {crew_output.tasks_output}")
print(f"Token Usage: {crew_output.token_usage}")
```
## Accessing Crew Logs
You can see a real-time log of the crew execution by setting `output_log_file` to `True` (boolean) or a `file_name` (str). Logging of events is supported as both `file_name.txt` and `file_name.json`.
If set to `True` (boolean), logs will be saved as `logs.txt`.
If `output_log_file` is set to `False` (boolean) or `None`, the logs will not be populated.
```python Code
# Save crew logs
crew = Crew(output_log_file = True) # Logs will be saved as logs.txt
crew = Crew(output_log_file = file_name) # Logs will be saved as file_name.txt
crew = Crew(output_log_file = file_name.txt) # Logs will be saved as file_name.txt
crew = Crew(output_log_file = file_name.json) # Logs will be saved as file_name.json
```
## Memory Utilization
Crews can utilize memory (short-term, long-term, and entity memory) to enhance their execution and learning over time. This feature allows crews to store and recall execution memories, aiding in decision-making and task execution strategies.
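A minimal sketch of a memory-enabled crew, assuming the `Agent`, `Task`, and `Crew` constructors shown elsewhere in this changeset (the agent and task names here are illustrative):
```python Code
from crewai import Agent, Crew, Process, Task

# Illustrative agent and task; any valid definitions work the same way.
analyst = Agent(
    role="Research Analyst",
    goal="Summarize the provided topic",
    backstory="You write concise, factual summaries.",
)
summary_task = Task(
    description="Summarize the key points about {topic}",
    expected_output="A short bullet-point summary",
    agent=analyst,
)

crew = Crew(
    agents=[analyst],
    tasks=[summary_task],
    process=Process.sequential,
    memory=True,  # enables short-term, long-term, and entity memory with default storage
)
```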

View File

@@ -232,18 +232,18 @@ class UnstructuredExampleFlow(Flow):
def first_method(self):
# The state automatically includes an 'id' field
print(f"State ID: {self.state['id']}")
self.state.message = "Hello from structured flow"
self.state.counter = 0
self.state['counter'] = 0
self.state['message'] = "Hello from structured flow"
@listen(first_method)
def second_method(self):
self.state.counter += 1
self.state.message += " - updated"
self.state['counter'] += 1
self.state['message'] += " - updated"
@listen(second_method)
def third_method(self):
self.state.counter += 1
self.state.message += " - updated again"
self.state['counter'] += 1
self.state['message'] += " - updated again"
print(f"State after third_method: {self.state}")

View File

@@ -91,7 +91,7 @@ result = crew.kickoff(inputs={"question": "What city does John live in and how o
```
Here's another example with the `CrewDoclingSource`. The CrewDoclingSource is actually quite versatile and can handle multiple file formats including TXT, PDF, DOCX, HTML, and more.
Here's another example with the `CrewDoclingSource`. The CrewDoclingSource is actually quite versatile and can handle multiple file formats including MD, PDF, DOCX, HTML, and more.
<Note>
You need to install `docling` for the following example to work: `uv add docling`
@@ -152,10 +152,10 @@ Here are examples of how to use different types of knowledge sources:
### Text File Knowledge Source
```python
from crewai.knowledge.source.crew_docling_source import CrewDoclingSource
from crewai.knowledge.source.text_file_knowledge_source import TextFileKnowledgeSource
# Create a text file knowledge source
text_source = CrewDoclingSource(
text_source = TextFileKnowledgeSource(
file_paths=["document.txt", "another.txt"]
)

View File

@@ -463,26 +463,32 @@ Learn how to get the most out of your LLM configuration:
<Accordion title="Google">
```python Code
# Option 1. Gemini accessed with an API key.
# Option 1: Gemini accessed with an API key.
# https://ai.google.dev/gemini-api/docs/api-key
GEMINI_API_KEY=<your-api-key>
# Option 2. Vertex AI IAM credentials for Gemini, Anthropic, and anything in the Model Garden.
# Option 2: Vertex AI IAM credentials for Gemini, Anthropic, and Model Garden.
# https://cloud.google.com/vertex-ai/generative-ai/docs/overview
```
## GET CREDENTIALS
Get credentials:
```python Code
import json
file_path = 'path/to/vertex_ai_service_account.json'
# Load the JSON file
with open(file_path, 'r') as file:
vertex_credentials = json.load(file)
# Convert to JSON string
# Convert the credentials to a JSON string
vertex_credentials_json = json.dumps(vertex_credentials)
```
Example usage:
```python Code
from crewai import LLM
llm = LLM(
model="gemini/gemini-1.5-pro-latest",
temperature=0.7,

View File

@@ -58,41 +58,107 @@ my_crew = Crew(
### Example: Use Custom Memory Instances e.g FAISS as the VectorDB
```python Code
from crewai import Crew, Agent, Task, Process
from crewai import Crew, Process
from crewai.memory import LongTermMemory, ShortTermMemory, EntityMemory
from crewai.memory.storage import LTMSQLiteStorage, RAGStorage
from typing import List, Optional
# Assemble your crew with memory capabilities
my_crew = Crew(
agents=[...],
tasks=[...],
process="Process.sequential",
memory=True,
long_term_memory=EnhanceLongTermMemory(
my_crew: Crew = Crew(
agents = [...],
tasks = [...],
process = Process.sequential,
memory = True,
# Long-term memory for persistent storage across sessions
long_term_memory = LongTermMemory(
storage=LTMSQLiteStorage(
db_path="/my_data_dir/my_crew1/long_term_memory_storage.db"
db_path="/my_crew1/long_term_memory_storage.db"
)
),
short_term_memory=EnhanceShortTermMemory(
storage=CustomRAGStorage(
crew_name="my_crew",
storage_type="short_term",
data_dir="//my_data_dir",
model=embedder["model"],
dimension=embedder["dimension"],
# Short-term memory for current context using RAG
short_term_memory = ShortTermMemory(
storage = RAGStorage(
embedder_config={
"provider": "openai",
"config": {
"model": 'text-embedding-3-small'
}
},
type="short_term",
path="/my_crew1/"
)
),
),
entity_memory=EnhanceEntityMemory(
storage=CustomRAGStorage(
crew_name="my_crew",
storage_type="entities",
data_dir="//my_data_dir",
model=embedder["model"],
dimension=embedder["dimension"],
),
# Entity memory for tracking key information about entities
entity_memory = EntityMemory(
storage=RAGStorage(
embedder_config={
"provider": "openai",
"config": {
"model": 'text-embedding-3-small'
}
},
type="short_term",
path="/my_crew1/"
)
),
verbose=True,
)
```
## Security Considerations
When configuring memory storage:
- Use environment variables for storage paths (e.g., `CREWAI_STORAGE_DIR`)
- Never hardcode sensitive information like database credentials
- Consider access permissions for storage directories
- Use relative paths when possible to maintain portability
Example using environment variables:
```python
import os
from crewai import Crew
from crewai.memory import LongTermMemory
from crewai.memory.storage import LTMSQLiteStorage
# Configure storage path using environment variable
storage_path = os.getenv("CREWAI_STORAGE_DIR", "./storage")
crew = Crew(
memory=True,
long_term_memory=LongTermMemory(
storage=LTMSQLiteStorage(
db_path="{storage_path}/memory.db".format(storage_path=storage_path)
)
)
)
```
## Configuration Examples
### Basic Memory Configuration
```python
from crewai import Crew
from crewai.memory import LongTermMemory
# Simple memory configuration
crew = Crew(memory=True) # Uses default storage locations
```
### Custom Storage Configuration
```python
from crewai import Crew
from crewai.memory import LongTermMemory
from crewai.memory.storage import LTMSQLiteStorage
# Configure custom storage paths
crew = Crew(
memory=True,
long_term_memory=LongTermMemory(
storage=LTMSQLiteStorage(db_path="./memory.db")
)
)
```
## Integrating Mem0 for Enhanced User Memory
[Mem0](https://mem0.ai/) is a self-improving memory layer for LLM applications, enabling personalized AI experiences.
@@ -185,7 +251,12 @@ my_crew = Crew(
process=Process.sequential,
memory=True,
verbose=True,
embedder=OpenAIEmbeddingFunction(api_key=os.getenv("OPENAI_API_KEY"), model="text-embedding-3-small"),
embedder={
"provider": "openai",
"config": {
"model": 'text-embedding-3-small'
}
}
)
```
@@ -211,6 +282,19 @@ my_crew = Crew(
### Using Google AI embeddings
#### Prerequisites
Before using Google AI embeddings, ensure you have:
- Access to the Gemini API
- The necessary API keys and permissions
You will need to update your *pyproject.toml* dependencies:
```YAML
dependencies = [
"google-generativeai>=0.8.4", #main version in January/2025 - crewai v.0.100.0 and crewai-tools 0.33.0
"crewai[tools]>=0.100.0,<1.0.0"
]
```
```python Code
from crewai import Crew, Agent, Task, Process
@@ -242,13 +326,15 @@ my_crew = Crew(
process=Process.sequential,
memory=True,
verbose=True,
embedder=OpenAIEmbeddingFunction(
api_key="YOUR_API_KEY",
api_base="YOUR_API_BASE_PATH",
api_type="azure",
api_version="YOUR_API_VERSION",
model="text-embedding-3-small"
)
embedder={
"provider": "openai",
"config": {
"api_key": "YOUR_API_KEY",
"api_base": "YOUR_API_BASE_PATH",
"api_version": "YOUR_API_VERSION",
"model_name": 'text-embedding-3-small'
}
}
)
```
@@ -264,12 +350,15 @@ my_crew = Crew(
process=Process.sequential,
memory=True,
verbose=True,
embedder=GoogleVertexEmbeddingFunction(
project_id="YOUR_PROJECT_ID",
region="YOUR_REGION",
api_key="YOUR_API_KEY",
model="textembedding-gecko"
)
embedder={
"provider": "vertexai",
"config": {
"project_id"="YOUR_PROJECT_ID",
"region"="YOUR_REGION",
"api_key"="YOUR_API_KEY",
"model_name"="textembedding-gecko"
}
}
)
```
@@ -358,6 +447,65 @@ my_crew = Crew(
)
```
### Using Amazon Bedrock embeddings
```python Code
# Note: Ensure you have installed `boto3` for Bedrock embeddings to work.
import os
import boto3
from crewai import Crew, Agent, Task, Process
boto3_session = boto3.Session(
region_name=os.environ.get("AWS_REGION_NAME"),
aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY")
)
my_crew = Crew(
agents=[...],
tasks=[...],
process=Process.sequential,
memory=True,
embedder={
"provider": "bedrock",
"config":{
"session": boto3_session,
"model": "amazon.titan-embed-text-v2:0",
"vector_dimension": 1024
}
},
verbose=True
)
```
### Adding Custom Embedding Function
```python Code
from crewai import Crew, Agent, Task, Process
from chromadb import Documents, EmbeddingFunction, Embeddings
# Create a custom embedding function
class CustomEmbedder(EmbeddingFunction):
def __call__(self, input: Documents) -> Embeddings:
# generate embeddings
return [1, 2, 3] # this is a dummy embedding
my_crew = Crew(
agents=[...],
tasks=[...],
process=Process.sequential,
memory=True,
verbose=True,
embedder={
"provider": "custom",
"config": {
"embedder": CustomEmbedder()
}
}
)
```
### Resetting Memory
```shell

View File

@@ -268,7 +268,7 @@ analysis_task = Task(
Task guardrails provide a way to validate and transform task outputs before they
are passed to the next task. This feature helps ensure data quality and provides
efeedback to agents when their output doesn't meet specific criteria.
feedback to agents when their output doesn't meet specific criteria.
### Using Task Guardrails

View File

@@ -8,9 +8,9 @@ icon: file-pen
## Description
The `FileWriterTool` is a component of the crewai_tools package, designed to simplify the process of writing content to files.
The `FileWriterTool` is a component of the crewai_tools package, designed to simplify the process of writing content to files with cross-platform compatibility (Windows, Linux, macOS).
It is particularly useful in scenarios such as generating reports, saving logs, creating configuration files, and more.
This tool supports creating new directories if they don't exist, making it easier to organize your output.
This tool handles path differences across operating systems, supports UTF-8 encoding, and automatically creates directories if they don't exist, making it easier to organize your output reliably across different platforms.
## Installation
@@ -43,6 +43,8 @@ print(result)
## Conclusion
By integrating the `FileWriterTool` into your crews, the agents can execute the process of writing content to files and creating directories.
This tool is essential for tasks that require saving output data, creating structured file systems, and more. By adhering to the setup and usage guidelines provided,
incorporating this tool into projects is straightforward and efficient.
By integrating the `FileWriterTool` into your crews, the agents can reliably write content to files across different operating systems.
This tool is essential for tasks that require saving output data, creating structured file systems, and handling cross-platform file operations.
It's particularly recommended for Windows users who may encounter file writing issues with standard Python file operations.
By adhering to the setup and usage guidelines provided, incorporating this tool into projects is straightforward and ensures consistent file writing behavior across all platforms.

pytest.ini (new file, 3 lines added)
View File

@@ -0,0 +1,3 @@
[pytest]
markers =
agentops: Tests for AgentOps integration
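For context, a marker registered this way is typically applied and selected like so (illustrative pytest usage only, not part of the changeset):
```python
# Hypothetical test module showing how the registered marker is used.
import pytest

@pytest.mark.agentops
def test_agentops_is_skipped_without_api_key(monkeypatch):
    # Assumption for illustration: behavior when AGENTOPS_API_KEY is unset.
    monkeypatch.delenv("AGENTOPS_API_KEY", raising=False)
    # ...assertions against Crew.set_private_attrs() would go here...
```
Running `pytest -m agentops` then selects only the tests carrying this marker.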

View File

@@ -1,6 +1,7 @@
import re
import shutil
import subprocess
from typing import Any, Dict, List, Literal, Optional, Union
from typing import Any, Dict, List, Literal, Optional, Sequence, Union
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
@@ -15,7 +16,6 @@ from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.task import Task
from crewai.tools import BaseTool
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import Tool
from crewai.utilities import Converter, Prompts
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import generate_model_description
@@ -54,7 +54,6 @@ class Agent(BaseAgent):
llm: The language model that will run the agent.
function_calling_llm: The language model that will handle the tool calling for this agent, it overrides the crew function_calling_llm.
max_iter: Maximum number of iterations for an agent to execute a task.
memory: Whether the agent should have memory or not.
max_rpm: Maximum number of requests per minute for the agent execution to be respected.
verbose: Whether the agent execution should be in verbose mode.
allow_delegation: Whether the agent is allowed to delegate tasks to other agents.
@@ -71,9 +70,6 @@ class Agent(BaseAgent):
)
agent_ops_agent_name: str = None # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
agent_ops_agent_id: str = None # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
cache_handler: InstanceOf[CacheHandler] = Field(
default=None, description="An instance of the CacheHandler class."
)
step_callback: Optional[Any] = Field(
default=None,
description="Callback to be executed after each step of the agent execution.",
@@ -107,10 +103,6 @@ class Agent(BaseAgent):
default=True,
description="Keep messages under the context window size by summarizing content.",
)
max_iter: int = Field(
default=20,
description="Maximum number of iterations for an agent to execute a task before giving it's best answer",
)
max_retry_limit: int = Field(
default=2,
description="Maximum number of retries for an agent to execute a task when an error occurs.",
@@ -153,7 +145,8 @@ class Agent(BaseAgent):
def _set_knowledge(self):
try:
if self.knowledge_sources:
knowledge_agent_name = f"{self.role.replace(' ', '_')}"
full_pattern = re.compile(r"[^a-zA-Z0-9\-_\r\n]|(\.\.)")
knowledge_agent_name = f"{re.sub(full_pattern, '_', self.role)}"
if isinstance(self.knowledge_sources, list) and all(
isinstance(k, BaseKnowledgeSource) for k in self.knowledge_sources
):
@@ -195,13 +188,15 @@ class Agent(BaseAgent):
if task.output_json:
# schema = json.dumps(task.output_json, indent=2)
schema = generate_model_description(task.output_json)
task_prompt += "\n" + self.i18n.slice(
"formatted_task_instructions"
).format(output_format=schema)
elif task.output_pydantic:
schema = generate_model_description(task.output_pydantic)
task_prompt += "\n" + self.i18n.slice("formatted_task_instructions").format(
output_format=schema
)
task_prompt += "\n" + self.i18n.slice(
"formatted_task_instructions"
).format(output_format=schema)
if context:
task_prompt = self.i18n.slice("task_with_context").format(
@@ -329,14 +324,14 @@ class Agent(BaseAgent):
tools = agent_tools.tools()
return tools
def get_multimodal_tools(self) -> List[Tool]:
def get_multimodal_tools(self) -> Sequence[BaseTool]:
from crewai.tools.agent_tools.add_image_tool import AddImageTool
return [AddImageTool()]
def get_code_execution_tools(self):
try:
from crewai_tools import CodeInterpreterTool
from crewai_tools import CodeInterpreterTool # type: ignore
# Set the unsafe_mode based on the code_execution_mode attribute
unsafe_mode = self.code_execution_mode == "unsafe"

View File

@@ -24,6 +24,7 @@ from crewai.tools import BaseTool
from crewai.tools.base_tool import Tool
from crewai.utilities import I18N, Logger, RPMController
from crewai.utilities.config import process_config
from crewai.utilities.converter import Converter
T = TypeVar("T", bound="BaseAgent")
@@ -42,7 +43,7 @@ class BaseAgent(ABC, BaseModel):
max_rpm (Optional[int]): Maximum number of requests per minute for the agent execution.
allow_delegation (bool): Allow delegation of tasks to agents.
tools (Optional[List[Any]]): Tools at the agent's disposal.
max_iter (Optional[int]): Maximum iterations for an agent to execute a task.
max_iter (int): Maximum iterations for an agent to execute a task.
agent_executor (InstanceOf): An instance of the CrewAgentExecutor class.
llm (Any): Language model that will run the agent.
crew (Any): Crew to which the agent belongs.
@@ -114,7 +115,7 @@ class BaseAgent(ABC, BaseModel):
tools: Optional[List[Any]] = Field(
default_factory=list, description="Tools at agents' disposal"
)
max_iter: Optional[int] = Field(
max_iter: int = Field(
default=25, description="Maximum iterations for an agent to execute a task"
)
agent_executor: InstanceOf = Field(
@@ -125,11 +126,12 @@ class BaseAgent(ABC, BaseModel):
)
crew: Any = Field(default=None, description="Crew to which the agent belongs.")
i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
cache_handler: InstanceOf[CacheHandler] = Field(
cache_handler: Optional[InstanceOf[CacheHandler]] = Field(
default=None, description="An instance of the CacheHandler class."
)
tools_handler: InstanceOf[ToolsHandler] = Field(
default=None, description="An instance of the ToolsHandler class."
default_factory=ToolsHandler,
description="An instance of the ToolsHandler class.",
)
max_tokens: Optional[int] = Field(
default=None, description="Maximum number of tokens for the agent's execution."
@@ -254,7 +256,7 @@ class BaseAgent(ABC, BaseModel):
@abstractmethod
def get_output_converter(
self, llm: Any, text: str, model: type[BaseModel] | None, instructions: str
):
) -> Converter:
"""Get the converter class for the agent to create json/pydantic outputs."""
pass

View File

@@ -2,11 +2,7 @@ import subprocess
import click
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
from crewai.cli.utils import get_crew
def reset_memories_command(
@@ -30,30 +26,35 @@ def reset_memories_command(
"""
try:
crew = get_crew()
if not crew:
raise ValueError("No crew found.")
if all:
ShortTermMemory().reset()
EntityMemory().reset()
LongTermMemory().reset()
TaskOutputStorageHandler().reset()
KnowledgeStorage().reset()
crew.reset_memories(command_type="all")
click.echo("All memories have been reset.")
else:
if long:
LongTermMemory().reset()
click.echo("Long term memory has been reset.")
return
if short:
ShortTermMemory().reset()
click.echo("Short term memory has been reset.")
if entity:
EntityMemory().reset()
click.echo("Entity memory has been reset.")
if kickoff_outputs:
TaskOutputStorageHandler().reset()
click.echo("Latest Kickoff outputs stored has been reset.")
if knowledge:
KnowledgeStorage().reset()
click.echo("Knowledge has been reset.")
if not any([long, short, entity, kickoff_outputs, knowledge]):
click.echo(
"No memory type specified. Please specify at least one type to reset."
)
return
if long:
crew.reset_memories(command_type="long")
click.echo("Long term memory has been reset.")
if short:
crew.reset_memories(command_type="short")
click.echo("Short term memory has been reset.")
if entity:
crew.reset_memories(command_type="entity")
click.echo("Entity memory has been reset.")
if kickoff_outputs:
crew.reset_memories(command_type="kickoff_outputs")
click.echo("Latest Kickoff outputs stored has been reset.")
if knowledge:
crew.reset_memories(command_type="knowledge")
click.echo("Knowledge has been reset.")
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while resetting the memories: {e}", err=True)

View File

@@ -9,6 +9,7 @@ import tomli
from rich.console import Console
from crewai.cli.constants import ENV_VARS
from crewai.crew import Crew
if sys.version_info >= (3, 11):
import tomllib
@@ -247,3 +248,64 @@ def write_env_file(folder_path, env_vars):
with open(env_file_path, "w") as file:
for key, value in env_vars.items():
file.write(f"{key}={value}\n")
def get_crew(crew_path: str = "crew.py", require: bool = False) -> Crew | None:
"""Get the crew instance from the crew.py file."""
try:
import importlib.util
import os
for root, _, files in os.walk("."):
if "crew.py" in files:
crew_path = os.path.join(root, "crew.py")
try:
spec = importlib.util.spec_from_file_location(
"crew_module", crew_path
)
if not spec or not spec.loader:
continue
module = importlib.util.module_from_spec(spec)
try:
sys.modules[spec.name] = module
spec.loader.exec_module(module)
for attr_name in dir(module):
attr = getattr(module, attr_name)
try:
if callable(attr) and hasattr(attr, "crew"):
crew_instance = attr().crew()
return crew_instance
except Exception as e:
print(f"Error processing attribute {attr_name}: {e}")
continue
except Exception as exec_error:
print(f"Error executing module: {exec_error}")
import traceback
print(f"Traceback: {traceback.format_exc()}")
except (ImportError, AttributeError) as e:
if require:
console.print(
f"Error importing crew from {crew_path}: {str(e)}",
style="bold red",
)
continue
break
if require:
console.print("No valid Crew instance found in crew.py", style="bold red")
raise SystemExit
return None
except Exception as e:
if require:
console.print(
f"Unexpected error while loading crew: {str(e)}", style="bold red"
)
raise SystemExit
return None

View File

@@ -1,5 +1,6 @@
import asyncio
import json
import os
import re
import uuid
import warnings
@@ -54,8 +55,11 @@ from crewai.utilities.training_handler import CrewTrainingHandler
try:
import agentops # type: ignore
from agentops.exceptions import AgentOpsError, AuthenticationError # type: ignore
except ImportError:
agentops = None
AgentOpsError = None
AuthenticationError = None
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
@@ -90,6 +94,8 @@ class Crew(BaseModel):
__hash__ = object.__hash__ # type: ignore
_execution_span: Any = PrivateAttr()
_rpm_controller: RPMController = PrivateAttr()
_agentops: Optional['agentops.AgentOps'] = PrivateAttr(default=None)
_telemetry: Optional[Telemetry] = PrivateAttr(default=None)
_logger: Logger = PrivateAttr()
_file_handler: FileHandler = PrivateAttr()
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
@@ -183,9 +189,9 @@ class Crew(BaseModel):
default=None,
description="Path to the prompt json file to be used for the crew.",
)
output_log_file: Optional[str] = Field(
output_log_file: Optional[Union[bool, str]] = Field(
default=None,
description="output_log_file",
description="Path to the log file to be saved",
)
planning: Optional[bool] = Field(
default=False,
@@ -240,19 +246,72 @@ class Crew(BaseModel):
# TODO: Improve typing
return json.loads(v) if isinstance(v, Json) else v # type: ignore
@model_validator(mode="after")
def _validate_api_key(self, api_key: Optional[str]) -> bool:
"""Validate the AgentOps API key.
Args:
api_key: The API key to validate
Returns:
bool: True if the API key is valid, False otherwise
"""
if not api_key:
return False
stripped_key = api_key.strip()
return bool(stripped_key and len(stripped_key) > 10)
def set_private_attrs(self) -> "Crew":
"""Set private attributes."""
"""Initialize private attributes including AgentOps integration.
This method sets up:
- Logger and file handler for output logging
- RPM controller for rate limiting
- AgentOps integration for monitoring (if available and configured)
"""
self._cache_handler = CacheHandler()
self._logger = Logger(verbose=self.verbose)
if self.output_log_file:
self._file_handler = FileHandler(self.output_log_file)
self._rpm_controller = RPMController(max_rpm=self.max_rpm, logger=self._logger)
if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
self.function_calling_llm = create_llm(self.function_calling_llm)
self._telemetry = Telemetry()
self._telemetry.set_tracer()
# Initialize agentops if available and API key is present
if agentops:
api_key = os.getenv("AGENTOPS_API_KEY")
if self._validate_api_key(api_key):
try:
agentops.init(api_key)
self._agentops = agentops
self._logger.log(
"info",
"Successfully initialized agentops",
color="green"
)
except (ConnectionError, AuthenticationError) as e:
self._logger.log(
"warning",
f"Failed to connect to agentops: {e}",
color="yellow"
)
self._agentops = None
except (ValueError, AgentOpsError) as e:
self._logger.log(
"warning",
f"Invalid agentops configuration: {e}",
color="yellow"
)
self._agentops = None
else:
self._logger.log(
"warning",
"Invalid AGENTOPS_API_KEY provided",
color="yellow"
)
self._agentops = None
if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
self.function_calling_llm = create_llm(self.function_calling_llm)
return self
@model_validator(mode="after")
@@ -380,6 +439,22 @@ class Crew(BaseModel):
return self
@model_validator(mode="after")
def validate_must_have_non_conditional_task(self) -> "Crew":
"""Ensure that a crew has at least one non-conditional task."""
if not self.tasks:
return self
non_conditional_count = sum(
1 for task in self.tasks if not isinstance(task, ConditionalTask)
)
if non_conditional_count == 0:
raise PydanticCustomError(
"only_conditional_tasks",
"Crew must include at least one non-conditional task",
{},
)
return self
@model_validator(mode="after")
def validate_first_task(self) -> "Crew":
"""Ensure the first task is not a ConditionalTask."""
@@ -439,6 +514,8 @@ class Crew(BaseModel):
)
return self
@property
def key(self) -> str:
source = [agent.key for agent in self.agents] + [
@@ -525,7 +602,8 @@ class Crew(BaseModel):
inputs = before_callback(inputs)
"""Starts the crew to work on its assigned tasks."""
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
if self._telemetry:
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
self._task_output_handler.reset()
self._logging_color = "bold_purple"
@@ -681,12 +759,7 @@ class Crew(BaseModel):
manager.tools = []
raise Exception("Manager agent should not have tools")
else:
self.manager_llm = (
getattr(self.manager_llm, "model_name", None)
or getattr(self.manager_llm, "model", None)
or getattr(self.manager_llm, "deployment_name", None)
or self.manager_llm
)
self.manager_llm = create_llm(self.manager_llm)
manager = Agent(
role=i18n.retrieve("hierarchical_manager_agent", "role"),
goal=i18n.retrieve("hierarchical_manager_agent", "goal"),
@@ -746,6 +819,7 @@ class Crew(BaseModel):
task, task_outputs, futures, task_index, was_replayed
)
if skipped_task_output:
task_outputs.append(skipped_task_output)
continue
if task.async_execution:
@@ -769,7 +843,7 @@ class Crew(BaseModel):
context=context,
tools=tools_for_task,
)
task_outputs = [task_output]
task_outputs.append(task_output)
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index, was_replayed)
@@ -790,7 +864,7 @@ class Crew(BaseModel):
task_outputs = self._process_async_tasks(futures, was_replayed)
futures.clear()
previous_output = task_outputs[task_index - 1] if task_outputs else None
previous_output = task_outputs[-1] if task_outputs else None
if previous_output is not None and not task.should_execute(previous_output):
self._logger.log(
"debug",
@@ -912,11 +986,15 @@ class Crew(BaseModel):
)
def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput:
if len(task_outputs) != 1:
raise ValueError(
"Something went wrong. Kickoff should return only one task output."
)
final_task_output = task_outputs[0]
if not task_outputs:
raise ValueError("No task outputs available to create crew output.")
# Filter out empty outputs and get the last valid one as the main output
valid_outputs = [t for t in task_outputs if t.raw]
if not valid_outputs:
raise ValueError("No valid task outputs available to create crew output.")
final_task_output = valid_outputs[-1]
final_string_output = final_task_output.raw
self._finish_execution(final_string_output)
token_usage = self.calculate_usage_metrics()
@@ -925,7 +1003,7 @@ class Crew(BaseModel):
raw=final_task_output.raw,
pydantic=final_task_output.pydantic,
json_dict=final_task_output.json_dict,
tasks_output=[task.output for task in self.tasks if task.output],
tasks_output=task_outputs,
token_usage=token_usage,
)
@@ -1103,16 +1181,22 @@ class Crew(BaseModel):
for agent in self.agents:
agent.interpolate_inputs(inputs)
def _finish_execution(self, final_string_output: str) -> None:
def _finish_execution(self, final_output: Union[str, CrewOutput]) -> None:
"""Finish execution and cleanup.
Args:
final_output: The final output from crew execution, either as string or CrewOutput
"""
if self.max_rpm:
self._rpm_controller.stop_rpm_counter()
if agentops:
agentops.end_session(
if self._telemetry:
self._telemetry.end_crew(self, final_output)
if self._agentops:
self._agentops.end_session(
end_state="Success",
end_state_reason="Finished Execution",
is_auto_end=True,
)
self._telemetry.end_crew(self, final_string_output)
def calculate_usage_metrics(self) -> UsageMetrics:
"""Calculates and returns the usage metrics."""
@@ -1152,3 +1236,80 @@ class Crew(BaseModel):
def __repr__(self):
return f"Crew(id={self.id}, process={self.process}, number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)})"
def reset_memories(self, command_type: str) -> None:
"""Reset specific or all memories for the crew.
Args:
command_type: Type of memory to reset.
Valid options: 'long', 'short', 'entity', 'knowledge',
'kickoff_outputs', or 'all'
Raises:
ValueError: If an invalid command type is provided.
RuntimeError: If memory reset operation fails.
"""
VALID_TYPES = frozenset(
["long", "short", "entity", "knowledge", "kickoff_outputs", "all"]
)
if command_type not in VALID_TYPES:
raise ValueError(
f"Invalid command type. Must be one of: {', '.join(sorted(VALID_TYPES))}"
)
try:
if command_type == "all":
self._reset_all_memories()
else:
self._reset_specific_memory(command_type)
self._logger.log("info", f"{command_type} memory has been reset")
except Exception as e:
error_msg = f"Failed to reset {command_type} memory: {str(e)}"
self._logger.log("error", error_msg)
raise RuntimeError(error_msg) from e
def _reset_all_memories(self) -> None:
"""Reset all available memory systems."""
memory_systems = [
("short term", self._short_term_memory),
("entity", self._entity_memory),
("long term", self._long_term_memory),
("task output", self._task_output_handler),
("knowledge", self.knowledge),
]
for name, system in memory_systems:
if system is not None:
try:
system.reset()
except Exception as e:
raise RuntimeError(f"Failed to reset {name} memory") from e
def _reset_specific_memory(self, memory_type: str) -> None:
"""Reset a specific memory system.
Args:
memory_type: Type of memory to reset
Raises:
RuntimeError: If the specified memory system fails to reset
"""
reset_functions = {
"long": (self._long_term_memory, "long term"),
"short": (self._short_term_memory, "short term"),
"entity": (self._entity_memory, "entity"),
"knowledge": (self.knowledge, "knowledge"),
"kickoff_outputs": (self._task_output_handler, "task output"),
}
memory_system, name = reset_functions[memory_type]
if memory_system is None:
raise RuntimeError(f"{name} memory system is not initialized")
try:
memory_system.reset()
except Exception as e:
raise RuntimeError(f"Failed to reset {name} memory") from e

View File

@@ -600,7 +600,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
```
"""
try:
if not hasattr(self, '_state'):
if not hasattr(self, "_state"):
return ""
if isinstance(self._state, dict):
@@ -706,26 +706,31 @@ class Flow(Generic[T], metaclass=FlowMeta):
inputs: Optional dictionary containing input values and potentially a state ID to restore
"""
# Handle state restoration if ID is provided in inputs
if inputs and 'id' in inputs and self._persistence is not None:
restore_uuid = inputs['id']
if inputs and "id" in inputs and self._persistence is not None:
restore_uuid = inputs["id"]
stored_state = self._persistence.load_state(restore_uuid)
# Override the id in the state if it exists in inputs
if 'id' in inputs:
if "id" in inputs:
if isinstance(self._state, dict):
self._state['id'] = inputs['id']
self._state["id"] = inputs["id"]
elif isinstance(self._state, BaseModel):
setattr(self._state, 'id', inputs['id'])
setattr(self._state, "id", inputs["id"])
if stored_state:
self._log_flow_event(f"Loading flow state from memory for UUID: {restore_uuid}", color="yellow")
self._log_flow_event(
f"Loading flow state from memory for UUID: {restore_uuid}",
color="yellow",
)
# Restore the state
self._restore_state(stored_state)
else:
self._log_flow_event(f"No flow state found for UUID: {restore_uuid}", color="red")
self._log_flow_event(
f"No flow state found for UUID: {restore_uuid}", color="red"
)
# Apply any additional inputs after restoration
filtered_inputs = {k: v for k, v in inputs.items() if k != 'id'}
filtered_inputs = {k: v for k, v in inputs.items() if k != "id"}
if filtered_inputs:
self._initialize_state(filtered_inputs)
@@ -737,9 +742,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
flow_name=self.__class__.__name__,
),
)
self._log_flow_event(f"Flow started with ID: {self.flow_id}", color="bold_magenta")
self._log_flow_event(
f"Flow started with ID: {self.flow_id}", color="bold_magenta"
)
if inputs is not None and 'id' not in inputs:
if inputs is not None and "id" not in inputs:
self._initialize_state(inputs)
return asyncio.run(self.kickoff_async())
@@ -984,7 +991,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
traceback.print_exc()
def _log_flow_event(self, message: str, color: str = "yellow", level: str = "info") -> None:
def _log_flow_event(
self, message: str, color: str = "yellow", level: str = "info"
) -> None:
"""Centralized logging method for flow events.
This method provides a consistent interface for logging flow-related events,

View File

@@ -67,3 +67,9 @@ class Knowledge(BaseModel):
source.add()
except Exception as e:
raise e
def reset(self) -> None:
if self.storage:
self.storage.reset()
else:
raise ValueError("Storage is not initialized.")

View File

@@ -1,28 +1,138 @@
from pathlib import Path
from typing import Dict, List
from typing import Dict, Iterator, List, Optional, Union
from urllib.parse import urlparse
from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
from pydantic import Field, field_validator
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.logger import Logger
class ExcelKnowledgeSource(BaseFileKnowledgeSource):
class ExcelKnowledgeSource(BaseKnowledgeSource):
"""A knowledge source that stores and queries Excel file content using embeddings."""
def load_content(self) -> Dict[Path, str]:
"""Load and preprocess Excel file content."""
pd = self._import_dependencies()
# override content to be a dict of file paths to sheet names to csv content
_logger: Logger = Logger(verbose=True)
file_path: Optional[Union[Path, List[Path], str, List[str]]] = Field(
default=None,
description="[Deprecated] The path to the file. Use file_paths instead.",
)
file_paths: Optional[Union[Path, List[Path], str, List[str]]] = Field(
default_factory=list, description="The path to the file"
)
chunks: List[str] = Field(default_factory=list)
content: Dict[Path, Dict[str, str]] = Field(default_factory=dict)
safe_file_paths: List[Path] = Field(default_factory=list)
@field_validator("file_path", "file_paths", mode="before")
def validate_file_path(cls, v, info):
"""Validate that at least one of file_path or file_paths is provided."""
# Single check if both are None, O(1) instead of nested conditions
if (
v is None
and info.data.get(
"file_path" if info.field_name == "file_paths" else "file_paths"
)
is None
):
raise ValueError("Either file_path or file_paths must be provided")
return v
def _process_file_paths(self) -> List[Path]:
"""Convert file_path to a list of Path objects."""
if hasattr(self, "file_path") and self.file_path is not None:
self._logger.log(
"warning",
"The 'file_path' attribute is deprecated and will be removed in a future version. Please use 'file_paths' instead.",
color="yellow",
)
self.file_paths = self.file_path
if self.file_paths is None:
raise ValueError("Your source must be provided with a file_paths: []")
# Convert single path to list
path_list: List[Union[Path, str]] = (
[self.file_paths]
if isinstance(self.file_paths, (str, Path))
else list(self.file_paths)
if isinstance(self.file_paths, list)
else []
)
if not path_list:
raise ValueError(
"file_path/file_paths must be a Path, str, or a list of these types"
)
return [self.convert_to_path(path) for path in path_list]
def validate_content(self):
"""Validate the paths."""
for path in self.safe_file_paths:
if not path.exists():
self._logger.log(
"error",
f"File not found: {path}. Try adding sources to the knowledge directory. If it's inside the knowledge directory, use the relative path.",
color="red",
)
raise FileNotFoundError(f"File not found: {path}")
if not path.is_file():
self._logger.log(
"error",
f"Path is not a file: {path}",
color="red",
)
def model_post_init(self, _) -> None:
if self.file_path:
self._logger.log(
"warning",
"The 'file_path' attribute is deprecated and will be removed in a future version. Please use 'file_paths' instead.",
color="yellow",
)
self.file_paths = self.file_path
self.safe_file_paths = self._process_file_paths()
self.validate_content()
self.content = self._load_content()
def _load_content(self) -> Dict[Path, Dict[str, str]]:
"""Load and preprocess Excel file content from multiple sheets.
Each sheet's content is converted to CSV format and stored.
Returns:
Dict[Path, Dict[str, str]]: A mapping of file paths to their respective sheet contents.
Raises:
ImportError: If required dependencies are missing.
FileNotFoundError: If the specified Excel file cannot be opened.
"""
pd = self._import_dependencies()
content_dict = {}
for file_path in self.safe_file_paths:
file_path = self.convert_to_path(file_path)
df = pd.read_excel(file_path)
content = df.to_csv(index=False)
content_dict[file_path] = content
with pd.ExcelFile(file_path) as xl:
sheet_dict = {
str(sheet_name): str(
pd.read_excel(xl, sheet_name).to_csv(index=False)
)
for sheet_name in xl.sheet_names
}
content_dict[file_path] = sheet_dict
return content_dict
def convert_to_path(self, path: Union[Path, str]) -> Path:
"""Convert a path to a Path object."""
return Path(KNOWLEDGE_DIRECTORY + "/" + path) if isinstance(path, str) else path
def _import_dependencies(self):
"""Dynamically import dependencies."""
try:
import openpyxl # noqa
import pandas as pd
return pd
@@ -38,10 +148,14 @@ class ExcelKnowledgeSource(BaseFileKnowledgeSource):
and save the embeddings.
"""
# Convert dictionary values to a single string if content is a dictionary
if isinstance(self.content, dict):
content_str = "\n".join(str(value) for value in self.content.values())
else:
content_str = str(self.content)
# Updated to account for .xlsx workbooks with multiple tabs/sheets
content_str = ""
for value in self.content.values():
if isinstance(value, dict):
for sheet_value in value.values():
content_str += str(sheet_value) + "\n"
else:
content_str += str(value) + "\n"
new_chunks = self._chunk_text(content_str)
self.chunks.extend(new_chunks)
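# Illustrative usage sketch: assumes an Excel workbook "report.xlsx" already exists
# under the knowledge directory (the file name is an assumption for the example).
example_source = ExcelKnowledgeSource(file_paths=["report.xlsx"])
# content maps each resolved Path to a {sheet_name: csv_text} dict, one entry per tab
for _path, _sheets in example_source.content.items():
    print(_path, list(_sheets.keys()))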

View File

@@ -164,6 +164,7 @@ class LLM:
self.context_window_size = 0
self.reasoning_effort = reasoning_effort
self.additional_params = kwargs
self.is_anthropic = self._is_anthropic_model(model)
litellm.drop_params = True
@@ -178,42 +179,62 @@ class LLM:
self.set_callbacks(callbacks)
self.set_env_callbacks()
def _is_anthropic_model(self, model: str) -> bool:
"""Determine if the model is from Anthropic provider.
Args:
model: The model identifier string.
Returns:
bool: True if the model is from Anthropic, False otherwise.
"""
ANTHROPIC_PREFIXES = ('anthropic/', 'claude-', 'claude/')
return any(prefix in model.lower() for prefix in ANTHROPIC_PREFIXES)
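# Illustrative behaviour of the prefix check above (model names assumed for the example):
#   "anthropic/claude-3-sonnet" -> True, "claude-instant" -> True,
#   "gpt-4o" -> False, "anthropomorphic" -> False (contains no listed prefix).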
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> str:
"""
High-level llm call method that:
1) Accepts either a string or a list of messages
2) Converts string input to the required message format
3) Calls litellm.completion
4) Handles function/tool calls if any
5) Returns the final text response or tool result
Parameters:
- messages (Union[str, List[Dict[str, str]]]): The input messages for the LLM.
- If a string is provided, it will be converted into a message list with a single entry.
- If a list of dictionaries is provided, each dictionary should have 'role' and 'content' keys.
- tools (Optional[List[dict]]): A list of tool schemas for function calling.
- callbacks (Optional[List[Any]]): A list of callback functions to be executed.
- available_functions (Optional[Dict[str, Any]]): A dictionary mapping function names to actual Python functions.
) -> Union[str, Any]:
"""High-level LLM call method.
Args:
messages: Input messages for the LLM.
Can be a string or list of message dictionaries.
If string, it will be converted to a single user message.
If list, each dict must have 'role' and 'content' keys.
tools: Optional list of tool schemas for function calling.
Each tool should define its name, description, and parameters.
callbacks: Optional list of callback functions to be executed
during and after the LLM call.
available_functions: Optional dict mapping function names to callables
that can be invoked by the LLM.
Returns:
- str: The final text response from the LLM or the result of a tool function call.
Union[str, Any]: Either a text response from the LLM (str) or
the result of a tool function call (Any).
Raises:
TypeError: If messages format is invalid
ValueError: If response format is not supported
LLMContextLengthExceededException: If input exceeds model's context limit
Examples:
---------
# Example 1: Using a string input
response = llm.call("Return the name of a random city in the world.")
print(response)
# Example 2: Using a list of messages
messages = [{"role": "user", "content": "What is the capital of France?"}]
response = llm.call(messages)
print(response)
# Example 1: Simple string input
>>> response = llm.call("Return the name of a random city.")
>>> print(response)
"Paris"
# Example 2: Message list with system and user messages
>>> messages = [
... {"role": "system", "content": "You are a geography expert"},
... {"role": "user", "content": "What is France's capital?"}
... ]
>>> response = llm.call(messages)
>>> print(response)
"The capital of France is Paris."
"""
# Validate parameters before proceeding with the call.
self._validate_call_params()
@@ -221,15 +242,25 @@ class LLM:
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# For O1 models, system messages are not supported.
# Convert any system messages into assistant messages.
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
message["role"] = "assistant"
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
# --- 1) Prepare the parameters for the completion call
# --- 1) Format messages according to provider requirements
formatted_messages = self._format_messages_for_provider(messages)
# --- 2) Prepare the parameters for the completion call
params = {
"model": self.model,
"messages": messages,
"messages": formatted_messages,
"timeout": self.timeout,
"temperature": self.temperature,
"top_p": self.top_p,
@@ -317,6 +348,38 @@ class LLM:
logging.error(f"LiteLLM call failed: {str(e)}")
raise
def _format_messages_for_provider(self, messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
"""Format messages according to provider requirements.
Args:
messages: List of message dictionaries with 'role' and 'content' keys.
Can be empty, but must not be None.
Returns:
List of formatted messages according to provider requirements.
For Anthropic models, ensures first message has 'user' role.
Raises:
TypeError: If messages is None or contains invalid message format.
"""
if messages is None:
raise TypeError("Messages cannot be None")
# Validate message format first
for msg in messages:
if not isinstance(msg, dict) or "role" not in msg or "content" not in msg:
raise TypeError("Invalid message format. Each message must be a dict with 'role' and 'content' keys")
if not self.is_anthropic:
return messages
# Anthropic requires messages to start with 'user' role
if not messages or messages[0]["role"] == "system":
# If first message is system or empty, add a placeholder user message
return [{"role": "user", "content": "."}, *messages]
return messages
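# Illustrative sketch of the rule above (model name and message content are assumptions
# for the example): with an Anthropic model, a leading system message gains a
# placeholder user turn.
# llm = LLM(model="anthropic/claude-3-sonnet")
# llm._format_messages_for_provider([{"role": "system", "content": "You are terse."}])
# # -> [{"role": "user", "content": "."}, {"role": "system", "content": "You are terse."}]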
def _get_custom_llm_provider(self) -> str:
"""
Derives the custom_llm_provider from the model string.

View File

@@ -1,3 +1,7 @@
from typing import Optional
from pydantic import PrivateAttr
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
from crewai.memory.memory import Memory
from crewai.memory.storage.rag_storage import RAGStorage
@@ -10,13 +14,15 @@ class EntityMemory(Memory):
Inherits from the Memory class.
"""
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
if hasattr(crew, "memory_config") and crew.memory_config is not None:
self.memory_provider = crew.memory_config.get("provider")
else:
self.memory_provider = None
_memory_provider: Optional[str] = PrivateAttr()
if self.memory_provider == "mem0":
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
if crew and hasattr(crew, "memory_config") and crew.memory_config is not None:
memory_provider = crew.memory_config.get("provider")
else:
memory_provider = None
if memory_provider == "mem0":
try:
from crewai.memory.storage.mem0_storage import Mem0Storage
except ImportError:
@@ -36,11 +42,13 @@ class EntityMemory(Memory):
path=path,
)
)
super().__init__(storage)
super().__init__(storage=storage)
self._memory_provider = memory_provider
def save(self, item: EntityMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
"""Saves an entity item into the SQLite storage."""
if self.memory_provider == "mem0":
if self._memory_provider == "mem0":
data = f"""
Remember details about the following entity:
Name: {item.name}

View File

@@ -17,7 +17,7 @@ class LongTermMemory(Memory):
def __init__(self, storage=None, path=None):
if not storage:
storage = LTMSQLiteStorage(db_path=path) if path else LTMSQLiteStorage()
super().__init__(storage)
super().__init__(storage=storage)
def save(self, item: LongTermMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
metadata = item.metadata

View File

@@ -1,15 +1,19 @@
from typing import Any, Dict, List, Optional
from crewai.memory.storage.rag_storage import RAGStorage
from pydantic import BaseModel
class Memory:
class Memory(BaseModel):
"""
Base class for memory, now supporting agent tags and generic metadata.
"""
def __init__(self, storage: RAGStorage):
self.storage = storage
embedder_config: Optional[Dict[str, Any]] = None
storage: Any
def __init__(self, storage: Any, **data: Any):
super().__init__(storage=storage, **data)
def save(
self,

View File

@@ -1,5 +1,7 @@
from typing import Any, Dict, Optional
from pydantic import PrivateAttr
from crewai.memory.memory import Memory
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.memory.storage.rag_storage import RAGStorage
@@ -14,13 +16,15 @@ class ShortTermMemory(Memory):
MemoryItem instances.
"""
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
if hasattr(crew, "memory_config") and crew.memory_config is not None:
self.memory_provider = crew.memory_config.get("provider")
else:
self.memory_provider = None
_memory_provider: Optional[str] = PrivateAttr()
if self.memory_provider == "mem0":
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
if crew and hasattr(crew, "memory_config") and crew.memory_config is not None:
memory_provider = crew.memory_config.get("provider")
else:
memory_provider = None
if memory_provider == "mem0":
try:
from crewai.memory.storage.mem0_storage import Mem0Storage
except ImportError:
@@ -39,7 +43,8 @@ class ShortTermMemory(Memory):
path=path,
)
)
super().__init__(storage)
super().__init__(storage=storage)
self._memory_provider = memory_provider
def save(
self,
@@ -48,7 +53,7 @@ class ShortTermMemory(Memory):
agent: Optional[str] = None,
) -> None:
item = ShortTermMemoryItem(data=value, metadata=metadata, agent=agent)
if self.memory_provider == "mem0":
if self._memory_provider == "mem0":
item.data = f"Remember the following insights from Agent run: {item.data}"
super().save(value=item.data, metadata=item.metadata, agent=item.agent)
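# Illustrative sketch of how the mem0 branch above gets selected. The configuration
# shape is an assumption based on the provider lookup, not part of this diff:
# from crewai import Agent, Crew, Task
# crew = Crew(
#     agents=[...],
#     tasks=[...],
#     memory=True,
#     memory_config={"provider": "mem0", "config": {"user_id": "example-user"}},
# )
# ShortTermMemory(crew=crew) would then record "mem0" in _memory_provider.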

View File

@@ -13,7 +13,7 @@ class BaseRAGStorage(ABC):
self,
type: str,
allow_reset: bool = True,
embedder_config: Optional[Any] = None,
embedder_config: Optional[Dict[str, Any]] = None,
crew: Any = None,
):
self.type = type

View File

@@ -674,19 +674,32 @@ class Task(BaseModel):
return OutputFormat.PYDANTIC
return OutputFormat.RAW
def _save_file(self, result: Any) -> None:
def _save_file(self, result: Union[Dict, str, Any]) -> None:
"""Save task output to a file.
Note:
For cross-platform file writing, especially on Windows, consider using FileWriterTool
from the crewai_tools package:
pip install 'crewai[tools]'
from crewai_tools import FileWriterTool
Args:
result: The result to save to the file. Can be a dict or any stringifiable object.
Raises:
ValueError: If output_file is not set
RuntimeError: If there is an error writing to the file
RuntimeError: If there is an error writing to the file. For cross-platform
compatibility, especially on Windows, use FileWriterTool from the
crewai_tools package.
"""
if self.output_file is None:
raise ValueError("output_file is not set.")
FILEWRITER_RECOMMENDATION = (
"For cross-platform file writing, especially on Windows, "
"use FileWriterTool from crewai_tools package."
)
try:
resolved_path = Path(self.output_file).expanduser().resolve()
directory = resolved_path.parent
@@ -702,7 +715,12 @@ class Task(BaseModel):
else:
file.write(str(result))
except (OSError, IOError) as e:
raise RuntimeError(f"Failed to save output file: {e}")
raise RuntimeError(
"\n".join([
f"Failed to save output file: {e}",
FILEWRITER_RECOMMENDATION
])
)
return None
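# Hedged sketch of the FileWriterTool fallback recommended above. The parameter names
# below (filename, directory, content) are assumptions about the crewai_tools API and
# should be checked against the crewai_tools documentation before relying on them.
# from crewai_tools import FileWriterTool
# FileWriterTool().run(filename="output.md", directory=".", content=str(result))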
def __repr__(self):

View File

@@ -7,11 +7,11 @@ from crewai.utilities import I18N
i18n = I18N()
class AddImageToolSchema(BaseModel):
image_url: str = Field(..., description="The URL or path of the image to add")
action: Optional[str] = Field(
default=None,
description="Optional context or question about the image"
default=None, description="Optional context or question about the image"
)
@@ -36,10 +36,7 @@ class AddImageTool(BaseTool):
"image_url": {
"url": image_url,
},
}
},
]
return {
"role": "user",
"content": content
}
return {"role": "user", "content": content}

View File

@@ -15,7 +15,7 @@
"final_answer_format": "If you don't need to use any more tools, you must give your best complete final answer, make sure it satisfies the expected criteria, use the EXACT format below:\n\n```\nThought: I now can give a great answer\nFinal Answer: my best complete final answer to the task.\n\n```",
"format_without_tools": "\nSorry, I didn't use the right format. I MUST either use a tool (among the available ones), OR give my best final answer.\nHere is the expected format I must follow:\n\n```\nQuestion: the input question you must answer\nThought: you should always think about what to do\nAction: the action to take, should be one of [{tool_names}]\nAction Input: the input to the action\nObservation: the result of the action\n```\n This Thought/Action/Action Input/Result process can repeat N times. Once I know the final answer, I must return the following format:\n\n```\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described\n\n```",
"task_with_context": "{task}\n\nThis is the context you're working with:\n{context}",
"expected_output": "\nThis is the expect criteria for your final answer: {expected_output}\nyou MUST return the actual complete content as the final answer, not a summary.",
"expected_output": "\nThis is the expected criteria for your final answer: {expected_output}\nyou MUST return the actual complete content as the final answer, not a summary.",
"human_feedback": "You got human feedback on your work, re-evaluate it and give a new Final Answer when ready.\n {human_feedback}",
"getting_input": "This is the agent's final answer: {final_answer}\n\n",
"summarizer_system_message": "You are a helpful assistant that summarizes text.",

View File

@@ -1,5 +1,5 @@
import os
from typing import Any, Dict, cast
from typing import Any, Dict, Optional, cast
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.api.types import validate_embedding_function
@@ -18,11 +18,12 @@ class EmbeddingConfigurator:
"bedrock": self._configure_bedrock,
"huggingface": self._configure_huggingface,
"watson": self._configure_watson,
"custom": self._configure_custom,
}
def configure_embedder(
self,
embedder_config: Dict[str, Any] | None = None,
embedder_config: Optional[Dict[str, Any]] = None,
) -> EmbeddingFunction:
"""Configures and returns an embedding function based on the provided config."""
if embedder_config is None:
@@ -30,20 +31,19 @@ class EmbeddingConfigurator:
provider = embedder_config.get("provider")
config = embedder_config.get("config", {})
model_name = config.get("model")
if isinstance(provider, EmbeddingFunction):
try:
validate_embedding_function(provider)
return provider
except Exception as e:
raise ValueError(f"Invalid custom embedding function: {str(e)}")
model_name = config.get("model") if provider != "custom" else None
if provider not in self.embedding_functions:
raise Exception(
f"Unsupported embedding provider: {provider}, supported providers: {list(self.embedding_functions.keys())}"
)
return self.embedding_functions[provider](config, model_name)
embedding_function = self.embedding_functions[provider]
return (
embedding_function(config)
if provider == "custom"
else embedding_function(config, model_name)
)
@staticmethod
def _create_default_embedding_function():
@@ -64,6 +64,13 @@ class EmbeddingConfigurator:
return OpenAIEmbeddingFunction(
api_key=config.get("api_key") or os.getenv("OPENAI_API_KEY"),
model_name=model_name,
api_base=config.get("api_base", None),
api_type=config.get("api_type", None),
api_version=config.get("api_version", None),
default_headers=config.get("default_headers", None),
dimensions=config.get("dimensions", None),
deployment_id=config.get("deployment_id", None),
organization_id=config.get("organization_id", None),
)
@staticmethod
@@ -78,6 +85,10 @@ class EmbeddingConfigurator:
api_type=config.get("api_type", "azure"),
api_version=config.get("api_version"),
model_name=model_name,
default_headers=config.get("default_headers"),
dimensions=config.get("dimensions"),
deployment_id=config.get("deployment_id"),
organization_id=config.get("organization_id"),
)
@staticmethod
@@ -100,6 +111,8 @@ class EmbeddingConfigurator:
return GoogleVertexEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
project_id=config.get("project_id"),
region=config.get("region"),
)
@staticmethod
@@ -111,6 +124,7 @@ class EmbeddingConfigurator:
return GoogleGenerativeAiEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
task_type=config.get("task_type"),
)
@staticmethod
@@ -195,3 +209,28 @@ class EmbeddingConfigurator:
raise e
return WatsonEmbeddingFunction()
@staticmethod
def _configure_custom(config):
custom_embedder = config.get("embedder")
if isinstance(custom_embedder, EmbeddingFunction):
try:
validate_embedding_function(custom_embedder)
return custom_embedder
except Exception as e:
raise ValueError(f"Invalid custom embedding function: {str(e)}")
elif callable(custom_embedder):
try:
instance = custom_embedder()
if isinstance(instance, EmbeddingFunction):
validate_embedding_function(instance)
return instance
raise ValueError(
"Custom embedder does not create an EmbeddingFunction instance"
)
except Exception as e:
raise ValueError(f"Error instantiating custom embedder: {str(e)}")
else:
raise ValueError(
"Custom embedder must be an instance of `EmbeddingFunction` or a callable that creates one"
)
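# Minimal runnable sketch of the new "custom" provider: TinyHashEmbedder is a toy
# embedding function invented for this example, not part of the diff.
class TinyHashEmbedder(EmbeddingFunction):
    """Toy embedder: one-dimensional embedding derived from the string hash."""

    def __call__(self, input: Documents) -> Embeddings:
        return [[float(hash(text) % 1000)] for text in input]


custom_fn = EmbeddingConfigurator().configure_embedder(
    {"provider": "custom", "config": {"embedder": TinyHashEmbedder()}}
)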

View File

@@ -1,30 +1,64 @@
import json
import os
import pickle
from datetime import datetime
from typing import Union
class FileHandler:
"""take care of file operations, currently it only logs messages to a file"""
"""Handler for file operations supporting both JSON and text-based logging.
Args:
file_path (Union[bool, str]): Path to the log file or boolean flag
"""
def __init__(self, file_path):
if isinstance(file_path, bool):
def __init__(self, file_path: Union[bool, str]):
self._initialize_path(file_path)
def _initialize_path(self, file_path: Union[bool, str]):
if file_path is True: # File path is boolean True
self._path = os.path.join(os.curdir, "logs.txt")
elif isinstance(file_path, str):
self._path = file_path
elif isinstance(file_path, str): # File path is a string
if file_path.endswith((".json", ".txt")):
self._path = file_path # No modification if the file ends with .json or .txt
else:
self._path = file_path + ".txt" # Append .txt if the file doesn't end with .json or .txt
else:
raise ValueError("file_path must be either a boolean or a string.")
raise ValueError("file_path must be a string or boolean.") # Handle the case where file_path isn't valid
def log(self, **kwargs):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = (
f"{now}: "
+ ", ".join([f'{key}="{value}"' for key, value in kwargs.items()])
+ "\n"
)
with open(self._path, "a", encoding="utf-8") as file:
file.write(message + "\n")
try:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = {"timestamp": now, **kwargs}
if self._path.endswith(".json"):
    # Append log in JSON format: read any existing list, append the new entry,
    # then rewrite the whole file.
    try:
        with open(self._path, "r", encoding="utf-8") as read_file:
            existing_data = json.load(read_file)
        existing_data.append(log_entry)
    except (json.JSONDecodeError, FileNotFoundError):
        # No valid JSON yet (or the file doesn't exist): start a fresh list
        existing_data = [log_entry]
    with open(self._path, "w", encoding="utf-8") as write_file:
        json.dump(existing_data, write_file, indent=4)
        write_file.write("\n")
else:
# Append log in plain text format
message = f"{now}: " + ", ".join([f"{key}=\"{value}\"" for key, value in kwargs.items()]) + "\n"
with open(self._path, "a", encoding="utf-8") as file:
file.write(message)
except Exception as e:
raise ValueError(f"Failed to log message: {str(e)}")
class PickleHandler:
def __init__(self, file_name: str) -> None:
"""

View File

@@ -1183,7 +1183,7 @@ def test_agent_max_retry_limit():
[
mock.call(
{
"input": "Say the word: Hi\n\nThis is the expect criteria for your final answer: The word: Hi\nyou MUST return the actual complete content as the final answer, not a summary.",
"input": "Say the word: Hi\n\nThis is the expected criteria for your final answer: The word: Hi\nyou MUST return the actual complete content as the final answer, not a summary.",
"tool_names": "",
"tools": "",
"ask_for_human_input": True,
@@ -1191,7 +1191,7 @@ def test_agent_max_retry_limit():
),
mock.call(
{
"input": "Say the word: Hi\n\nThis is the expect criteria for your final answer: The word: Hi\nyou MUST return the actual complete content as the final answer, not a summary.",
"input": "Say the word: Hi\n\nThis is the expected criteria for your final answer: The word: Hi\nyou MUST return the actual complete content as the final answer, not a summary.",
"tool_names": "",
"tools": "",
"ask_for_human_input": True,

View File

@@ -55,72 +55,83 @@ def test_train_invalid_string_iterations(train_crew, runner):
)
@mock.patch("crewai.cli.reset_memories_command.ShortTermMemory")
@mock.patch("crewai.cli.reset_memories_command.EntityMemory")
@mock.patch("crewai.cli.reset_memories_command.LongTermMemory")
@mock.patch("crewai.cli.reset_memories_command.TaskOutputStorageHandler")
def test_reset_all_memories(
MockTaskOutputStorageHandler,
MockLongTermMemory,
MockEntityMemory,
MockShortTermMemory,
runner,
):
result = runner.invoke(reset_memories, ["--all"])
MockShortTermMemory().reset.assert_called_once()
MockEntityMemory().reset.assert_called_once()
MockLongTermMemory().reset.assert_called_once()
MockTaskOutputStorageHandler().reset.assert_called_once()
@mock.patch("crewai.cli.reset_memories_command.get_crew")
def test_reset_all_memories(mock_get_crew, runner):
mock_crew = mock.Mock()
mock_get_crew.return_value = mock_crew
result = runner.invoke(reset_memories, ["-a"])
mock_crew.reset_memories.assert_called_once_with(command_type="all")
assert result.output == "All memories have been reset.\n"
@mock.patch("crewai.cli.reset_memories_command.ShortTermMemory")
def test_reset_short_term_memories(MockShortTermMemory, runner):
@mock.patch("crewai.cli.reset_memories_command.get_crew")
def test_reset_short_term_memories(mock_get_crew, runner):
mock_crew = mock.Mock()
mock_get_crew.return_value = mock_crew
result = runner.invoke(reset_memories, ["-s"])
MockShortTermMemory().reset.assert_called_once()
mock_crew.reset_memories.assert_called_once_with(command_type="short")
assert result.output == "Short term memory has been reset.\n"
@mock.patch("crewai.cli.reset_memories_command.EntityMemory")
def test_reset_entity_memories(MockEntityMemory, runner):
@mock.patch("crewai.cli.reset_memories_command.get_crew")
def test_reset_entity_memories(mock_get_crew, runner):
mock_crew = mock.Mock()
mock_get_crew.return_value = mock_crew
result = runner.invoke(reset_memories, ["-e"])
MockEntityMemory().reset.assert_called_once()
mock_crew.reset_memories.assert_called_once_with(command_type="entity")
assert result.output == "Entity memory has been reset.\n"
@mock.patch("crewai.cli.reset_memories_command.LongTermMemory")
def test_reset_long_term_memories(MockLongTermMemory, runner):
@mock.patch("crewai.cli.reset_memories_command.get_crew")
def test_reset_long_term_memories(mock_get_crew, runner):
mock_crew = mock.Mock()
mock_get_crew.return_value = mock_crew
result = runner.invoke(reset_memories, ["-l"])
MockLongTermMemory().reset.assert_called_once()
mock_crew.reset_memories.assert_called_once_with(command_type="long")
assert result.output == "Long term memory has been reset.\n"
@mock.patch("crewai.cli.reset_memories_command.TaskOutputStorageHandler")
def test_reset_kickoff_outputs(MockTaskOutputStorageHandler, runner):
@mock.patch("crewai.cli.reset_memories_command.get_crew")
def test_reset_kickoff_outputs(mock_get_crew, runner):
mock_crew = mock.Mock()
mock_get_crew.return_value = mock_crew
result = runner.invoke(reset_memories, ["-k"])
MockTaskOutputStorageHandler().reset.assert_called_once()
mock_crew.reset_memories.assert_called_once_with(command_type="kickoff_outputs")
assert result.output == "Latest Kickoff outputs stored has been reset.\n"
@mock.patch("crewai.cli.reset_memories_command.ShortTermMemory")
@mock.patch("crewai.cli.reset_memories_command.LongTermMemory")
def test_reset_multiple_memory_flags(MockShortTermMemory, MockLongTermMemory, runner):
result = runner.invoke(
reset_memories,
[
"-s",
"-l",
],
@mock.patch("crewai.cli.reset_memories_command.get_crew")
def test_reset_multiple_memory_flags(mock_get_crew, runner):
mock_crew = mock.Mock()
mock_get_crew.return_value = mock_crew
result = runner.invoke(reset_memories, ["-s", "-l"])
# Check that reset_memories was called twice with the correct arguments
assert mock_crew.reset_memories.call_count == 2
mock_crew.reset_memories.assert_has_calls(
[mock.call(command_type="long"), mock.call(command_type="short")]
)
MockShortTermMemory().reset.assert_called_once()
MockLongTermMemory().reset.assert_called_once()
assert (
result.output
== "Long term memory has been reset.\nShort term memory has been reset.\n"
)
@mock.patch("crewai.cli.reset_memories_command.get_crew")
def test_reset_knowledge(mock_get_crew, runner):
mock_crew = mock.Mock()
mock_get_crew.return_value = mock_crew
result = runner.invoke(reset_memories, ["--knowledge"])
mock_crew.reset_memories.assert_called_once_with(command_type="knowledge")
assert result.output == "Knowledge has been reset.\n"
def test_reset_no_memory_flags(runner):
result = runner.invoke(
reset_memories,

View File

@@ -6,6 +6,7 @@ from concurrent.futures import Future
from unittest import mock
from unittest.mock import MagicMock, patch
import agentops
import instructor
import pydantic_core
import pytest
@@ -15,6 +16,7 @@ from crewai.agents.cache import CacheHandler
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.llm import LLM
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.process import Process
from crewai.project import crew
@@ -27,6 +29,100 @@ from crewai.utilities import Logger
from crewai.utilities.rpm_controller import RPMController
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
@pytest.fixture
def researcher():
"""Fixture to create a researcher agent."""
return Agent(
role="Researcher",
goal="Make the best research and analysis on content about AI and AI agents",
backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups.",
allow_delegation=False,
)
@pytest.fixture
def mock_agentops():
"""Fixture to mock agentops for testing."""
mock_agentops = MagicMock()
mock_agentops.init = MagicMock()
return mock_agentops
@pytest.mark.agentops
class TestAgentOpsIntegration:
"""Tests for AgentOps integration."""
def test_initialization_with_api_key(self, mock_agentops, monkeypatch):
"""Test that agentops is properly initialized when API key is present."""
monkeypatch.setattr("crewai.crew.agentops", mock_agentops)
monkeypatch.setenv("AGENTOPS_API_KEY", "test-key-12345")
crew = Crew(agents=[researcher], tasks=[Task(
description="Test task",
expected_output="Test output",
agent=researcher,
)])
crew.set_private_attrs()
mock_agentops.init.assert_called_once_with("test-key-12345")
def test_initialization_without_api_key(self, mock_agentops):
"""Test that agentops is not initialized when API key is not present."""
crew = Crew(agents=[researcher], tasks=[Task(
description="Test task",
expected_output="Test output",
agent=researcher,
)])
mock_agentops.assert_not_called()
def test_initialization_with_invalid_api_key(self, mock_agentops, monkeypatch):
"""Test that agentops is not initialized when API key is invalid."""
monkeypatch.setenv("AGENTOPS_API_KEY", " ")
crew = Crew(agents=[researcher], tasks=[Task(
description="Test task",
expected_output="Test output",
agent=researcher,
)])
mock_agentops.assert_not_called()
def test_gemini_llm_integration(self, mock_agentops, monkeypatch):
"""Test that Gemini LLM works correctly with agentops."""
# Mock agentops
monkeypatch.setattr("crewai.crew.agentops", mock_agentops)
# Set API keys
monkeypatch.setenv("AGENTOPS_API_KEY", "test-key-12345")
monkeypatch.setenv("GOOGLE_API_KEY", "test-key")
# Create crew with Gemini LLM
llm = LLM(model="gemini-pro")
agent = Agent(
role="test",
goal="test",
backstory="test",
llm=llm
)
task = Task(
description="test task",
expected_output="test output",
agent=agent
)
crew = Crew(agents=[agent], tasks=[task])
crew.set_private_attrs()
# Mock the agent execution to avoid actual API calls
with patch.object(Task, 'execute_sync', return_value=TaskOutput(
description="test",
raw="test output",
agent=agent.role
)):
# Run crew
crew.kickoff()
# Verify agentops.end_session was called correctly
mock_agentops.end_session.assert_called_once_with(
end_state="Success",
end_state_reason="Finished Execution",
is_auto_end=True
)
ceo = Agent(
role="CEO",
goal="Make sure the writers in your company produce amazing content.",
@@ -49,6 +145,41 @@ writer = Agent(
)
def test_crew_with_only_conditional_tasks_raises_error():
"""Test that creating a crew with only conditional tasks raises an error."""
def condition_func(task_output: TaskOutput) -> bool:
return True
conditional1 = ConditionalTask(
description="Conditional task 1",
expected_output="Output 1",
agent=researcher,
condition=condition_func,
)
conditional2 = ConditionalTask(
description="Conditional task 2",
expected_output="Output 2",
agent=researcher,
condition=condition_func,
)
conditional3 = ConditionalTask(
description="Conditional task 3",
expected_output="Output 3",
agent=researcher,
condition=condition_func,
)
with pytest.raises(
pydantic_core._pydantic_core.ValidationError,
match="Crew must include at least one non-conditional task",
):
Crew(
agents=[researcher],
tasks=[conditional1, conditional2, conditional3],
)
def test_crew_config_conditional_requirement():
with pytest.raises(ValueError):
Crew(process=Process.sequential)
@@ -556,12 +687,12 @@ def test_crew_with_delegating_agents_should_not_override_task_tools():
_, kwargs = mock_execute_sync.call_args
tools = kwargs["tools"]
assert any(isinstance(tool, TestTool) for tool in tools), (
"TestTool should be present"
)
assert any("delegate" in tool.name.lower() for tool in tools), (
"Delegation tool should be present"
)
assert any(
isinstance(tool, TestTool) for tool in tools
), "TestTool should be present"
assert any(
"delegate" in tool.name.lower() for tool in tools
), "Delegation tool should be present"
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -620,12 +751,12 @@ def test_crew_with_delegating_agents_should_not_override_agent_tools():
_, kwargs = mock_execute_sync.call_args
tools = kwargs["tools"]
assert any(isinstance(tool, TestTool) for tool in new_ceo.tools), (
"TestTool should be present"
)
assert any("delegate" in tool.name.lower() for tool in tools), (
"Delegation tool should be present"
)
assert any(
isinstance(tool, TestTool) for tool in new_ceo.tools
), "TestTool should be present"
assert any(
"delegate" in tool.name.lower() for tool in tools
), "Delegation tool should be present"
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -749,17 +880,17 @@ def test_task_tools_override_agent_tools_with_allow_delegation():
used_tools = kwargs["tools"]
# Confirm AnotherTestTool is present but TestTool is not
assert any(isinstance(tool, AnotherTestTool) for tool in used_tools), (
"AnotherTestTool should be present"
)
assert not any(isinstance(tool, TestTool) for tool in used_tools), (
"TestTool should not be present among used tools"
)
assert any(
isinstance(tool, AnotherTestTool) for tool in used_tools
), "AnotherTestTool should be present"
assert not any(
isinstance(tool, TestTool) for tool in used_tools
), "TestTool should not be present among used tools"
# Confirm delegation tool(s) are present
assert any("delegate" in tool.name.lower() for tool in used_tools), (
"Delegation tool should be present"
)
assert any(
"delegate" in tool.name.lower() for tool in used_tools
), "Delegation tool should be present"
# Finally, make sure the agent's original tools remain unchanged
assert len(researcher_with_delegation.tools) == 1
@@ -1560,9 +1691,9 @@ def test_code_execution_flag_adds_code_tool_upon_kickoff():
# Verify that exactly one tool was used and it was a CodeInterpreterTool
assert len(used_tools) == 1, "Should have exactly one tool"
assert isinstance(used_tools[0], CodeInterpreterTool), (
"Tool should be CodeInterpreterTool"
)
assert isinstance(
used_tools[0], CodeInterpreterTool
), "Tool should be CodeInterpreterTool"
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -1919,6 +2050,7 @@ def test_task_callback_on_crew():
def test_task_callback_both_on_task_and_crew():
from unittest.mock import MagicMock, patch
mock_callback_on_task = MagicMock()
mock_callback_on_crew = MagicMock()
@@ -2060,6 +2192,210 @@ def test_tools_with_custom_caching():
assert result.raw == "3"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_task_uses_last_output():
"""Test that conditional tasks use the last task output for condition evaluation."""
task1 = Task(
description="First task",
expected_output="First output",
agent=researcher,
)
def condition_fails(task_output: TaskOutput) -> bool:
# This condition will never be met
return "never matches" in task_output.raw.lower()
def condition_succeeds(task_output: TaskOutput) -> bool:
# This condition will match first task's output
return "first success" in task_output.raw.lower()
conditional_task1 = ConditionalTask(
description="Second task - conditional that fails condition",
expected_output="Second output",
agent=researcher,
condition=condition_fails,
)
conditional_task2 = ConditionalTask(
description="Third task - conditional that succeeds using first task output",
expected_output="Third output",
agent=writer,
condition=condition_succeeds,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, conditional_task1, conditional_task2],
)
# Mock outputs for tasks
mock_first = TaskOutput(
description="First task output",
raw="First success output", # Will be used by third task's condition
agent=researcher.role,
)
mock_third = TaskOutput(
description="Third task output",
raw="Third task executed", # Output when condition succeeds using first task output
agent=writer.role,
)
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_first, mock_third]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify outputs collection:
# First executed task output, followed by an automatically generated (skipped) output, then the conditional execution
assert len(result.tasks_output) == 3
assert (
result.tasks_output[0].raw == "First success output"
) # First task succeeded
assert (
result.tasks_output[1].raw == ""
) # Second task skipped (condition failed)
assert (
result.tasks_output[2].raw == "Third task executed"
) # Third task used first task's output
@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_tasks_result_collection():
"""Test that task outputs are properly collected based on execution status."""
task1 = Task(
description="Normal task that always executes",
expected_output="First output",
agent=researcher,
)
def condition_never_met(task_output: TaskOutput) -> bool:
return "never matches" in task_output.raw.lower()
def condition_always_met(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
task2 = ConditionalTask(
description="Conditional task that never executes",
expected_output="Second output",
agent=researcher,
condition=condition_never_met,
)
task3 = ConditionalTask(
description="Conditional task that always executes",
expected_output="Third output",
agent=writer,
condition=condition_always_met,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2, task3],
)
# Mock outputs for different execution paths
mock_success = TaskOutput(
description="Success output",
raw="Success output", # Triggers third task's condition
agent=researcher.role,
)
mock_conditional = TaskOutput(
description="Conditional output",
raw="Conditional task executed",
agent=writer.role,
)
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_success, mock_conditional]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify task output collection:
# There should be three outputs: normal task, skipped conditional task (empty output),
# and the conditional task that executed.
assert len(result.tasks_output) == 3
assert (
result.tasks_output[0].raw == "Success output"
) # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert (
result.tasks_output[2].raw == "Conditional task executed"
) # Third task executed
@pytest.mark.vcr(filter_headers=["authorization"])
def test_multiple_conditional_tasks():
"""Test that having multiple conditional tasks in sequence works correctly."""
task1 = Task(
description="Initial research task",
expected_output="Research output",
agent=researcher,
)
def condition1(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
def condition2(task_output: TaskOutput) -> bool:
return "proceed" in task_output.raw.lower()
task2 = ConditionalTask(
description="First conditional task",
expected_output="Conditional output 1",
agent=writer,
condition=condition1,
)
task3 = ConditionalTask(
description="Second conditional task",
expected_output="Conditional output 2",
agent=writer,
condition=condition2,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2, task3],
)
# Mock different task outputs to test conditional logic
mock_success = TaskOutput(
description="Mock success",
raw="Success and proceed output",
agent=researcher.role,
)
# Set up mocks for task execution
with patch.object(Task, "execute_sync", return_value=mock_success) as mock_execute:
result = crew.kickoff()
# Verify all tasks were executed (no IndexError)
assert mock_execute.call_count == 3
assert len(result.tasks_output) == 3
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory():
from unittest.mock import patch
@@ -3178,9 +3514,9 @@ def test_fetch_inputs():
expected_placeholders = {"role_detail", "topic", "field"}
actual_placeholders = crew.fetch_inputs()
assert actual_placeholders == expected_placeholders, (
f"Expected {expected_placeholders}, but got {actual_placeholders}"
)
assert (
actual_placeholders == expected_placeholders
), f"Expected {expected_placeholders}, but got {actual_placeholders}"
def test_task_tools_preserve_code_execution_tools():
@@ -3253,20 +3589,20 @@ def test_task_tools_preserve_code_execution_tools():
used_tools = kwargs["tools"]
# Verify all expected tools are present
assert any(isinstance(tool, TestTool) for tool in used_tools), (
"Task's TestTool should be present"
)
assert any(isinstance(tool, CodeInterpreterTool) for tool in used_tools), (
"CodeInterpreterTool should be present"
)
assert any("delegate" in tool.name.lower() for tool in used_tools), (
"Delegation tool should be present"
)
assert any(
isinstance(tool, TestTool) for tool in used_tools
), "Task's TestTool should be present"
assert any(
isinstance(tool, CodeInterpreterTool) for tool in used_tools
), "CodeInterpreterTool should be present"
assert any(
"delegate" in tool.name.lower() for tool in used_tools
), "Delegation tool should be present"
# Verify the total number of tools (TestTool + CodeInterpreter + 2 delegation tools)
assert len(used_tools) == 4, (
"Should have TestTool, CodeInterpreter, and 2 delegation tools"
)
assert (
len(used_tools) == 4
), "Should have TestTool, CodeInterpreter, and 2 delegation tools"
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -3310,9 +3646,9 @@ def test_multimodal_flag_adds_multimodal_tools():
used_tools = kwargs["tools"]
# Check that the multimodal tool was added
assert any(isinstance(tool, AddImageTool) for tool in used_tools), (
"AddImageTool should be present when agent is multimodal"
)
assert any(
isinstance(tool, AddImageTool) for tool in used_tools
), "AddImageTool should be present when agent is multimodal"
# Verify we have exactly one tool (just the AddImageTool)
assert len(used_tools) == 1, "Should only have the AddImageTool"
@@ -3538,9 +3874,9 @@ def test_crew_guardrail_feedback_in_context():
assert len(execution_contexts) > 1, "Task should have been executed multiple times"
# Verify that the second execution included the guardrail feedback
assert "Output must contain the keyword 'IMPORTANT'" in execution_contexts[1], (
"Guardrail feedback should be included in retry context"
)
assert (
"Output must contain the keyword 'IMPORTANT'" in execution_contexts[1]
), "Guardrail feedback should be included in retry context"
# Verify final output meets guardrail requirements
assert "IMPORTANT" in result.raw, "Final output should contain required keyword"

View File

@@ -286,6 +286,79 @@ def test_o3_mini_reasoning_effort_medium():
@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.fixture
def anthropic_llm():
"""Fixture providing an Anthropic LLM instance."""
return LLM(model="anthropic/claude-3-sonnet")
@pytest.fixture
def system_message():
"""Fixture providing a system message."""
return {"role": "system", "content": "test"}
@pytest.fixture
def user_message():
"""Fixture providing a user message."""
return {"role": "user", "content": "test"}
def test_anthropic_message_formatting_edge_cases(anthropic_llm):
"""Test edge cases for Anthropic message formatting."""
# Test None messages
with pytest.raises(TypeError, match="Messages cannot be None"):
anthropic_llm._format_messages_for_provider(None)
# Test empty message list
formatted = anthropic_llm._format_messages_for_provider([])
assert len(formatted) == 1
assert formatted[0]["role"] == "user"
assert formatted[0]["content"] == "."
# Test invalid message format
with pytest.raises(TypeError, match="Invalid message format"):
anthropic_llm._format_messages_for_provider([{"invalid": "message"}])
def test_anthropic_model_detection():
"""Test Anthropic model detection with various formats."""
models = [
("anthropic/claude-3", True),
("claude-instant", True),
("claude/v1", True),
("gpt-4", False),
("", False),
("anthropomorphic", False), # Should not match partial words
]
for model, expected in models:
llm = LLM(model=model)
assert llm.is_anthropic == expected, f"Failed for model: {model}"
def test_anthropic_message_formatting(anthropic_llm, system_message, user_message):
"""Test Anthropic message formatting with fixtures."""
# Test when first message is system
formatted = anthropic_llm._format_messages_for_provider([system_message])
assert len(formatted) == 2
assert formatted[0]["role"] == "user"
assert formatted[0]["content"] == "."
assert formatted[1] == system_message
# Test when first message is already user
formatted = anthropic_llm._format_messages_for_provider([user_message])
assert len(formatted) == 1
assert formatted[0] == user_message
# Test with empty message list
formatted = anthropic_llm._format_messages_for_provider([])
assert len(formatted) == 1
assert formatted[0]["role"] == "user"
assert formatted[0]["content"] == "."
# Test with non-Anthropic model (should not modify messages)
non_anthropic_llm = LLM(model="gpt-4")
formatted = non_anthropic_llm._format_messages_for_provider([system_message])
assert len(formatted) == 1
assert formatted[0] == system_message
def test_deepseek_r1_with_open_router():
if not os.getenv("OPEN_ROUTER_API_KEY"):
pytest.skip("OPEN_ROUTER_API_KEY not set; skipping test.")