Compare commits

..

6 Commits

Author SHA1 Message Date
Devin AI
2c12b91083 fix: use isinstance for proper type narrowing in Knowledge.query
Co-Authored-By: Joe Moura <joao@crewai.com>
2024-12-27 20:55:45 +00:00
Devin AI
2d23ef7e12 fix: improve type narrowing in Knowledge.query with local variable
Co-Authored-By: Joe Moura <joao@crewai.com>
2024-12-27 20:54:14 +00:00
Devin AI
b56c4c52c0 fix: handle None storage in Knowledge.query for type checks
Co-Authored-By: Joe Moura <joao@crewai.com>
2024-12-27 20:51:50 +00:00
João Moura
81759e8c72 Merge branch 'main' into fix-knowledgestorage-default-instantiation 2024-12-27 17:18:33 -03:00
ericklima-ca
27472ba69e refactor: Change storage field to optional and improve error handling when saving documents 2024-12-26 22:27:19 -04:00
ericklima-ca
25aa774d8c fix: Change storage initialization to None for KnowledgeStorage 2024-12-26 21:30:06 -04:00
13 changed files with 89 additions and 898 deletions

View File

@@ -1,211 +0,0 @@
# Portkey Integration with CrewAI
<img src="https://raw.githubusercontent.com/siddharthsambharia-portkey/Portkey-Product-Images/main/Portkey-CrewAI.png" alt="Portkey CrewAI Header Image" width="70%" />
[Portkey](https://portkey.ai/?utm_source=crewai&utm_medium=crewai&utm_campaign=crewai) is a 2-line upgrade to make your CrewAI agents reliable, cost-efficient, and fast.
Portkey adds 4 core production capabilities to any CrewAI agent:
1. Routing to **200+ LLMs**
2. Making each LLM call more robust
3. Full-stack tracing & cost, performance analytics
4. Real-time guardrails to enforce behavior
## Getting Started
1. **Install Required Packages:**
```bash
pip install -qU crewai portkey-ai
```
2. **Configure the LLM Client:**
To build CrewAI Agents with Portkey, you'll need two keys:
- **Portkey API Key**: Sign up on the [Portkey app](https://app.portkey.ai/?utm_source=crewai&utm_medium=crewai&utm_campaign=crewai) and copy your API key
- **Virtual Key**: Virtual Keys securely manage your LLM API keys in one place. Store your LLM provider API keys securely in Portkey's vault
```python
from crewai import LLM
from portkey_ai import createHeaders, PORTKEY_GATEWAY_URL
gpt_llm = LLM(
model="gpt-4",
base_url=PORTKEY_GATEWAY_URL,
api_key="dummy", # We are using Virtual key
extra_headers=createHeaders(
api_key="YOUR_PORTKEY_API_KEY",
virtual_key="YOUR_VIRTUAL_KEY", # Enter your Virtual key from Portkey
)
)
```
3. **Create and Run Your First Agent:**
```python
from crewai import Agent, Task, Crew
# Define your agents with roles and goals
coder = Agent(
role='Software developer',
goal='Write clear, concise code on demand',
backstory='An expert coder with a keen eye for software trends.',
llm=gpt_llm
)
# Create tasks for your agents
task1 = Task(
description="Define the HTML for making a simple website with heading- Hello World! Portkey is working!",
expected_output="A clear and concise HTML code",
agent=coder
)
# Instantiate your crew
crew = Crew(
agents=[coder],
tasks=[task1],
)
result = crew.kickoff()
print(result)
```
## Key Features
| Feature | Description |
|---------|-------------|
| 🌐 Multi-LLM Support | Access OpenAI, Anthropic, Gemini, Azure, and 250+ providers through a unified interface |
| 🛡️ Production Reliability | Implement retries, timeouts, load balancing, and fallbacks |
| 📊 Advanced Observability | Track 40+ metrics including costs, tokens, latency, and custom metadata |
| 🔍 Comprehensive Logging | Debug with detailed execution traces and function call logs |
| 🚧 Security Controls | Set budget limits and implement role-based access control |
| 🔄 Performance Analytics | Capture and analyze feedback for continuous improvement |
| 💾 Intelligent Caching | Reduce costs and latency with semantic or simple caching |
## Production Features with Portkey Configs
All features mentioned below are through Portkey's Config system. Portkey's Config system allows you to define routing strategies using simple JSON objects in your LLM API calls. You can create and manage Configs directly in your code or through the Portkey Dashboard. Each Config has a unique ID for easy reference.
<Frame>
<img src="https://raw.githubusercontent.com/Portkey-AI/docs-core/refs/heads/main/images/libraries/libraries-3.avif"/>
</Frame>
### 1. Use 250+ LLMs
Access various LLMs like Anthropic, Gemini, Mistral, Azure OpenAI, and more with minimal code changes. Switch between providers or use them together seamlessly. [Learn more about Universal API](https://portkey.ai/docs/product/ai-gateway/universal-api)
Easily switch between different LLM providers:
```python
# Anthropic Configuration
anthropic_llm = LLM(
model="claude-3-5-sonnet-latest",
base_url=PORTKEY_GATEWAY_URL,
api_key="dummy",
extra_headers=createHeaders(
api_key="YOUR_PORTKEY_API_KEY",
virtual_key="YOUR_ANTHROPIC_VIRTUAL_KEY", #You don't need provider when using Virtual keys
trace_id="anthropic_agent"
)
)
# Azure OpenAI Configuration
azure_llm = LLM(
model="gpt-4",
base_url=PORTKEY_GATEWAY_URL,
api_key="dummy",
extra_headers=createHeaders(
api_key="YOUR_PORTKEY_API_KEY",
virtual_key="YOUR_AZURE_VIRTUAL_KEY", #You don't need provider when using Virtual keys
trace_id="azure_agent"
)
)
```
### 2. Caching
Improve response times and reduce costs with two powerful caching modes:
- **Simple Cache**: Perfect for exact matches
- **Semantic Cache**: Matches responses for requests that are semantically similar
[Learn more about Caching](https://portkey.ai/docs/product/ai-gateway/cache-simple-and-semantic)
```py
config = {
"cache": {
"mode": "semantic", # or "simple" for exact matching
}
}
```
### 3. Production Reliability
Portkey provides comprehensive reliability features:
- **Automatic Retries**: Handle temporary failures gracefully
- **Request Timeouts**: Prevent hanging operations
- **Conditional Routing**: Route requests based on specific conditions
- **Fallbacks**: Set up automatic provider failovers
- **Load Balancing**: Distribute requests efficiently
[Learn more about Reliability Features](https://portkey.ai/docs/product/ai-gateway/)
### 4. Metrics
Agent runs are complex. Portkey automatically logs **40+ comprehensive metrics** for your AI agents, including cost, tokens used, latency, etc. Whether you need a broad overview or granular insights into your agent runs, Portkey's customizable filters provide the metrics you need.
- Cost per agent interaction
- Response times and latency
- Token usage and efficiency
- Success/failure rates
- Cache hit rates
<img src="https://github.com/siddharthsambharia-portkey/Portkey-Product-Images/blob/main/Portkey-Dashboard.png?raw=true" width="70%" alt="Portkey Dashboard" />
### 5. Detailed Logging
Logs are essential for understanding agent behavior, diagnosing issues, and improving performance. They provide a detailed record of agent activities and tool use, which is crucial for debugging and optimizing processes.
Access a dedicated section to view records of agent executions, including parameters, outcomes, function calls, and errors. Filter logs based on multiple parameters such as trace ID, model, tokens used, and metadata.
<details>
<summary><b>Traces</b></summary>
<img src="https://raw.githubusercontent.com/siddharthsambharia-portkey/Portkey-Product-Images/main/Portkey-Traces.png" alt="Portkey Traces" width="70%" />
</details>
<details>
<summary><b>Logs</b></summary>
<img src="https://raw.githubusercontent.com/siddharthsambharia-portkey/Portkey-Product-Images/main/Portkey-Logs.png" alt="Portkey Logs" width="70%" />
</details>
### 6. Enterprise Security Features
- Set budget limit and rate limts per Virtual Key (disposable API keys)
- Implement role-based access control
- Track system changes with audit logs
- Configure data retention policies
For detailed information on creating and managing Configs, visit the [Portkey documentation](https://docs.portkey.ai/product/ai-gateway/configs).
## Resources
- [📘 Portkey Documentation](https://docs.portkey.ai)
- [📊 Portkey Dashboard](https://app.portkey.ai/?utm_source=crewai&utm_medium=crewai&utm_campaign=crewai)
- [🐦 Twitter](https://twitter.com/portkeyai)
- [💬 Discord Community](https://discord.gg/DD7vgKK299)

View File

@@ -1,138 +0,0 @@
---
title: Using Multimodal Agents
description: Learn how to enable and use multimodal capabilities in your agents for processing images and other non-text content within the CrewAI framework.
icon: image
---
# Using Multimodal Agents
CrewAI supports multimodal agents that can process both text and non-text content like images. This guide will show you how to enable and use multimodal capabilities in your agents.
## Enabling Multimodal Capabilities
To create a multimodal agent, simply set the `multimodal` parameter to `True` when initializing your agent:
```python
from crewai import Agent
agent = Agent(
role="Image Analyst",
goal="Analyze and extract insights from images",
backstory="An expert in visual content interpretation with years of experience in image analysis",
multimodal=True # This enables multimodal capabilities
)
```
When you set `multimodal=True`, the agent is automatically configured with the necessary tools for handling non-text content, including the `AddImageTool`.
## Working with Images
The multimodal agent comes pre-configured with the `AddImageTool`, which allows it to process images. You don't need to manually add this tool - it's automatically included when you enable multimodal capabilities.
Here's a complete example showing how to use a multimodal agent to analyze an image:
```python
from crewai import Agent, Task, Crew
# Create a multimodal agent
image_analyst = Agent(
role="Product Analyst",
goal="Analyze product images and provide detailed descriptions",
backstory="Expert in visual product analysis with deep knowledge of design and features",
multimodal=True
)
# Create a task for image analysis
task = Task(
description="Analyze the product image at https://example.com/product.jpg and provide a detailed description",
agent=image_analyst
)
# Create and run the crew
crew = Crew(
agents=[image_analyst],
tasks=[task]
)
result = crew.kickoff()
```
### Advanced Usage with Context
You can provide additional context or specific questions about the image when creating tasks for multimodal agents. The task description can include specific aspects you want the agent to focus on:
```python
from crewai import Agent, Task, Crew
# Create a multimodal agent for detailed analysis
expert_analyst = Agent(
role="Visual Quality Inspector",
goal="Perform detailed quality analysis of product images",
backstory="Senior quality control expert with expertise in visual inspection",
multimodal=True # AddImageTool is automatically included
)
# Create a task with specific analysis requirements
inspection_task = Task(
description="""
Analyze the product image at https://example.com/product.jpg with focus on:
1. Quality of materials
2. Manufacturing defects
3. Compliance with standards
Provide a detailed report highlighting any issues found.
""",
agent=expert_analyst
)
# Create and run the crew
crew = Crew(
agents=[expert_analyst],
tasks=[inspection_task]
)
result = crew.kickoff()
```
### Tool Details
When working with multimodal agents, the `AddImageTool` is automatically configured with the following schema:
```python
class AddImageToolSchema:
image_url: str # Required: The URL or path of the image to process
action: Optional[str] = None # Optional: Additional context or specific questions about the image
```
The multimodal agent will automatically handle the image processing through its built-in tools, allowing it to:
- Access images via URLs or local file paths
- Process image content with optional context or specific questions
- Provide analysis and insights based on the visual information and task requirements
## Best Practices
When working with multimodal agents, keep these best practices in mind:
1. **Image Access**
- Ensure your images are accessible via URLs that the agent can reach
- For local images, consider hosting them temporarily or using absolute file paths
- Verify that image URLs are valid and accessible before running tasks
2. **Task Description**
- Be specific about what aspects of the image you want the agent to analyze
- Include clear questions or requirements in the task description
- Consider using the optional `action` parameter for focused analysis
3. **Resource Management**
- Image processing may require more computational resources than text-only tasks
- Some language models may require base64 encoding for image data
- Consider batch processing for multiple images to optimize performance
4. **Environment Setup**
- Verify that your environment has the necessary dependencies for image processing
- Ensure your language model supports multimodal capabilities
- Test with small images first to validate your setup
5. **Error Handling**
- Implement proper error handling for image loading failures
- Have fallback strategies for when image processing fails
- Monitor and log image processing operations for debugging

View File

@@ -67,6 +67,7 @@ dev-dependencies = [
"mkdocs-material-extensions>=1.3.1",
"pillow>=10.2.0",
"cairosvg>=2.7.1",
"crewai-tools>=0.17.0",
"pytest>=8.0.0",
"pytest-vcr>=1.0.2",
"python-dotenv>=1.0.0",

View File

@@ -113,10 +113,6 @@ class Crew(BaseModel):
default=False,
description="Whether the crew should use memory to store memories of it's execution",
)
memory_verbose: bool = Field(
default=False,
description="Whether to show verbose logs about memory operations",
)
memory_config: Optional[Dict[str, Any]] = Field(
default=None,
description="Configuration for the memory to be used for the crew.",
@@ -261,7 +257,7 @@ class Crew(BaseModel):
"""Set private attributes."""
if self.memory:
self._long_term_memory = (
self.long_term_memory if self.long_term_memory else LongTermMemory(memory_verbose=self.memory_verbose)
self.long_term_memory if self.long_term_memory else LongTermMemory()
)
self._short_term_memory = (
self.short_term_memory
@@ -269,17 +265,16 @@ class Crew(BaseModel):
else ShortTermMemory(
crew=self,
embedder_config=self.embedder,
memory_verbose=self.memory_verbose,
)
)
self._entity_memory = (
self.entity_memory
if self.entity_memory
else EntityMemory(crew=self, embedder_config=self.embedder, memory_verbose=self.memory_verbose)
else EntityMemory(crew=self, embedder_config=self.embedder)
)
if hasattr(self, "memory_config") and self.memory_config is not None:
self._user_memory = (
self.user_memory if self.user_memory else UserMemory(crew=self, memory_verbose=self.memory_verbose)
self.user_memory if self.user_memory else UserMemory(crew=self)
)
else:
self._user_memory = None

View File

@@ -14,13 +14,13 @@ class Knowledge(BaseModel):
Knowledge is a collection of sources and setup for the vector store to save and query relevant context.
Args:
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
storage: Optional[KnowledgeStorage] = Field(default=None)
embedder_config: Optional[Dict[str, Any]] = None
"""
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
model_config = ConfigDict(arbitrary_types_allowed=True)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
storage: Optional[KnowledgeStorage] = Field(default=None)
embedder_config: Optional[Dict[str, Any]] = None
collection_name: Optional[str] = None
@@ -49,9 +49,15 @@ class Knowledge(BaseModel):
"""
Query across all knowledge sources to find the most relevant information.
Returns the top_k most relevant chunks.
"""
results = self.storage.search(
Raises:
ValueError: If no storage is configured for querying.
"""
storage = self.storage
if not isinstance(storage, KnowledgeStorage):
raise ValueError("No storage found to perform query.")
# Using isinstance check for proper type narrowing
results = storage.search(
query,
limit,
)

View File

@@ -22,7 +22,7 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
default_factory=list, description="The path to the file"
)
content: Dict[Path, str] = Field(init=False, default_factory=dict)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
storage: Optional[KnowledgeStorage] = Field(default=None)
safe_file_paths: List[Path] = Field(default_factory=list)
@field_validator("file_path", "file_paths", mode="before")
@@ -62,7 +62,10 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
def _save_documents(self):
"""Save the documents to the storage."""
self.storage.save(self.chunks)
if self.storage:
self.storage.save(self.chunks)
else:
raise ValueError("No storage found to save documents.")
def convert_to_path(self, path: Union[Path, str]) -> Path:
"""Convert a path to a Path object."""

View File

@@ -16,7 +16,7 @@ class BaseKnowledgeSource(BaseModel, ABC):
chunk_embeddings: List[np.ndarray] = Field(default_factory=list)
model_config = ConfigDict(arbitrary_types_allowed=True)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
storage: Optional[KnowledgeStorage] = Field(default=None)
metadata: Dict[str, Any] = Field(default_factory=dict) # Currently unused
collection_name: Optional[str] = Field(default=None)
@@ -46,4 +46,7 @@ class BaseKnowledgeSource(BaseModel, ABC):
Save the documents to the storage.
This method should be called after the chunks and embeddings are generated.
"""
self.storage.save(self.chunks)
if self.storage:
self.storage.save(self.chunks)
else:
raise ValueError("No storage found to save documents.")

View File

@@ -1,7 +1,5 @@
from typing import Any, Dict, List, Optional
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
from crewai.memory.memory import Memory, MemoryOperationError
from crewai.memory.memory import Memory
from crewai.memory.storage.rag_storage import RAGStorage
@@ -10,24 +8,9 @@ class EntityMemory(Memory):
EntityMemory class for managing structured information about entities
and their relationships using SQLite storage.
Inherits from the Memory class.
Attributes:
memory_provider: The memory provider to use, if any.
storage: The storage backend for the memory.
memory_verbose: Whether to log memory operations.
"""
def __init__(self, crew=None, embedder_config=None, storage=None, path=None, memory_verbose=False):
"""
Initialize an EntityMemory instance.
Args:
crew: The crew to associate with this memory.
embedder_config: Configuration for the embedder.
storage: The storage backend for the memory.
path: Path to the storage file, if any.
memory_verbose: Whether to log memory operations.
"""
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
if hasattr(crew, "memory_config") and crew.memory_config is not None:
self.memory_provider = crew.memory_config.get("provider")
else:
@@ -53,48 +36,23 @@ class EntityMemory(Memory):
path=path,
)
)
super().__init__(storage, memory_verbose=memory_verbose)
super().__init__(storage)
def save(self, item: EntityMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
"""
Saves an entity item into storage.
Args:
item: The entity memory item to save.
Raises:
MemoryOperationError: If there's an error saving the entity to memory.
"""
try:
if self.memory_verbose:
self._log_operation("Saving entity", f"{item.name} ({item.type})")
self._log_operation("Description", item.description)
if self.memory_provider == "mem0":
data = f"""
Remember details about the following entity:
Name: {item.name}
Type: {item.type}
Entity Description: {item.description}
"""
else:
data = f"{item.name}({item.type}): {item.description}"
super().save(data, item.metadata)
except Exception as e:
if self.memory_verbose:
self._log_operation("Error saving entity", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "save entity", self.__class__.__name__)
"""Saves an entity item into the SQLite storage."""
if self.memory_provider == "mem0":
data = f"""
Remember details about the following entity:
Name: {item.name}
Type: {item.type}
Entity Description: {item.description}
"""
else:
data = f"{item.name}({item.type}): {item.description}"
super().save(data, item.metadata)
def reset(self) -> None:
"""
Reset the entity memory.
Raises:
MemoryOperationError: If there's an error resetting the memory.
"""
try:
self.storage.reset()
except Exception as e:
if self.memory_verbose:
self._log_operation("Error resetting", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "reset", self.__class__.__name__)
raise Exception(f"An error occurred while resetting the entity memory: {e}")

View File

@@ -1,7 +1,7 @@
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.memory.memory import Memory, MemoryOperationError
from crewai.memory.memory import Memory
from crewai.memory.storage.ltm_sqlite_storage import LTMSQLiteStorage
@@ -12,90 +12,25 @@ class LongTermMemory(Memory):
Inherits from the Memory class and utilizes an instance of a class that
adheres to the Storage for data storage, specifically working with
LongTermMemoryItem instances.
Attributes:
storage: The storage backend for the memory.
memory_verbose: Whether to log memory operations.
"""
def __init__(self, storage=None, path=None, memory_verbose=False):
"""
Initialize a LongTermMemory instance.
Args:
storage: The storage backend for the memory.
path: Path to the storage file, if any.
memory_verbose: Whether to log memory operations.
"""
def __init__(self, storage=None, path=None):
if not storage:
storage = LTMSQLiteStorage(db_path=path) if path else LTMSQLiteStorage()
super().__init__(storage, memory_verbose=memory_verbose)
super().__init__(storage)
def save(self, item: LongTermMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
"""
Save a long-term memory item to storage.
Args:
item: The long-term memory item to save.
Raises:
MemoryOperationError: If there's an error saving the item to memory.
"""
try:
if self.memory_verbose:
self._log_operation("Saving task", item.task)
self._log_operation("Agent", item.agent)
self._log_operation("Quality", str(item.metadata.get('quality')))
metadata = item.metadata
metadata.update({"agent": item.agent, "expected_output": item.expected_output})
self.storage.save( # type: ignore # BUG?: Unexpected keyword argument "task_description","score","datetime" for "save" of "Storage"
task_description=item.task,
score=metadata["quality"],
metadata=metadata,
datetime=item.datetime,
)
except Exception as e:
if self.memory_verbose:
self._log_operation("Error saving task", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "save task", self.__class__.__name__)
metadata = item.metadata
metadata.update({"agent": item.agent, "expected_output": item.expected_output})
self.storage.save( # type: ignore # BUG?: Unexpected keyword argument "task_description","score","datetime" for "save" of "Storage"
task_description=item.task,
score=metadata["quality"],
metadata=metadata,
datetime=item.datetime,
)
def search(self, task: str, latest_n: int = 3) -> List[Dict[str, Any]]: # type: ignore # signature of "search" incompatible with supertype "Memory"
"""
Search for long-term memories related to a task.
Args:
task: The task description to search for.
latest_n: Maximum number of results to return.
Returns:
A list of matching long-term memories.
Raises:
MemoryOperationError: If there's an error searching memory.
"""
try:
if self.memory_verbose:
self._log_operation("Searching for task", task)
results = self.storage.load(task, latest_n) # type: ignore # BUG?: "Storage" has no attribute "load"
if self.memory_verbose and results:
self._log_operation("Found", f"{len(results)} results")
return results
except Exception as e:
if self.memory_verbose:
self._log_operation("Error searching", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "search", self.__class__.__name__)
return self.storage.load(task, latest_n) # type: ignore # BUG?: "Storage" has no attribute "load"
def reset(self) -> None:
"""
Reset the long-term memory.
Raises:
MemoryOperationError: If there's an error resetting the memory.
"""
try:
self.storage.reset()
except Exception as e:
if self.memory_verbose:
self._log_operation("Error resetting", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "reset", self.__class__.__name__)
self.storage.reset()

View File

@@ -1,67 +1,15 @@
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional
from crewai.memory.storage.rag_storage import RAGStorage
from crewai.utilities.logger import Logger
class MemoryOperationError(Exception):
"""
Exception raised for errors in memory operations.
Attributes:
message: Explanation of the error
operation: The operation that failed (e.g., "save", "search")
memory_type: The type of memory where the error occurred
"""
def __init__(self, message: str, operation: str, memory_type: str):
self.operation = operation
self.memory_type = memory_type
super().__init__(f"{memory_type} {operation} error: {message}")
class Memory:
"""
Base class for memory, now supporting agent tags and generic metadata.
Attributes:
storage: The storage backend for the memory.
memory_verbose: Whether to log memory operations.
"""
def __init__(self, storage: RAGStorage, memory_verbose: bool = False):
"""
Initialize a Memory instance.
Args:
storage: The storage backend for the memory.
memory_verbose: Whether to log memory operations.
"""
def __init__(self, storage: RAGStorage):
self.storage = storage
self.memory_verbose = memory_verbose
self._logger = Logger(verbose=memory_verbose)
def _log_operation(self, operation: str, details: str, agent: Optional[str] = None, level: str = "info", color: str = "cyan") -> None:
"""
Log a memory operation if memory_verbose is enabled.
Args:
operation: The type of operation (e.g., "Saving", "Searching").
details: Details about the operation.
agent: The agent performing the operation, if any.
level: The log level.
color: The color to use for the log message.
"""
if not self.memory_verbose:
return
sanitized_details = str(details)
if len(sanitized_details) > 100:
sanitized_details = f"{sanitized_details[:100]}..."
memory_type = self.__class__.__name__
agent_info = f" from agent '{agent}'" if agent else ""
self._logger.log(level, f"{memory_type}: {operation}{agent_info}: {sanitized_details}", color=color)
def save(
self,
@@ -69,30 +17,11 @@ class Memory:
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""
Save a value to memory.
Args:
value: The value to save.
metadata: Additional metadata to store with the value.
agent: The agent saving the value, if any.
Raises:
MemoryOperationError: If there's an error saving the value to memory.
"""
metadata = metadata or {}
if agent:
metadata["agent"] = agent
if self.memory_verbose:
self._log_operation("Saving", str(value), agent)
try:
self.storage.save(value, metadata)
except Exception as e:
if self.memory_verbose:
self._log_operation("Error saving", str(e), agent, level="error", color="red")
raise MemoryOperationError(str(e), "save", self.__class__.__name__)
self.storage.save(value, metadata)
def search(
self,
@@ -100,33 +29,6 @@ class Memory:
limit: int = 3,
score_threshold: float = 0.35,
) -> List[Any]:
"""
Search for values in memory.
Args:
query: The search query.
limit: Maximum number of results to return.
score_threshold: Minimum similarity score for results.
Returns:
A list of matching values.
Raises:
MemoryOperationError: If there's an error searching memory.
"""
if self.memory_verbose:
self._log_operation("Searching for", query)
try:
results = self.storage.search(
query=query, limit=limit, score_threshold=score_threshold
)
if self.memory_verbose and results:
self._log_operation("Found", f"{len(results)} results")
return results
except Exception as e:
if self.memory_verbose:
self._log_operation("Error searching", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "search", self.__class__.__name__)
return self.storage.search(
query=query, limit=limit, score_threshold=score_threshold
)

View File

@@ -1,6 +1,6 @@
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Optional
from crewai.memory.memory import Memory, MemoryOperationError
from crewai.memory.memory import Memory
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.memory.storage.rag_storage import RAGStorage
@@ -12,24 +12,9 @@ class ShortTermMemory(Memory):
Inherits from the Memory class and utilizes an instance of a class that
adheres to the Storage for data storage, specifically working with
MemoryItem instances.
Attributes:
memory_provider: The memory provider to use, if any.
storage: The storage backend for the memory.
memory_verbose: Whether to log memory operations.
"""
def __init__(self, crew=None, embedder_config=None, storage=None, path=None, memory_verbose=False):
"""
Initialize a ShortTermMemory instance.
Args:
crew: The crew to associate with this memory.
embedder_config: Configuration for the embedder.
storage: The storage backend for the memory.
path: Path to the storage file, if any.
memory_verbose: Whether to log memory operations.
"""
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
if hasattr(crew, "memory_config") and crew.memory_config is not None:
self.memory_provider = crew.memory_config.get("provider")
else:
@@ -54,7 +39,7 @@ class ShortTermMemory(Memory):
path=path,
)
)
super().__init__(storage, memory_verbose=memory_verbose)
super().__init__(storage)
def save(
self,
@@ -62,68 +47,26 @@ class ShortTermMemory(Memory):
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""
Save a value to short-term memory.
Args:
value: The value to save.
metadata: Additional metadata to store with the value.
agent: The agent saving the value, if any.
Raises:
MemoryOperationError: If there's an error saving to memory.
"""
try:
item = ShortTermMemoryItem(data=value, metadata=metadata, agent=agent)
if self.memory_verbose:
self._log_operation("Saving item", str(item.data), agent)
if self.memory_provider == "mem0":
item.data = f"Remember the following insights from Agent run: {item.data}"
item = ShortTermMemoryItem(data=value, metadata=metadata, agent=agent)
if self.memory_provider == "mem0":
item.data = f"Remember the following insights from Agent run: {item.data}"
super().save(value=item.data, metadata=item.metadata, agent=item.agent)
except Exception as e:
if self.memory_verbose:
self._log_operation("Error saving item", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "save", self.__class__.__name__)
super().save(value=item.data, metadata=item.metadata, agent=item.agent)
def search(
self,
query: str,
limit: int = 3,
score_threshold: float = 0.35,
) -> List[Any]:
"""
Search for values in short-term memory.
Args:
query: The search query.
limit: Maximum number of results to return.
score_threshold: Minimum similarity score for results.
Returns:
A list of matching values.
Raises:
MemoryOperationError: If there's an error searching memory.
"""
try:
return super().search(query=query, limit=limit, score_threshold=score_threshold)
except Exception as e:
if self.memory_verbose:
self._log_operation("Error searching", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "search", self.__class__.__name__)
):
return self.storage.search(
query=query, limit=limit, score_threshold=score_threshold
) # type: ignore # BUG? The reference is to the parent class, but the parent class does not have this parameters
def reset(self) -> None:
"""
Reset the short-term memory.
Raises:
MemoryOperationError: If there's an error resetting the memory.
"""
try:
self.storage.reset()
except Exception as e:
if self.memory_verbose:
self._log_operation("Error resetting", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "reset", self.__class__.__name__)
raise Exception(
f"An error occurred while resetting the short-term memory: {e}"
)

View File

@@ -1,6 +1,6 @@
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Optional
from crewai.memory.memory import Memory, MemoryOperationError
from crewai.memory.memory import Memory
class UserMemory(Memory):
@@ -9,23 +9,9 @@ class UserMemory(Memory):
Inherits from the Memory class and utilizes an instance of a class that
adheres to the Storage for data storage, specifically working with
MemoryItem instances.
Attributes:
storage: The storage backend for the memory.
memory_verbose: Whether to log memory operations.
"""
def __init__(self, crew=None, memory_verbose=False):
"""
Initialize a UserMemory instance.
Args:
crew: The crew to associate with this memory.
memory_verbose: Whether to log memory operations.
Raises:
ImportError: If Mem0 is not installed.
"""
def __init__(self, crew=None):
try:
from crewai.memory.storage.mem0_storage import Mem0Storage
except ImportError:
@@ -33,72 +19,27 @@ class UserMemory(Memory):
"Mem0 is not installed. Please install it with `pip install mem0ai`."
)
storage = Mem0Storage(type="user", crew=crew)
super().__init__(storage, memory_verbose=memory_verbose)
super().__init__(storage)
def save(
self,
value: Any,
value,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""
Save user memory.
Args:
value: The value to save.
metadata: Additional metadata to store with the value.
agent: The agent saving the value, if any.
Raises:
MemoryOperationError: If there's an error saving to memory.
"""
try:
if self.memory_verbose:
self._log_operation("Saving user memory", str(value))
# TODO: Change this function since we want to take care of the case where we save memories for the usr
data = f"Remember the details about the user: {value}"
super().save(data, metadata)
except Exception as e:
if self.memory_verbose:
self._log_operation("Error saving user memory", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "save", self.__class__.__name__)
# TODO: Change this function since we want to take care of the case where we save memories for the usr
data = f"Remember the details about the user: {value}"
super().save(data, metadata)
def search(
self,
query: str,
limit: int = 3,
score_threshold: float = 0.35,
) -> List[Any]:
"""
Search for user memories.
Args:
query: The search query.
limit: Maximum number of results to return.
score_threshold: Minimum similarity score for results.
Returns:
A list of matching user memories.
Raises:
MemoryOperationError: If there's an error searching memory.
"""
try:
if self.memory_verbose:
self._log_operation("Searching user memory", query)
results = self.storage.search(
query=query,
limit=limit,
score_threshold=score_threshold,
)
if self.memory_verbose and results:
self._log_operation("Found", f"{len(results)} results")
return results
except Exception as e:
if self.memory_verbose:
self._log_operation("Error searching user memory", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "search", self.__class__.__name__)
):
results = self.storage.search(
query=query,
limit=limit,
score_threshold=score_threshold,
)
return results

View File

@@ -1,147 +0,0 @@
from unittest.mock import patch, MagicMock
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.memory.memory import Memory, MemoryOperationError
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.task import Task
from crewai.utilities.logger import Logger
def test_memory_verbose_flag_in_crew():
"""Test that memory_verbose flag is correctly set in Crew"""
agent = Agent(
role="Researcher",
goal="Research goal",
backstory="Researcher backstory",
)
task = Task(
description="Test task",
expected_output="Test output",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], memory=True, memory_verbose=True)
assert crew.memory_verbose is True
def test_memory_verbose_logging_in_memory():
"""Test that memory operations are logged when memory_verbose is enabled"""
storage = MagicMock()
mock_logger = MagicMock(spec=Logger)
memory = Memory(storage=storage, memory_verbose=True)
memory._logger = mock_logger
memory.save("test value", {"test": "metadata"}, "test_agent")
mock_logger.log.assert_called_once()
args = mock_logger.log.call_args[0]
assert args[0] == "info"
assert "Saving" in args[1]
mock_logger.log.reset_mock()
memory.search("test query")
assert mock_logger.log.call_count == 2
first_call_args = mock_logger.log.call_args_list[0][0]
assert first_call_args[0] == "info"
assert "Searching" in first_call_args[1]
second_call_args = mock_logger.log.call_args_list[1][0]
assert "Found" in second_call_args[1]
def test_no_logging_when_memory_verbose_disabled():
"""Test that no logging occurs when memory_verbose is disabled"""
storage = MagicMock()
mock_logger = MagicMock(spec=Logger)
memory = Memory(storage=storage, memory_verbose=False)
memory._logger = mock_logger
memory.save("test value", {"test": "metadata"}, "test_agent")
mock_logger.log.assert_not_called()
memory.search("test query")
mock_logger.log.assert_not_called()
def test_memory_verbose_in_short_term_memory():
"""Test that memory_verbose flag is correctly passed to ShortTermMemory"""
with patch('crewai.memory.short_term.short_term_memory.RAGStorage') as mock_storage_class:
mock_storage = MagicMock()
mock_storage_class.return_value = mock_storage
memory = ShortTermMemory(memory_verbose=True)
assert memory.memory_verbose is True
mock_logger = MagicMock()
memory._logger = mock_logger
memory.save("test value", {"test": "metadata"}, "test_agent")
assert mock_logger.log.call_count >= 1
def test_memory_verbose_passed_from_crew_to_memory():
"""Test that memory_verbose flag is correctly passed from Crew to memory instances"""
with patch('crewai.crew.LongTermMemory') as mock_ltm, \
patch('crewai.crew.ShortTermMemory') as mock_stm, \
patch('crewai.crew.EntityMemory') as mock_em, \
patch('crewai.crew.UserMemory') as mock_um:
mock_ltm_instance = MagicMock()
mock_stm_instance = MagicMock()
mock_em_instance = MagicMock()
mock_um_instance = MagicMock()
mock_ltm.return_value = mock_ltm_instance
mock_stm.return_value = mock_stm_instance
mock_em.return_value = mock_em_instance
mock_um.return_value = mock_um_instance
agent = Agent(
role="Researcher",
goal="Research goal",
backstory="Researcher backstory",
)
task = Task(
description="Test task",
expected_output="Test output",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], memory=True, memory_verbose=True, memory_config={})
mock_ltm.assert_called_once_with(memory_verbose=True)
mock_stm.assert_called_with(crew=crew, embedder_config=None, memory_verbose=True)
mock_em.assert_called_with(crew=crew, embedder_config=None, memory_verbose=True)
mock_um.assert_called_with(crew=crew, memory_verbose=True)
def test_memory_verbose_error_handling():
"""Test that memory operations errors are properly handled when memory_verbose is enabled"""
storage = MagicMock()
storage.save.side_effect = Exception("Test error")
storage.search.side_effect = Exception("Test error")
mock_logger = MagicMock()
with patch('crewai.memory.memory.Logger', return_value=mock_logger):
memory = Memory(storage=storage, memory_verbose=True)
with pytest.raises(MemoryOperationError) as exc_info:
memory.save("test value", {"test": "metadata"}, "test_agent")
assert "save" in str(exc_info.value)
assert "Test error" in str(exc_info.value)
assert "Memory" in str(exc_info.value)
with pytest.raises(MemoryOperationError) as exc_info:
memory.search("test query")
assert "search" in str(exc_info.value)
assert "Test error" in str(exc_info.value)