Compare commits

..

4 Commits

Author SHA1 Message Date
Lorenze Jay
2d44356c81 Refine custom embedder configuration support
- Update custom embedder configuration method to handle custom embedding functions
- Modify type hints for embedder configuration
- Remove unused model_name parameter in custom embedder configuration
2025-02-07 13:41:36 -08:00
Lorenze Jay
d48211f7f8 added docs 2025-02-07 12:48:17 -08:00
Lorenze Jay
8eea0bd502 Merge branch 'main' of github.com:crewAIInc/crewAI into fix/embedder-config 2025-02-07 12:45:30 -08:00
Lorenze Jay
cafac13447 Enhance embedding configuration with custom embedder support
- Add support for custom embedding functions in EmbeddingConfigurator
- Update type hints for embedder configuration
- Extend configuration options for various embedding providers
- Add optional embedder configuration to Memory class
2025-02-07 12:41:57 -08:00
16 changed files with 152 additions and 793 deletions

View File

@@ -1,18 +1,10 @@
<div align="center">
![Logo of CrewAI](./docs/crewai_logo.png)
![Logo of CrewAI, two people rowing on a boat](./docs/crewai_logo.png)
# **CrewAI**
**CrewAI**: Production-grade framework for orchestrating sophisticated AI agent systems. From simple automations to complex real-world applications, CrewAI provides precise control and deep customization. By fostering collaborative intelligence through flexible, production-ready architecture, CrewAI empowers agents to work together seamlessly, tackling complex business challenges with predictable, consistent results.
**CrewAI Enterprise**
Want to plan, build (+ no code), deploy, monitor and interare your agents: [CrewAI Enterprise](https://www.crewai.com/enterprise). Designed for complex, real-world applications, our enterprise solution offers:
- **Seamless Integrations**
- **Scalable & Secure Deployment**
- **Actionable Insights**
- **24/7 Support**
🤖 **CrewAI**: Production-grade framework for orchestrating sophisticated AI agent systems. From simple automations to complex real-world applications, CrewAI provides precise control and deep customization. By fostering collaborative intelligence through flexible, production-ready architecture, CrewAI empowers agents to work together seamlessly, tackling complex business challenges with predictable, consistent results.
<h3>
@@ -400,7 +392,7 @@ class AdvancedAnalysisFlow(Flow[MarketState]):
goal="Gather and validate supporting market data",
backstory="You excel at finding and correlating multiple data sources"
)
analysis_task = Task(
description="Analyze {sector} sector data for the past {timeframe}",
expected_output="Detailed market analysis with confidence score",
@@ -411,7 +403,7 @@ class AdvancedAnalysisFlow(Flow[MarketState]):
expected_output="Corroborating evidence and potential contradictions",
agent=researcher
)
# Demonstrate crew autonomy
analysis_crew = Crew(
agents=[analyst, researcher],

View File

@@ -91,7 +91,7 @@ result = crew.kickoff(inputs={"question": "What city does John live in and how o
```
Here's another example with the `CrewDoclingSource`. The CrewDoclingSource is actually quite versatile and can handle multiple file formats including MD, PDF, DOCX, HTML, and more.
Here's another example with the `CrewDoclingSource`. The CrewDoclingSource is actually quite versatile and can handle multiple file formats including TXT, PDF, DOCX, HTML, and more.
<Note>
You need to install `docling` for the following example to work: `uv add docling`
@@ -152,10 +152,10 @@ Here are examples of how to use different types of knowledge sources:
### Text File Knowledge Source
```python
from crewai.knowledge.source.text_file_knowledge_source import TextFileKnowledgeSource
from crewai.knowledge.source.crew_docling_source import CrewDoclingSource
# Create a text file knowledge source
text_source = TextFileKnowledgeSource(
text_source = CrewDoclingSource(
file_paths=["document.txt", "another.txt"]
)

View File

@@ -463,32 +463,26 @@ Learn how to get the most out of your LLM configuration:
<Accordion title="Google">
```python Code
# Option 1: Gemini accessed with an API key.
# Option 1. Gemini accessed with an API key.
# https://ai.google.dev/gemini-api/docs/api-key
GEMINI_API_KEY=<your-api-key>
# Option 2: Vertex AI IAM credentials for Gemini, Anthropic, and Model Garden.
# Option 2. Vertex AI IAM credentials for Gemini, Anthropic, and anything in the Model Garden.
# https://cloud.google.com/vertex-ai/generative-ai/docs/overview
```
Get credentials:
```python Code
import json
## GET CREDENTIALS
file_path = 'path/to/vertex_ai_service_account.json'
# Load the JSON file
with open(file_path, 'r') as file:
vertex_credentials = json.load(file)
# Convert the credentials to a JSON string
# Convert to JSON string
vertex_credentials_json = json.dumps(vertex_credentials)
```
Example usage:
```python Code
from crewai import LLM
llm = LLM(
model="gemini/gemini-1.5-pro-latest",
temperature=0.7,

View File

@@ -58,107 +58,41 @@ my_crew = Crew(
### Example: Use Custom Memory Instances e.g FAISS as the VectorDB
```python Code
from crewai import Crew, Process
from crewai.memory import LongTermMemory, ShortTermMemory, EntityMemory
from crewai.memory.storage import LTMSQLiteStorage, RAGStorage
from typing import List, Optional
from crewai import Crew, Agent, Task, Process
# Assemble your crew with memory capabilities
my_crew: Crew = Crew(
agents = [...],
tasks = [...],
process = Process.sequential,
memory = True,
# Long-term memory for persistent storage across sessions
long_term_memory = LongTermMemory(
my_crew = Crew(
agents=[...],
tasks=[...],
process="Process.sequential",
memory=True,
long_term_memory=EnhanceLongTermMemory(
storage=LTMSQLiteStorage(
db_path="/my_crew1/long_term_memory_storage.db"
db_path="/my_data_dir/my_crew1/long_term_memory_storage.db"
)
),
# Short-term memory for current context using RAG
short_term_memory = ShortTermMemory(
storage = RAGStorage(
embedder_config={
"provider": "openai",
"config": {
"model": 'text-embedding-3-small'
}
},
type="short_term",
path="/my_crew1/"
)
short_term_memory=EnhanceShortTermMemory(
storage=CustomRAGStorage(
crew_name="my_crew",
storage_type="short_term",
data_dir="//my_data_dir",
model=embedder["model"],
dimension=embedder["dimension"],
),
),
# Entity memory for tracking key information about entities
entity_memory = EntityMemory(
storage=RAGStorage(
embedder_config={
"provider": "openai",
"config": {
"model": 'text-embedding-3-small'
}
},
type="short_term",
path="/my_crew1/"
)
entity_memory=EnhanceEntityMemory(
storage=CustomRAGStorage(
crew_name="my_crew",
storage_type="entities",
data_dir="//my_data_dir",
model=embedder["model"],
dimension=embedder["dimension"],
),
),
verbose=True,
)
```
## Security Considerations
When configuring memory storage:
- Use environment variables for storage paths (e.g., `CREWAI_STORAGE_DIR`)
- Never hardcode sensitive information like database credentials
- Consider access permissions for storage directories
- Use relative paths when possible to maintain portability
Example using environment variables:
```python
import os
from crewai import Crew
from crewai.memory import LongTermMemory
from crewai.memory.storage import LTMSQLiteStorage
# Configure storage path using environment variable
storage_path = os.getenv("CREWAI_STORAGE_DIR", "./storage")
crew = Crew(
memory=True,
long_term_memory=LongTermMemory(
storage=LTMSQLiteStorage(
db_path="{storage_path}/memory.db".format(storage_path=storage_path)
)
)
)
```
## Configuration Examples
### Basic Memory Configuration
```python
from crewai import Crew
from crewai.memory import LongTermMemory
# Simple memory configuration
crew = Crew(memory=True) # Uses default storage locations
```
### Custom Storage Configuration
```python
from crewai import Crew
from crewai.memory import LongTermMemory
from crewai.memory.storage import LTMSQLiteStorage
# Configure custom storage paths
crew = Crew(
memory=True,
long_term_memory=LongTermMemory(
storage=LTMSQLiteStorage(db_path="./memory.db")
)
)
```
## Integrating Mem0 for Enhanced User Memory
[Mem0](https://mem0.ai/) is a self-improving memory layer for LLM applications, enabling personalized AI experiences.
@@ -282,19 +216,6 @@ my_crew = Crew(
### Using Google AI embeddings
#### Prerequisites
Before using Google AI embeddings, ensure you have:
- Access to the Gemini API
- The necessary API keys and permissions
You will need to update your *pyproject.toml* dependencies:
```YAML
dependencies = [
"google-generativeai>=0.8.4", #main version in January/2025 - crewai v.0.100.0 and crewai-tools 0.33.0
"crewai[tools]>=0.100.0,<1.0.0"
]
```
```python Code
from crewai import Crew, Agent, Task, Process
@@ -447,38 +368,6 @@ my_crew = Crew(
)
```
### Using Amazon Bedrock embeddings
```python Code
# Note: Ensure you have installed `boto3` for Bedrock embeddings to work.
import os
import boto3
from crewai import Crew, Agent, Task, Process
boto3_session = boto3.Session(
region_name=os.environ.get("AWS_REGION_NAME"),
aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY")
)
my_crew = Crew(
agents=[...],
tasks=[...],
process=Process.sequential,
memory=True,
embedder={
"provider": "bedrock",
"config":{
"session": boto3_session,
"model": "amazon.titan-embed-text-v2:0",
"vector_dimension": 1024
}
}
verbose=True
)
```
### Adding Custom Embedding Function
```python Code

View File

@@ -268,7 +268,7 @@ analysis_task = Task(
Task guardrails provide a way to validate and transform task outputs before they
are passed to the next task. This feature helps ensure data quality and provides
feedback to agents when their output doesn't meet specific criteria.
efeedback to agents when their output doesn't meet specific criteria.
### Using Task Guardrails

View File

@@ -8,9 +8,9 @@ icon: file-pen
## Description
The `FileWriterTool` is a component of the crewai_tools package, designed to simplify the process of writing content to files with cross-platform compatibility (Windows, Linux, macOS).
The `FileWriterTool` is a component of the crewai_tools package, designed to simplify the process of writing content to files.
It is particularly useful in scenarios such as generating reports, saving logs, creating configuration files, and more.
This tool handles path differences across operating systems, supports UTF-8 encoding, and automatically creates directories if they don't exist, making it easier to organize your output reliably across different platforms.
This tool supports creating new directories if they don't exist, making it easier to organize your output.
## Installation
@@ -43,8 +43,6 @@ print(result)
## Conclusion
By integrating the `FileWriterTool` into your crews, the agents can reliably write content to files across different operating systems.
This tool is essential for tasks that require saving output data, creating structured file systems, and handling cross-platform file operations.
It's particularly recommended for Windows users who may encounter file writing issues with standard Python file operations.
By adhering to the setup and usage guidelines provided, incorporating this tool into projects is straightforward and ensures consistent file writing behavior across all platforms.
By integrating the `FileWriterTool` into your crews, the agents can execute the process of writing content to files and creating directories.
This tool is essential for tasks that require saving output data, creating structured file systems, and more. By adhering to the setup and usage guidelines provided,
incorporating this tool into projects is straightforward and efficient.

View File

@@ -16,6 +16,7 @@ from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.task import Task
from crewai.tools import BaseTool
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import Tool
from crewai.utilities import Converter, Prompts
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import generate_model_description
@@ -145,7 +146,7 @@ class Agent(BaseAgent):
def _set_knowledge(self):
try:
if self.knowledge_sources:
full_pattern = re.compile(r"[^a-zA-Z0-9\-_\r\n]|(\.\.)")
full_pattern = re.compile(r'[^a-zA-Z0-9\-_\r\n]|(\.\.)')
knowledge_agent_name = f"{re.sub(full_pattern, '_', self.role)}"
if isinstance(self.knowledge_sources, list) and all(
isinstance(k, BaseKnowledgeSource) for k in self.knowledge_sources

View File

@@ -3,6 +3,11 @@ import subprocess
import click
from crewai.cli.utils import get_crew
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
def reset_memories_command(

View File

@@ -1,6 +1,7 @@
import asyncio
import json
import re
import sys
import uuid
import warnings
from concurrent.futures import Future
@@ -380,22 +381,6 @@ class Crew(BaseModel):
return self
@model_validator(mode="after")
def validate_must_have_non_conditional_task(self) -> "Crew":
"""Ensure that a crew has at least one non-conditional task."""
if not self.tasks:
return self
non_conditional_count = sum(
1 for task in self.tasks if not isinstance(task, ConditionalTask)
)
if non_conditional_count == 0:
raise PydanticCustomError(
"only_conditional_tasks",
"Crew must include at least one non-conditional task",
{},
)
return self
@model_validator(mode="after")
def validate_first_task(self) -> "Crew":
"""Ensure the first task is not a ConditionalTask."""
@@ -456,7 +441,6 @@ class Crew(BaseModel):
return self
@property
def key(self) -> str:
source = [agent.key for agent in self.agents] + [
@@ -759,7 +743,6 @@ class Crew(BaseModel):
task, task_outputs, futures, task_index, was_replayed
)
if skipped_task_output:
task_outputs.append(skipped_task_output)
continue
if task.async_execution:
@@ -783,7 +766,7 @@ class Crew(BaseModel):
context=context,
tools=tools_for_task,
)
task_outputs.append(task_output)
task_outputs = [task_output]
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index, was_replayed)
@@ -804,7 +787,7 @@ class Crew(BaseModel):
task_outputs = self._process_async_tasks(futures, was_replayed)
futures.clear()
previous_output = task_outputs[-1] if task_outputs else None
previous_output = task_outputs[task_index - 1] if task_outputs else None
if previous_output is not None and not task.should_execute(previous_output):
self._logger.log(
"debug",
@@ -926,15 +909,11 @@ class Crew(BaseModel):
)
def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput:
if not task_outputs:
raise ValueError("No task outputs available to create crew output.")
# Filter out empty outputs and get the last valid one as the main output
valid_outputs = [t for t in task_outputs if t.raw]
if not valid_outputs:
raise ValueError("No valid task outputs available to create crew output.")
final_task_output = valid_outputs[-1]
if len(task_outputs) != 1:
raise ValueError(
"Something went wrong. Kickoff should return only one task output."
)
final_task_output = task_outputs[0]
final_string_output = final_task_output.raw
self._finish_execution(final_string_output)
token_usage = self.calculate_usage_metrics()
@@ -943,7 +922,7 @@ class Crew(BaseModel):
raw=final_task_output.raw,
pydantic=final_task_output.pydantic,
json_dict=final_task_output.json_dict,
tasks_output=task_outputs,
tasks_output=[task.output for task in self.tasks if task.output],
token_usage=token_usage,
)

View File

@@ -1,138 +1,28 @@
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Union
from urllib.parse import urlparse
from typing import Dict, List
from pydantic import Field, field_validator
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.logger import Logger
from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
class ExcelKnowledgeSource(BaseKnowledgeSource):
class ExcelKnowledgeSource(BaseFileKnowledgeSource):
"""A knowledge source that stores and queries Excel file content using embeddings."""
# override content to be a dict of file paths to sheet names to csv content
_logger: Logger = Logger(verbose=True)
file_path: Optional[Union[Path, List[Path], str, List[str]]] = Field(
default=None,
description="[Deprecated] The path to the file. Use file_paths instead.",
)
file_paths: Optional[Union[Path, List[Path], str, List[str]]] = Field(
default_factory=list, description="The path to the file"
)
chunks: List[str] = Field(default_factory=list)
content: Dict[Path, Dict[str, str]] = Field(default_factory=dict)
safe_file_paths: List[Path] = Field(default_factory=list)
@field_validator("file_path", "file_paths", mode="before")
def validate_file_path(cls, v, info):
"""Validate that at least one of file_path or file_paths is provided."""
# Single check if both are None, O(1) instead of nested conditions
if (
v is None
and info.data.get(
"file_path" if info.field_name == "file_paths" else "file_paths"
)
is None
):
raise ValueError("Either file_path or file_paths must be provided")
return v
def _process_file_paths(self) -> List[Path]:
"""Convert file_path to a list of Path objects."""
if hasattr(self, "file_path") and self.file_path is not None:
self._logger.log(
"warning",
"The 'file_path' attribute is deprecated and will be removed in a future version. Please use 'file_paths' instead.",
color="yellow",
)
self.file_paths = self.file_path
if self.file_paths is None:
raise ValueError("Your source must be provided with a file_paths: []")
# Convert single path to list
path_list: List[Union[Path, str]] = (
[self.file_paths]
if isinstance(self.file_paths, (str, Path))
else list(self.file_paths)
if isinstance(self.file_paths, list)
else []
)
if not path_list:
raise ValueError(
"file_path/file_paths must be a Path, str, or a list of these types"
)
return [self.convert_to_path(path) for path in path_list]
def validate_content(self):
"""Validate the paths."""
for path in self.safe_file_paths:
if not path.exists():
self._logger.log(
"error",
f"File not found: {path}. Try adding sources to the knowledge directory. If it's inside the knowledge directory, use the relative path.",
color="red",
)
raise FileNotFoundError(f"File not found: {path}")
if not path.is_file():
self._logger.log(
"error",
f"Path is not a file: {path}",
color="red",
)
def model_post_init(self, _) -> None:
if self.file_path:
self._logger.log(
"warning",
"The 'file_path' attribute is deprecated and will be removed in a future version. Please use 'file_paths' instead.",
color="yellow",
)
self.file_paths = self.file_path
self.safe_file_paths = self._process_file_paths()
self.validate_content()
self.content = self._load_content()
def _load_content(self) -> Dict[Path, Dict[str, str]]:
"""Load and preprocess Excel file content from multiple sheets.
Each sheet's content is converted to CSV format and stored.
Returns:
Dict[Path, Dict[str, str]]: A mapping of file paths to their respective sheet contents.
Raises:
ImportError: If required dependencies are missing.
FileNotFoundError: If the specified Excel file cannot be opened.
"""
def load_content(self) -> Dict[Path, str]:
"""Load and preprocess Excel file content."""
pd = self._import_dependencies()
content_dict = {}
for file_path in self.safe_file_paths:
file_path = self.convert_to_path(file_path)
with pd.ExcelFile(file_path) as xl:
sheet_dict = {
str(sheet_name): str(
pd.read_excel(xl, sheet_name).to_csv(index=False)
)
for sheet_name in xl.sheet_names
}
content_dict[file_path] = sheet_dict
df = pd.read_excel(file_path)
content = df.to_csv(index=False)
content_dict[file_path] = content
return content_dict
def convert_to_path(self, path: Union[Path, str]) -> Path:
"""Convert a path to a Path object."""
return Path(KNOWLEDGE_DIRECTORY + "/" + path) if isinstance(path, str) else path
def _import_dependencies(self):
"""Dynamically import dependencies."""
try:
import openpyxl # noqa
import pandas as pd
return pd
@@ -148,14 +38,10 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
and save the embeddings.
"""
# Convert dictionary values to a single string if content is a dictionary
# Updated to account for .xlsx workbooks with multiple tabs/sheets
content_str = ""
for value in self.content.values():
if isinstance(value, dict):
for sheet_value in value.values():
content_str += str(sheet_value) + "\n"
else:
content_str += str(value) + "\n"
if isinstance(self.content, dict):
content_str = "\n".join(str(value) for value in self.content.values())
else:
content_str = str(self.content)
new_chunks = self._chunk_text(content_str)
self.chunks.extend(new_chunks)

View File

@@ -164,7 +164,6 @@ class LLM:
self.context_window_size = 0
self.reasoning_effort = reasoning_effort
self.additional_params = kwargs
self.is_anthropic = self._is_anthropic_model(model)
litellm.drop_params = True
@@ -179,62 +178,42 @@ class LLM:
self.set_callbacks(callbacks)
self.set_env_callbacks()
def _is_anthropic_model(self, model: str) -> bool:
"""Determine if the model is from Anthropic provider.
Args:
model: The model identifier string.
Returns:
bool: True if the model is from Anthropic, False otherwise.
"""
ANTHROPIC_PREFIXES = ('anthropic/', 'claude-', 'claude/')
return any(prefix in model.lower() for prefix in ANTHROPIC_PREFIXES)
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
"""High-level LLM call method.
Args:
messages: Input messages for the LLM.
Can be a string or list of message dictionaries.
If string, it will be converted to a single user message.
If list, each dict must have 'role' and 'content' keys.
tools: Optional list of tool schemas for function calling.
Each tool should define its name, description, and parameters.
callbacks: Optional list of callback functions to be executed
during and after the LLM call.
available_functions: Optional dict mapping function names to callables
that can be invoked by the LLM.
) -> str:
"""
High-level llm call method that:
1) Accepts either a string or a list of messages
2) Converts string input to the required message format
3) Calls litellm.completion
4) Handles function/tool calls if any
5) Returns the final text response or tool result
Parameters:
- messages (Union[str, List[Dict[str, str]]]): The input messages for the LLM.
- If a string is provided, it will be converted into a message list with a single entry.
- If a list of dictionaries is provided, each dictionary should have 'role' and 'content' keys.
- tools (Optional[List[dict]]): A list of tool schemas for function calling.
- callbacks (Optional[List[Any]]): A list of callback functions to be executed.
- available_functions (Optional[Dict[str, Any]]): A dictionary mapping function names to actual Python functions.
Returns:
Union[str, Any]: Either a text response from the LLM (str) or
the result of a tool function call (Any).
Raises:
TypeError: If messages format is invalid
ValueError: If response format is not supported
LLMContextLengthExceededException: If input exceeds model's context limit
- str: The final text response from the LLM or the result of a tool function call.
Examples:
# Example 1: Simple string input
>>> response = llm.call("Return the name of a random city.")
>>> print(response)
"Paris"
# Example 2: Message list with system and user messages
>>> messages = [
... {"role": "system", "content": "You are a geography expert"},
... {"role": "user", "content": "What is France's capital?"}
... ]
>>> response = llm.call(messages)
>>> print(response)
"The capital of France is Paris."
---------
# Example 1: Using a string input
response = llm.call("Return the name of a random city in the world.")
print(response)
# Example 2: Using a list of messages
messages = [{"role": "user", "content": "What is the capital of France?"}]
response = llm.call(messages)
print(response)
"""
# Validate parameters before proceeding with the call.
self._validate_call_params()
@@ -254,13 +233,10 @@ class LLM:
self.set_callbacks(callbacks)
try:
# --- 1) Format messages according to provider requirements
formatted_messages = self._format_messages_for_provider(messages)
# --- 2) Prepare the parameters for the completion call
# --- 1) Prepare the parameters for the completion call
params = {
"model": self.model,
"messages": formatted_messages,
"messages": messages,
"timeout": self.timeout,
"temperature": self.temperature,
"top_p": self.top_p,
@@ -348,38 +324,6 @@ class LLM:
logging.error(f"LiteLLM call failed: {str(e)}")
raise
def _format_messages_for_provider(self, messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
"""Format messages according to provider requirements.
Args:
messages: List of message dictionaries with 'role' and 'content' keys.
Can be empty or None.
Returns:
List of formatted messages according to provider requirements.
For Anthropic models, ensures first message has 'user' role.
Raises:
TypeError: If messages is None or contains invalid message format.
"""
if messages is None:
raise TypeError("Messages cannot be None")
# Validate message format first
for msg in messages:
if not isinstance(msg, dict) or "role" not in msg or "content" not in msg:
raise TypeError("Invalid message format. Each message must be a dict with 'role' and 'content' keys")
if not self.is_anthropic:
return messages
# Anthropic requires messages to start with 'user' role
if not messages or messages[0]["role"] == "system":
# If first message is system or empty, add a placeholder user message
return [{"role": "user", "content": "."}, *messages]
return messages
def _get_custom_llm_provider(self) -> str:
"""
Derives the custom_llm_provider from the model string.

View File

@@ -1,4 +1,4 @@
from typing import Optional
from typing import Any, Optional
from pydantic import PrivateAttr

View File

@@ -1,7 +1,9 @@
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel
from crewai.memory.storage.rag_storage import RAGStorage
class Memory(BaseModel):
"""

View File

@@ -674,32 +674,19 @@ class Task(BaseModel):
return OutputFormat.PYDANTIC
return OutputFormat.RAW
def _save_file(self, result: Union[Dict, str, Any]) -> None:
def _save_file(self, result: Any) -> None:
"""Save task output to a file.
Note:
For cross-platform file writing, especially on Windows, consider using FileWriterTool
from the crewai_tools package:
pip install 'crewai[tools]'
from crewai_tools import FileWriterTool
Args:
result: The result to save to the file. Can be a dict or any stringifiable object.
Raises:
ValueError: If output_file is not set
RuntimeError: If there is an error writing to the file. For cross-platform
compatibility, especially on Windows, use FileWriterTool from crewai_tools
package.
RuntimeError: If there is an error writing to the file
"""
if self.output_file is None:
raise ValueError("output_file is not set.")
FILEWRITER_RECOMMENDATION = (
"For cross-platform file writing, especially on Windows, "
"use FileWriterTool from crewai_tools package."
)
try:
resolved_path = Path(self.output_file).expanduser().resolve()
directory = resolved_path.parent
@@ -715,12 +702,7 @@ class Task(BaseModel):
else:
file.write(str(result))
except (OSError, IOError) as e:
raise RuntimeError(
"\n".join([
f"Failed to save output file: {e}",
FILEWRITER_RECOMMENDATION
])
)
raise RuntimeError(f"Failed to save output file: {e}")
return None
def __repr__(self):

View File

@@ -49,41 +49,6 @@ writer = Agent(
)
def test_crew_with_only_conditional_tasks_raises_error():
"""Test that creating a crew with only conditional tasks raises an error."""
def condition_func(task_output: TaskOutput) -> bool:
return True
conditional1 = ConditionalTask(
description="Conditional task 1",
expected_output="Output 1",
agent=researcher,
condition=condition_func,
)
conditional2 = ConditionalTask(
description="Conditional task 2",
expected_output="Output 2",
agent=researcher,
condition=condition_func,
)
conditional3 = ConditionalTask(
description="Conditional task 3",
expected_output="Output 3",
agent=researcher,
condition=condition_func,
)
with pytest.raises(
pydantic_core._pydantic_core.ValidationError,
match="Crew must include at least one non-conditional task",
):
Crew(
agents=[researcher],
tasks=[conditional1, conditional2, conditional3],
)
def test_crew_config_conditional_requirement():
with pytest.raises(ValueError):
Crew(process=Process.sequential)
@@ -591,12 +556,12 @@ def test_crew_with_delegating_agents_should_not_override_task_tools():
_, kwargs = mock_execute_sync.call_args
tools = kwargs["tools"]
assert any(
isinstance(tool, TestTool) for tool in tools
), "TestTool should be present"
assert any(
"delegate" in tool.name.lower() for tool in tools
), "Delegation tool should be present"
assert any(isinstance(tool, TestTool) for tool in tools), (
"TestTool should be present"
)
assert any("delegate" in tool.name.lower() for tool in tools), (
"Delegation tool should be present"
)
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -655,12 +620,12 @@ def test_crew_with_delegating_agents_should_not_override_agent_tools():
_, kwargs = mock_execute_sync.call_args
tools = kwargs["tools"]
assert any(
isinstance(tool, TestTool) for tool in new_ceo.tools
), "TestTool should be present"
assert any(
"delegate" in tool.name.lower() for tool in tools
), "Delegation tool should be present"
assert any(isinstance(tool, TestTool) for tool in new_ceo.tools), (
"TestTool should be present"
)
assert any("delegate" in tool.name.lower() for tool in tools), (
"Delegation tool should be present"
)
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -784,17 +749,17 @@ def test_task_tools_override_agent_tools_with_allow_delegation():
used_tools = kwargs["tools"]
# Confirm AnotherTestTool is present but TestTool is not
assert any(
isinstance(tool, AnotherTestTool) for tool in used_tools
), "AnotherTestTool should be present"
assert not any(
isinstance(tool, TestTool) for tool in used_tools
), "TestTool should not be present among used tools"
assert any(isinstance(tool, AnotherTestTool) for tool in used_tools), (
"AnotherTestTool should be present"
)
assert not any(isinstance(tool, TestTool) for tool in used_tools), (
"TestTool should not be present among used tools"
)
# Confirm delegation tool(s) are present
assert any(
"delegate" in tool.name.lower() for tool in used_tools
), "Delegation tool should be present"
assert any("delegate" in tool.name.lower() for tool in used_tools), (
"Delegation tool should be present"
)
# Finally, make sure the agent's original tools remain unchanged
assert len(researcher_with_delegation.tools) == 1
@@ -1595,9 +1560,9 @@ def test_code_execution_flag_adds_code_tool_upon_kickoff():
# Verify that exactly one tool was used and it was a CodeInterpreterTool
assert len(used_tools) == 1, "Should have exactly one tool"
assert isinstance(
used_tools[0], CodeInterpreterTool
), "Tool should be CodeInterpreterTool"
assert isinstance(used_tools[0], CodeInterpreterTool), (
"Tool should be CodeInterpreterTool"
)
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -1954,7 +1919,6 @@ def test_task_callback_on_crew():
def test_task_callback_both_on_task_and_crew():
from unittest.mock import MagicMock, patch
mock_callback_on_task = MagicMock()
mock_callback_on_crew = MagicMock()
@@ -2096,210 +2060,6 @@ def test_tools_with_custom_caching():
assert result.raw == "3"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_task_uses_last_output():
"""Test that conditional tasks use the last task output for condition evaluation."""
task1 = Task(
description="First task",
expected_output="First output",
agent=researcher,
)
def condition_fails(task_output: TaskOutput) -> bool:
# This condition will never be met
return "never matches" in task_output.raw.lower()
def condition_succeeds(task_output: TaskOutput) -> bool:
# This condition will match first task's output
return "first success" in task_output.raw.lower()
conditional_task1 = ConditionalTask(
description="Second task - conditional that fails condition",
expected_output="Second output",
agent=researcher,
condition=condition_fails,
)
conditional_task2 = ConditionalTask(
description="Third task - conditional that succeeds using first task output",
expected_output="Third output",
agent=writer,
condition=condition_succeeds,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, conditional_task1, conditional_task2],
)
# Mock outputs for tasks
mock_first = TaskOutput(
description="First task output",
raw="First success output", # Will be used by third task's condition
agent=researcher.role,
)
mock_third = TaskOutput(
description="Third task output",
raw="Third task executed", # Output when condition succeeds using first task output
agent=writer.role,
)
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_first, mock_third]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify outputs collection:
# First executed task output, followed by an automatically generated (skipped) output, then the conditional execution
assert len(result.tasks_output) == 3
assert (
result.tasks_output[0].raw == "First success output"
) # First task succeeded
assert (
result.tasks_output[1].raw == ""
) # Second task skipped (condition failed)
assert (
result.tasks_output[2].raw == "Third task executed"
) # Third task used first task's output
@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_tasks_result_collection():
"""Test that task outputs are properly collected based on execution status."""
task1 = Task(
description="Normal task that always executes",
expected_output="First output",
agent=researcher,
)
def condition_never_met(task_output: TaskOutput) -> bool:
return "never matches" in task_output.raw.lower()
def condition_always_met(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
task2 = ConditionalTask(
description="Conditional task that never executes",
expected_output="Second output",
agent=researcher,
condition=condition_never_met,
)
task3 = ConditionalTask(
description="Conditional task that always executes",
expected_output="Third output",
agent=writer,
condition=condition_always_met,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2, task3],
)
# Mock outputs for different execution paths
mock_success = TaskOutput(
description="Success output",
raw="Success output", # Triggers third task's condition
agent=researcher.role,
)
mock_conditional = TaskOutput(
description="Conditional output",
raw="Conditional task executed",
agent=writer.role,
)
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_success, mock_conditional]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify task output collection:
# There should be three outputs: normal task, skipped conditional task (empty output),
# and the conditional task that executed.
assert len(result.tasks_output) == 3
assert (
result.tasks_output[0].raw == "Success output"
) # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert (
result.tasks_output[2].raw == "Conditional task executed"
) # Third task executed
# Verify task output collection
assert len(result.tasks_output) == 3
assert (
result.tasks_output[0].raw == "Success output"
) # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert (
result.tasks_output[2].raw == "Conditional task executed"
) # Third task executed
@pytest.mark.vcr(filter_headers=["authorization"])
def test_multiple_conditional_tasks():
"""Test that having multiple conditional tasks in sequence works correctly."""
task1 = Task(
description="Initial research task",
expected_output="Research output",
agent=researcher,
)
def condition1(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
def condition2(task_output: TaskOutput) -> bool:
return "proceed" in task_output.raw.lower()
task2 = ConditionalTask(
description="First conditional task",
expected_output="Conditional output 1",
agent=writer,
condition=condition1,
)
task3 = ConditionalTask(
description="Second conditional task",
expected_output="Conditional output 2",
agent=writer,
condition=condition2,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2, task3],
)
# Mock different task outputs to test conditional logic
mock_success = TaskOutput(
description="Mock success",
raw="Success and proceed output",
agent=researcher.role,
)
# Set up mocks for task execution
with patch.object(Task, "execute_sync", return_value=mock_success) as mock_execute:
result = crew.kickoff()
# Verify all tasks were executed (no IndexError)
assert mock_execute.call_count == 3
assert len(result.tasks_output) == 3
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory():
from unittest.mock import patch
@@ -3418,9 +3178,9 @@ def test_fetch_inputs():
expected_placeholders = {"role_detail", "topic", "field"}
actual_placeholders = crew.fetch_inputs()
assert (
actual_placeholders == expected_placeholders
), f"Expected {expected_placeholders}, but got {actual_placeholders}"
assert actual_placeholders == expected_placeholders, (
f"Expected {expected_placeholders}, but got {actual_placeholders}"
)
def test_task_tools_preserve_code_execution_tools():
@@ -3493,20 +3253,20 @@ def test_task_tools_preserve_code_execution_tools():
used_tools = kwargs["tools"]
# Verify all expected tools are present
assert any(
isinstance(tool, TestTool) for tool in used_tools
), "Task's TestTool should be present"
assert any(
isinstance(tool, CodeInterpreterTool) for tool in used_tools
), "CodeInterpreterTool should be present"
assert any(
"delegate" in tool.name.lower() for tool in used_tools
), "Delegation tool should be present"
assert any(isinstance(tool, TestTool) for tool in used_tools), (
"Task's TestTool should be present"
)
assert any(isinstance(tool, CodeInterpreterTool) for tool in used_tools), (
"CodeInterpreterTool should be present"
)
assert any("delegate" in tool.name.lower() for tool in used_tools), (
"Delegation tool should be present"
)
# Verify the total number of tools (TestTool + CodeInterpreter + 2 delegation tools)
assert (
len(used_tools) == 4
), "Should have TestTool, CodeInterpreter, and 2 delegation tools"
assert len(used_tools) == 4, (
"Should have TestTool, CodeInterpreter, and 2 delegation tools"
)
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -3550,9 +3310,9 @@ def test_multimodal_flag_adds_multimodal_tools():
used_tools = kwargs["tools"]
# Check that the multimodal tool was added
assert any(
isinstance(tool, AddImageTool) for tool in used_tools
), "AddImageTool should be present when agent is multimodal"
assert any(isinstance(tool, AddImageTool) for tool in used_tools), (
"AddImageTool should be present when agent is multimodal"
)
# Verify we have exactly one tool (just the AddImageTool)
assert len(used_tools) == 1, "Should only have the AddImageTool"
@@ -3778,9 +3538,9 @@ def test_crew_guardrail_feedback_in_context():
assert len(execution_contexts) > 1, "Task should have been executed multiple times"
# Verify that the second execution included the guardrail feedback
assert (
"Output must contain the keyword 'IMPORTANT'" in execution_contexts[1]
), "Guardrail feedback should be included in retry context"
assert "Output must contain the keyword 'IMPORTANT'" in execution_contexts[1], (
"Guardrail feedback should be included in retry context"
)
# Verify final output meets guardrail requirements
assert "IMPORTANT" in result.raw, "Final output should contain required keyword"

View File

@@ -286,79 +286,6 @@ def test_o3_mini_reasoning_effort_medium():
@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.fixture
def anthropic_llm():
"""Fixture providing an Anthropic LLM instance."""
return LLM(model="anthropic/claude-3-sonnet")
@pytest.fixture
def system_message():
"""Fixture providing a system message."""
return {"role": "system", "content": "test"}
@pytest.fixture
def user_message():
"""Fixture providing a user message."""
return {"role": "user", "content": "test"}
def test_anthropic_message_formatting_edge_cases(anthropic_llm):
"""Test edge cases for Anthropic message formatting."""
# Test None messages
with pytest.raises(TypeError, match="Messages cannot be None"):
anthropic_llm._format_messages_for_provider(None)
# Test empty message list
formatted = anthropic_llm._format_messages_for_provider([])
assert len(formatted) == 1
assert formatted[0]["role"] == "user"
assert formatted[0]["content"] == "."
# Test invalid message format
with pytest.raises(TypeError, match="Invalid message format"):
anthropic_llm._format_messages_for_provider([{"invalid": "message"}])
def test_anthropic_model_detection():
"""Test Anthropic model detection with various formats."""
models = [
("anthropic/claude-3", True),
("claude-instant", True),
("claude/v1", True),
("gpt-4", False),
("", False),
("anthropomorphic", False), # Should not match partial words
]
for model, expected in models:
llm = LLM(model=model)
assert llm.is_anthropic == expected, f"Failed for model: {model}"
def test_anthropic_message_formatting(anthropic_llm, system_message, user_message):
"""Test Anthropic message formatting with fixtures."""
# Test when first message is system
formatted = anthropic_llm._format_messages_for_provider([system_message])
assert len(formatted) == 2
assert formatted[0]["role"] == "user"
assert formatted[0]["content"] == "."
assert formatted[1] == system_message
# Test when first message is already user
formatted = anthropic_llm._format_messages_for_provider([user_message])
assert len(formatted) == 1
assert formatted[0] == user_message
# Test with empty message list
formatted = anthropic_llm._format_messages_for_provider([])
assert len(formatted) == 1
assert formatted[0]["role"] == "user"
assert formatted[0]["content"] == "."
# Test with non-Anthropic model (should not modify messages)
non_anthropic_llm = LLM(model="gpt-4")
formatted = non_anthropic_llm._format_messages_for_provider([system_message])
assert len(formatted) == 1
assert formatted[0] == system_message
def test_deepseek_r1_with_open_router():
if not os.getenv("OPEN_ROUTER_API_KEY"):
pytest.skip("OPEN_ROUTER_API_KEY not set; skipping test.")