mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-06 14:48:29 +00:00
Compare commits
3 Commits
lg-memory-
...
lg-memory-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
df4754301a | ||
|
|
ae57e5723c | ||
|
|
ab39753a75 |
@@ -255,6 +255,17 @@ CrewAI provides a wide range of events that you can listen for:
|
||||
- **LLMCallFailedEvent**: Emitted when an LLM call fails
|
||||
- **LLMStreamChunkEvent**: Emitted for each chunk received during streaming LLM responses
|
||||
|
||||
### Memory Events
|
||||
|
||||
- **MemoryQueryStartedEvent**: Emitted when a memory query is started. Contains the query, limit, and optional score threshold.
|
||||
- **MemoryQueryCompletedEvent**: Emitted when a memory query is completed successfully. Contains the query, results, limit, score threshold, and query execution time.
|
||||
- **MemoryQueryFailedEvent**: Emitted when a memory query fails. Contains the query, limit, score threshold, and error message.
|
||||
- **MemorySaveStartedEvent**: Emitted when a memory save operation is started. Contains the value to be saved, metadata, and optional agent role.
|
||||
- **MemorySaveCompletedEvent**: Emitted when a memory save operation is completed successfully. Contains the saved value, metadata, agent role, and save execution time.
|
||||
- **MemorySaveFailedEvent**: Emitted when a memory save operation fails. Contains the value, metadata, agent role, and error message.
|
||||
- **MemoryRetrievalStartedEvent**: Emitted when memory retrieval for a task prompt starts. Contains the optional task ID.
|
||||
- **MemoryRetrievalCompletedEvent**: Emitted when memory retrieval for a task prompt completes successfully. Contains the task ID, memory content, and retrieval execution time.
|
||||
|
||||
## Event Handler Structure
|
||||
|
||||
Each event handler receives two parameters:
|
||||
|
||||
@@ -9,7 +9,7 @@ icon: database
|
||||
The CrewAI framework provides a sophisticated memory system designed to significantly enhance AI agent capabilities. CrewAI offers **three distinct memory approaches** that serve different use cases:
|
||||
|
||||
1. **Basic Memory System** - Built-in short-term, long-term, and entity memory
|
||||
2. **User Memory** - User-specific memory with Mem0 integration (legacy approach)
|
||||
2. **User Memory** - User-specific memory with Mem0 integration (legacy approach)
|
||||
3. **External Memory** - Standalone external memory providers (new approach)
|
||||
|
||||
## Memory System Components
|
||||
@@ -62,7 +62,7 @@ By default, CrewAI uses the `appdirs` library to determine storage locations fol
|
||||
```
|
||||
~/Library/Application Support/CrewAI/{project_name}/
|
||||
├── knowledge/ # Knowledge base ChromaDB files
|
||||
├── short_term_memory/ # Short-term memory ChromaDB files
|
||||
├── short_term_memory/ # Short-term memory ChromaDB files
|
||||
├── long_term_memory/ # Long-term memory ChromaDB files
|
||||
├── entities/ # Entity memory ChromaDB files
|
||||
└── long_term_memory_storage.db # SQLite database
|
||||
@@ -252,7 +252,7 @@ chroma_path = os.path.join(storage_path, "knowledge")
|
||||
if os.path.exists(chroma_path):
|
||||
client = chromadb.PersistentClient(path=chroma_path)
|
||||
collections = client.list_collections()
|
||||
|
||||
|
||||
print("ChromaDB Collections:")
|
||||
for collection in collections:
|
||||
print(f" - {collection.name}: {collection.count()} documents")
|
||||
@@ -269,7 +269,7 @@ crew = Crew(agents=[...], tasks=[...], memory=True)
|
||||
|
||||
# Reset specific memory types
|
||||
crew.reset_memories(command_type='short') # Short-term memory
|
||||
crew.reset_memories(command_type='long') # Long-term memory
|
||||
crew.reset_memories(command_type='long') # Long-term memory
|
||||
crew.reset_memories(command_type='entity') # Entity memory
|
||||
crew.reset_memories(command_type='knowledge') # Knowledge storage
|
||||
```
|
||||
@@ -596,7 +596,7 @@ providers_to_test = [
|
||||
{
|
||||
"name": "Ollama",
|
||||
"config": {
|
||||
"provider": "ollama",
|
||||
"provider": "ollama",
|
||||
"config": {"model": "mxbai-embed-large"}
|
||||
}
|
||||
}
|
||||
@@ -604,7 +604,7 @@ providers_to_test = [
|
||||
|
||||
for provider in providers_to_test:
|
||||
print(f"\nTesting {provider['name']} embeddings...")
|
||||
|
||||
|
||||
# Create crew with specific embedder
|
||||
crew = Crew(
|
||||
agents=[...],
|
||||
@@ -612,7 +612,7 @@ for provider in providers_to_test:
|
||||
memory=True,
|
||||
embedder=provider['config']
|
||||
)
|
||||
|
||||
|
||||
# Run your test and measure performance
|
||||
result = crew.kickoff()
|
||||
print(f"{provider['name']} completed successfully")
|
||||
@@ -655,17 +655,17 @@ import time
|
||||
|
||||
def test_embedding_performance(embedder_config, test_text="This is a test document"):
|
||||
start_time = time.time()
|
||||
|
||||
|
||||
crew = Crew(
|
||||
agents=[...],
|
||||
tasks=[...],
|
||||
memory=True,
|
||||
embedder=embedder_config
|
||||
)
|
||||
|
||||
|
||||
# Simulate memory operation
|
||||
crew.kickoff()
|
||||
|
||||
|
||||
end_time = time.time()
|
||||
return end_time - start_time
|
||||
|
||||
@@ -676,7 +676,7 @@ openai_time = test_embedding_performance({
|
||||
})
|
||||
|
||||
ollama_time = test_embedding_performance({
|
||||
"provider": "ollama",
|
||||
"provider": "ollama",
|
||||
"config": {"model": "mxbai-embed-large"}
|
||||
})
|
||||
|
||||
@@ -783,7 +783,7 @@ os.environ["MEM0_API_KEY"] = "your-api-key"
|
||||
# Create external memory instance
|
||||
external_memory = ExternalMemory(
|
||||
embedder_config={
|
||||
"provider": "mem0",
|
||||
"provider": "mem0",
|
||||
"config": {"user_id": "U-123"}
|
||||
}
|
||||
)
|
||||
@@ -808,8 +808,8 @@ class CustomStorage(Storage):
|
||||
|
||||
def save(self, value, metadata=None, agent=None):
|
||||
self.memories.append({
|
||||
"value": value,
|
||||
"metadata": metadata,
|
||||
"value": value,
|
||||
"metadata": metadata,
|
||||
"agent": agent
|
||||
})
|
||||
|
||||
@@ -986,7 +986,201 @@ crew = Crew(
|
||||
- 🫡 **Enhanced Personalization:** Memory enables agents to remember user preferences and historical interactions, leading to personalized experiences.
|
||||
- 🧠 **Improved Problem Solving:** Access to a rich memory store aids agents in making more informed decisions, drawing on past learnings and contextual insights.
|
||||
|
||||
## Memory Events
|
||||
|
||||
CrewAI's event system provides powerful insights into memory operations. By leveraging memory events, you can monitor, debug, and optimize your memory system's performance and behavior.
|
||||
|
||||
### Available Memory Events
|
||||
|
||||
CrewAI emits the following memory-related events:
|
||||
|
||||
| Event | Description | Key Properties |
|
||||
| :---- | :---------- | :------------- |
|
||||
| **MemoryQueryStartedEvent** | Emitted when a memory query begins | `query`, `limit`, `score_threshold` |
|
||||
| **MemoryQueryCompletedEvent** | Emitted when a memory query completes successfully | `query`, `results`, `limit`, `score_threshold`, `query_time_ms` |
|
||||
| **MemoryQueryFailedEvent** | Emitted when a memory query fails | `query`, `limit`, `score_threshold`, `error` |
|
||||
| **MemorySaveStartedEvent** | Emitted when a memory save operation begins | `value`, `metadata`, `agent_role` |
|
||||
| **MemorySaveCompletedEvent** | Emitted when a memory save operation completes successfully | `value`, `metadata`, `agent_role`, `save_time_ms` |
|
||||
| **MemorySaveFailedEvent** | Emitted when a memory save operation fails | `value`, `metadata`, `agent_role`, `error` |
|
||||
| **MemoryRetrievalStartedEvent** | Emitted when memory retrieval for a task prompt starts | `task_id` |
|
||||
| **MemoryRetrievalCompletedEvent** | Emitted when memory retrieval completes successfully | `task_id`, `memory_content`, `retrieval_time_ms` |
|
||||
|
||||
### Practical Applications
|
||||
|
||||
#### 1. Memory Performance Monitoring
|
||||
|
||||
Track memory operation timing to optimize your application:
|
||||
|
||||
```python
|
||||
from crewai.utilities.events.base_event_listener import BaseEventListener
|
||||
from crewai.utilities.events import (
|
||||
MemoryQueryCompletedEvent,
|
||||
MemorySaveCompletedEvent
|
||||
)
|
||||
import time
|
||||
|
||||
class MemoryPerformanceMonitor(BaseEventListener):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.query_times = []
|
||||
self.save_times = []
|
||||
|
||||
def setup_listeners(self, crewai_event_bus):
|
||||
@crewai_event_bus.on(MemoryQueryCompletedEvent)
|
||||
def on_memory_query_completed(source, event: MemoryQueryCompletedEvent):
|
||||
self.query_times.append(event.query_time_ms)
|
||||
print(f"Memory query completed in {event.query_time_ms:.2f}ms. Query: '{event.query}'")
|
||||
print(f"Average query time: {sum(self.query_times)/len(self.query_times):.2f}ms")
|
||||
|
||||
@crewai_event_bus.on(MemorySaveCompletedEvent)
|
||||
def on_memory_save_completed(source, event: MemorySaveCompletedEvent):
|
||||
self.save_times.append(event.save_time_ms)
|
||||
print(f"Memory save completed in {event.save_time_ms:.2f}ms")
|
||||
print(f"Average save time: {sum(self.save_times)/len(self.save_times):.2f}ms")
|
||||
|
||||
# Create an instance of your listener
|
||||
memory_monitor = MemoryPerformanceMonitor()
|
||||
```
|
||||
|
||||
#### 2. Memory Content Logging
|
||||
|
||||
Log memory operations for debugging and insights:
|
||||
|
||||
```python
|
||||
from crewai.utilities.events.base_event_listener import BaseEventListener
|
||||
from crewai.utilities.events import (
|
||||
MemorySaveStartedEvent,
|
||||
MemoryQueryStartedEvent,
|
||||
MemoryRetrievalCompletedEvent
|
||||
)
|
||||
import logging
|
||||
|
||||
# Configure logging
|
||||
logger = logging.getLogger('memory_events')
|
||||
|
||||
class MemoryLogger(BaseEventListener):
|
||||
def setup_listeners(self, crewai_event_bus):
|
||||
@crewai_event_bus.on(MemorySaveStartedEvent)
|
||||
def on_memory_save_started(source, event: MemorySaveStartedEvent):
|
||||
if event.agent_role:
|
||||
logger.info(f"Agent '{event.agent_role}' saving memory: {event.value[:50]}...")
|
||||
else:
|
||||
logger.info(f"Saving memory: {event.value[:50]}...")
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryStartedEvent)
|
||||
def on_memory_query_started(source, event: MemoryQueryStartedEvent):
|
||||
logger.info(f"Memory query started: '{event.query}' (limit: {event.limit})")
|
||||
|
||||
@crewai_event_bus.on(MemoryRetrievalCompletedEvent)
|
||||
def on_memory_retrieval_completed(source, event: MemoryRetrievalCompletedEvent):
|
||||
if event.task_id:
|
||||
logger.info(f"Memory retrieved for task {event.task_id} in {event.retrieval_time_ms:.2f}ms")
|
||||
else:
|
||||
logger.info(f"Memory retrieved in {event.retrieval_time_ms:.2f}ms")
|
||||
logger.debug(f"Memory content: {event.memory_content}")
|
||||
|
||||
# Create an instance of your listener
|
||||
memory_logger = MemoryLogger()
|
||||
```
|
||||
|
||||
#### 3. Error Tracking and Notifications
|
||||
|
||||
Capture and respond to memory errors:
|
||||
|
||||
```python
|
||||
from crewai.utilities.events.base_event_listener import BaseEventListener
|
||||
from crewai.utilities.events import (
|
||||
MemorySaveFailedEvent,
|
||||
MemoryQueryFailedEvent
|
||||
)
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
# Configure logging
|
||||
logger = logging.getLogger('memory_errors')
|
||||
|
||||
class MemoryErrorTracker(BaseEventListener):
|
||||
def __init__(self, notify_email: Optional[str] = None):
|
||||
super().__init__()
|
||||
self.notify_email = notify_email
|
||||
self.error_count = 0
|
||||
|
||||
def setup_listeners(self, crewai_event_bus):
|
||||
@crewai_event_bus.on(MemorySaveFailedEvent)
|
||||
def on_memory_save_failed(source, event: MemorySaveFailedEvent):
|
||||
self.error_count += 1
|
||||
agent_info = f"Agent '{event.agent_role}'" if event.agent_role else "Unknown agent"
|
||||
error_message = f"Memory save failed: {event.error}. {agent_info}"
|
||||
logger.error(error_message)
|
||||
|
||||
if self.notify_email and self.error_count % 5 == 0:
|
||||
self._send_notification(error_message)
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryFailedEvent)
|
||||
def on_memory_query_failed(source, event: MemoryQueryFailedEvent):
|
||||
self.error_count += 1
|
||||
error_message = f"Memory query failed: {event.error}. Query: '{event.query}'"
|
||||
logger.error(error_message)
|
||||
|
||||
if self.notify_email and self.error_count % 5 == 0:
|
||||
self._send_notification(error_message)
|
||||
|
||||
def _send_notification(self, message):
|
||||
# Implement your notification system (email, Slack, etc.)
|
||||
print(f"[NOTIFICATION] Would send to {self.notify_email}: {message}")
|
||||
|
||||
# Create an instance of your listener
|
||||
error_tracker = MemoryErrorTracker(notify_email="admin@example.com")
|
||||
```
|
||||
|
||||
### Integrating with Analytics Platforms
|
||||
|
||||
Memory events can be forwarded to analytics and monitoring platforms to track performance metrics, detect anomalies, and visualize memory usage patterns:
|
||||
|
||||
```python
|
||||
from crewai.utilities.events.base_event_listener import BaseEventListener
|
||||
from crewai.utilities.events import (
|
||||
MemoryQueryCompletedEvent,
|
||||
MemorySaveCompletedEvent
|
||||
)
|
||||
|
||||
class MemoryAnalyticsForwarder(BaseEventListener):
|
||||
def __init__(self, analytics_client):
|
||||
super().__init__()
|
||||
self.client = analytics_client
|
||||
|
||||
def setup_listeners(self, crewai_event_bus):
|
||||
@crewai_event_bus.on(MemoryQueryCompletedEvent)
|
||||
def on_memory_query_completed(source, event: MemoryQueryCompletedEvent):
|
||||
# Forward query metrics to analytics platform
|
||||
self.client.track_metric({
|
||||
"event_type": "memory_query",
|
||||
"query": event.query,
|
||||
"duration_ms": event.query_time_ms,
|
||||
"result_count": len(event.results) if hasattr(event.results, "__len__") else 0,
|
||||
"timestamp": event.timestamp
|
||||
})
|
||||
|
||||
@crewai_event_bus.on(MemorySaveCompletedEvent)
|
||||
def on_memory_save_completed(source, event: MemorySaveCompletedEvent):
|
||||
# Forward save metrics to analytics platform
|
||||
self.client.track_metric({
|
||||
"event_type": "memory_save",
|
||||
"agent_role": event.agent_role,
|
||||
"duration_ms": event.save_time_ms,
|
||||
"timestamp": event.timestamp
|
||||
})
|
||||
```
|
||||
|
||||
### Best Practices for Memory Event Listeners
|
||||
|
||||
1. **Keep handlers lightweight**: Avoid complex processing in event handlers to prevent performance impacts
|
||||
2. **Use appropriate logging levels**: Use INFO for normal operations, DEBUG for details, ERROR for issues
|
||||
3. **Batch metrics when possible**: Accumulate metrics before sending to external systems
|
||||
4. **Handle exceptions gracefully**: Ensure your event handlers don't crash due to unexpected data
|
||||
5. **Consider memory consumption**: Be mindful of storing large amounts of event data
|
||||
|
||||
## Conclusion
|
||||
|
||||
Integrating CrewAI's memory system into your projects is straightforward. By leveraging the provided memory components and configurations,
|
||||
Integrating CrewAI's memory system into your projects is straightforward. By leveraging the provided memory components and configurations,
|
||||
you can quickly empower your agents with the ability to remember, reason, and learn from their interactions, unlocking new levels of intelligence and capability.
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import shutil
|
||||
import subprocess
|
||||
import time
|
||||
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Type, Union
|
||||
|
||||
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
|
||||
@@ -32,6 +33,10 @@ from crewai.utilities.events.agent_events import (
|
||||
AgentExecutionStartedEvent,
|
||||
)
|
||||
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
|
||||
from crewai.utilities.events.memory_events import (
|
||||
MemoryRetrievalStartedEvent,
|
||||
MemoryRetrievalCompletedEvent,
|
||||
)
|
||||
from crewai.utilities.events.knowledge_events import (
|
||||
KnowledgeQueryCompletedEvent,
|
||||
KnowledgeQueryFailedEvent,
|
||||
@@ -302,6 +307,15 @@ class Agent(BaseAgent):
|
||||
)
|
||||
|
||||
if self._is_any_available_memory():
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemoryRetrievalStartedEvent(
|
||||
task_id=str(task.id) if task else None,
|
||||
source_type="agent",
|
||||
),
|
||||
)
|
||||
|
||||
start_time = time.time()
|
||||
contextual_memory = ContextualMemory(
|
||||
self.crew.memory_config,
|
||||
self.crew._short_term_memory,
|
||||
@@ -313,6 +327,16 @@ class Agent(BaseAgent):
|
||||
memory = contextual_memory.build_context_for_task(task, context)
|
||||
if memory.strip() != "":
|
||||
task_prompt += self.i18n.slice("memory").format(memory=memory)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemoryRetrievalCompletedEvent(
|
||||
task_id=str(task.id) if task else None,
|
||||
memory_content=memory,
|
||||
retrieval_time_ms=(time.time() - start_time) * 1000,
|
||||
source_type="agent",
|
||||
),
|
||||
)
|
||||
knowledge_config = (
|
||||
self.knowledge_config.model_dump() if self.knowledge_config else {}
|
||||
)
|
||||
|
||||
@@ -1,10 +1,20 @@
|
||||
from typing import Optional
|
||||
import time
|
||||
|
||||
from pydantic import PrivateAttr
|
||||
|
||||
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
|
||||
from crewai.memory.memory import Memory
|
||||
from crewai.memory.storage.rag_storage import RAGStorage
|
||||
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
|
||||
from crewai.utilities.events.memory_events import (
|
||||
MemoryQueryStartedEvent,
|
||||
MemoryQueryCompletedEvent,
|
||||
MemoryQueryFailedEvent,
|
||||
MemorySaveStartedEvent,
|
||||
MemorySaveCompletedEvent,
|
||||
MemorySaveFailedEvent,
|
||||
)
|
||||
|
||||
|
||||
class EntityMemory(Memory):
|
||||
@@ -48,16 +58,96 @@ class EntityMemory(Memory):
|
||||
|
||||
def save(self, item: EntityMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
|
||||
"""Saves an entity item into the SQLite storage."""
|
||||
if self._memory_provider == "mem0":
|
||||
data = f"""
|
||||
Remember details about the following entity:
|
||||
Name: {item.name}
|
||||
Type: {item.type}
|
||||
Entity Description: {item.description}
|
||||
"""
|
||||
else:
|
||||
data = f"{item.name}({item.type}): {item.description}"
|
||||
super().save(data, item.metadata)
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemorySaveStartedEvent(
|
||||
metadata=item.metadata,
|
||||
source_type="entity_memory",
|
||||
),
|
||||
)
|
||||
|
||||
start_time = time.time()
|
||||
try:
|
||||
if self._memory_provider == "mem0":
|
||||
data = f"""
|
||||
Remember details about the following entity:
|
||||
Name: {item.name}
|
||||
Type: {item.type}
|
||||
Entity Description: {item.description}
|
||||
"""
|
||||
else:
|
||||
data = f"{item.name}({item.type}): {item.description}"
|
||||
|
||||
super().save(data, item.metadata)
|
||||
|
||||
# Emit memory save completed event
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemorySaveCompletedEvent(
|
||||
value=data,
|
||||
metadata=item.metadata,
|
||||
save_time_ms=(time.time() - start_time) * 1000,
|
||||
source_type="entity_memory",
|
||||
),
|
||||
)
|
||||
except Exception as e:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemorySaveFailedEvent(
|
||||
metadata=item.metadata,
|
||||
error=str(e),
|
||||
source_type="entity_memory",
|
||||
),
|
||||
)
|
||||
raise
|
||||
|
||||
def search(
|
||||
self,
|
||||
query: str,
|
||||
limit: int = 3,
|
||||
score_threshold: float = 0.35,
|
||||
):
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemoryQueryStartedEvent(
|
||||
query=query,
|
||||
limit=limit,
|
||||
score_threshold=score_threshold,
|
||||
source_type="entity_memory",
|
||||
),
|
||||
)
|
||||
|
||||
start_time = time.time()
|
||||
try:
|
||||
results = super().search(
|
||||
query=query, limit=limit, score_threshold=score_threshold
|
||||
)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemoryQueryCompletedEvent(
|
||||
query=query,
|
||||
results=results,
|
||||
limit=limit,
|
||||
score_threshold=score_threshold,
|
||||
query_time_ms=(time.time() - start_time) * 1000,
|
||||
source_type="entity_memory",
|
||||
),
|
||||
)
|
||||
|
||||
return results
|
||||
except Exception as e:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemoryQueryFailedEvent(
|
||||
query=query,
|
||||
limit=limit,
|
||||
score_threshold=score_threshold,
|
||||
error=str(e),
|
||||
source_type="entity_memory",
|
||||
),
|
||||
)
|
||||
raise
|
||||
|
||||
def reset(self) -> None:
|
||||
try:
|
||||
|
||||
97
src/crewai/memory/external/external_memory.py
vendored
97
src/crewai/memory/external/external_memory.py
vendored
@@ -1,8 +1,18 @@
|
||||
from typing import TYPE_CHECKING, Any, Dict, Optional
|
||||
import time
|
||||
|
||||
from crewai.memory.external.external_memory_item import ExternalMemoryItem
|
||||
from crewai.memory.memory import Memory
|
||||
from crewai.memory.storage.interface import Storage
|
||||
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
|
||||
from crewai.utilities.events.memory_events import (
|
||||
MemoryQueryStartedEvent,
|
||||
MemoryQueryCompletedEvent,
|
||||
MemoryQueryFailedEvent,
|
||||
MemorySaveStartedEvent,
|
||||
MemorySaveCompletedEvent,
|
||||
MemorySaveFailedEvent,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.memory.storage.mem0_storage import Mem0Storage
|
||||
@@ -46,8 +56,91 @@ class ExternalMemory(Memory):
|
||||
agent: Optional[str] = None,
|
||||
) -> None:
|
||||
"""Saves a value into the external storage."""
|
||||
item = ExternalMemoryItem(value=value, metadata=metadata, agent=agent)
|
||||
super().save(value=item.value, metadata=item.metadata, agent=item.agent)
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemorySaveStartedEvent(
|
||||
value=value,
|
||||
metadata=metadata,
|
||||
agent_role=agent,
|
||||
source_type="external_memory",
|
||||
),
|
||||
)
|
||||
|
||||
start_time = time.time()
|
||||
try:
|
||||
item = ExternalMemoryItem(value=value, metadata=metadata, agent=agent)
|
||||
super().save(value=item.value, metadata=item.metadata, agent=item.agent)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemorySaveCompletedEvent(
|
||||
value=value,
|
||||
metadata=metadata,
|
||||
agent_role=agent,
|
||||
save_time_ms=(time.time() - start_time) * 1000,
|
||||
source_type="external_memory",
|
||||
),
|
||||
)
|
||||
except Exception as e:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemorySaveFailedEvent(
|
||||
value=value,
|
||||
metadata=metadata,
|
||||
agent_role=agent,
|
||||
error=str(e),
|
||||
source_type="external_memory",
|
||||
),
|
||||
)
|
||||
raise
|
||||
|
||||
def search(
|
||||
self,
|
||||
query: str,
|
||||
limit: int = 3,
|
||||
score_threshold: float = 0.35,
|
||||
):
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemoryQueryStartedEvent(
|
||||
query=query,
|
||||
limit=limit,
|
||||
score_threshold=score_threshold,
|
||||
source_type="external_memory",
|
||||
),
|
||||
)
|
||||
|
||||
start_time = time.time()
|
||||
try:
|
||||
results = super().search(
|
||||
query=query, limit=limit, score_threshold=score_threshold
|
||||
)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemoryQueryCompletedEvent(
|
||||
query=query,
|
||||
results=results,
|
||||
limit=limit,
|
||||
score_threshold=score_threshold,
|
||||
query_time_ms=(time.time() - start_time) * 1000,
|
||||
source_type="external_memory",
|
||||
),
|
||||
)
|
||||
|
||||
return results
|
||||
except Exception as e:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemoryQueryFailedEvent(
|
||||
query=query,
|
||||
limit=limit,
|
||||
score_threshold=score_threshold,
|
||||
error=str(e),
|
||||
source_type="external_memory",
|
||||
),
|
||||
)
|
||||
raise
|
||||
|
||||
def reset(self) -> None:
|
||||
self.storage.reset()
|
||||
|
||||
@@ -1,7 +1,17 @@
|
||||
from typing import Any, Dict, List
|
||||
import time
|
||||
|
||||
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
|
||||
from crewai.memory.memory import Memory
|
||||
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
|
||||
from crewai.utilities.events.memory_events import (
|
||||
MemoryQueryStartedEvent,
|
||||
MemoryQueryCompletedEvent,
|
||||
MemoryQueryFailedEvent,
|
||||
MemorySaveStartedEvent,
|
||||
MemorySaveCompletedEvent,
|
||||
MemorySaveFailedEvent,
|
||||
)
|
||||
from crewai.memory.storage.ltm_sqlite_storage import LTMSQLiteStorage
|
||||
|
||||
|
||||
@@ -20,17 +30,87 @@ class LongTermMemory(Memory):
|
||||
super().__init__(storage=storage)
|
||||
|
||||
def save(self, item: LongTermMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
|
||||
metadata = item.metadata
|
||||
metadata.update({"agent": item.agent, "expected_output": item.expected_output})
|
||||
self.storage.save( # type: ignore # BUG?: Unexpected keyword argument "task_description","score","datetime" for "save" of "Storage"
|
||||
task_description=item.task,
|
||||
score=metadata["quality"],
|
||||
metadata=metadata,
|
||||
datetime=item.datetime,
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemorySaveStartedEvent(
|
||||
value=item.task,
|
||||
metadata=item.metadata,
|
||||
agent_role=item.agent,
|
||||
source_type="long_term_memory",
|
||||
),
|
||||
)
|
||||
|
||||
start_time = time.time()
|
||||
try:
|
||||
metadata = item.metadata
|
||||
metadata.update({"agent": item.agent, "expected_output": item.expected_output})
|
||||
self.storage.save( # type: ignore # BUG?: Unexpected keyword argument "task_description","score","datetime" for "save" of "Storage"
|
||||
task_description=item.task,
|
||||
score=metadata["quality"],
|
||||
metadata=metadata,
|
||||
datetime=item.datetime,
|
||||
)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemorySaveCompletedEvent(
|
||||
value=item.task,
|
||||
metadata=item.metadata,
|
||||
agent_role=item.agent,
|
||||
save_time_ms=(time.time() - start_time) * 1000,
|
||||
source_type="long_term_memory",
|
||||
),
|
||||
)
|
||||
except Exception as e:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemorySaveFailedEvent(
|
||||
value=item.task,
|
||||
metadata=item.metadata,
|
||||
agent_role=item.agent,
|
||||
error=str(e),
|
||||
source_type="long_term_memory",
|
||||
),
|
||||
)
|
||||
raise
|
||||
|
||||
def search(self, task: str, latest_n: int = 3) -> List[Dict[str, Any]]: # type: ignore # signature of "search" incompatible with supertype "Memory"
|
||||
return self.storage.load(task, latest_n) # type: ignore # BUG?: "Storage" has no attribute "load"
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemoryQueryStartedEvent(
|
||||
query=task,
|
||||
limit=latest_n,
|
||||
source_type="long_term_memory",
|
||||
),
|
||||
)
|
||||
|
||||
start_time = time.time()
|
||||
try:
|
||||
results = self.storage.load(task, latest_n) # type: ignore # BUG?: "Storage" has no attribute "load"
|
||||
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemoryQueryCompletedEvent(
|
||||
query=task,
|
||||
results=results,
|
||||
limit=latest_n,
|
||||
query_time_ms=(time.time() - start_time) * 1000,
|
||||
source_type="long_term_memory",
|
||||
),
|
||||
)
|
||||
|
||||
return results
|
||||
except Exception as e:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemoryQueryFailedEvent(
|
||||
query=task,
|
||||
limit=latest_n,
|
||||
error=str(e),
|
||||
source_type="long_term_memory",
|
||||
),
|
||||
)
|
||||
raise
|
||||
|
||||
def reset(self) -> None:
|
||||
self.storage.reset()
|
||||
|
||||
@@ -1,10 +1,20 @@
|
||||
from typing import Any, Dict, Optional
|
||||
import time
|
||||
|
||||
from pydantic import PrivateAttr
|
||||
|
||||
from crewai.memory.memory import Memory
|
||||
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
|
||||
from crewai.memory.storage.rag_storage import RAGStorage
|
||||
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
|
||||
from crewai.utilities.events.memory_events import (
|
||||
MemoryQueryStartedEvent,
|
||||
MemoryQueryCompletedEvent,
|
||||
MemoryQueryFailedEvent,
|
||||
MemorySaveStartedEvent,
|
||||
MemorySaveCompletedEvent,
|
||||
MemorySaveFailedEvent,
|
||||
)
|
||||
|
||||
|
||||
class ShortTermMemory(Memory):
|
||||
@@ -52,11 +62,46 @@ class ShortTermMemory(Memory):
|
||||
metadata: Optional[Dict[str, Any]] = None,
|
||||
agent: Optional[str] = None,
|
||||
) -> None:
|
||||
item = ShortTermMemoryItem(data=value, metadata=metadata, agent=agent)
|
||||
if self._memory_provider == "mem0":
|
||||
item.data = f"Remember the following insights from Agent run: {item.data}"
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemorySaveStartedEvent(
|
||||
value=value,
|
||||
metadata=metadata,
|
||||
agent_role=agent,
|
||||
source_type="short_term_memory",
|
||||
),
|
||||
)
|
||||
|
||||
super().save(value=item.data, metadata=item.metadata, agent=item.agent)
|
||||
start_time = time.time()
|
||||
try:
|
||||
item = ShortTermMemoryItem(data=value, metadata=metadata, agent=agent)
|
||||
if self._memory_provider == "mem0":
|
||||
item.data = f"Remember the following insights from Agent run: {item.data}"
|
||||
|
||||
super().save(value=item.data, metadata=item.metadata, agent=item.agent)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemorySaveCompletedEvent(
|
||||
value=value,
|
||||
metadata=metadata,
|
||||
agent_role=agent,
|
||||
save_time_ms=(time.time() - start_time) * 1000,
|
||||
source_type="short_term_memory",
|
||||
),
|
||||
)
|
||||
except Exception as e:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemorySaveFailedEvent(
|
||||
value=value,
|
||||
metadata=metadata,
|
||||
agent_role=agent,
|
||||
error=str(e),
|
||||
source_type="short_term_memory",
|
||||
),
|
||||
)
|
||||
raise
|
||||
|
||||
def search(
|
||||
self,
|
||||
@@ -64,9 +109,47 @@ class ShortTermMemory(Memory):
|
||||
limit: int = 3,
|
||||
score_threshold: float = 0.35,
|
||||
):
|
||||
return self.storage.search(
|
||||
query=query, limit=limit, score_threshold=score_threshold
|
||||
) # type: ignore # BUG? The reference is to the parent class, but the parent class does not have this parameters
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemoryQueryStartedEvent(
|
||||
query=query,
|
||||
limit=limit,
|
||||
score_threshold=score_threshold,
|
||||
source_type="short_term_memory",
|
||||
),
|
||||
)
|
||||
|
||||
start_time = time.time()
|
||||
try:
|
||||
results = self.storage.search(
|
||||
query=query, limit=limit, score_threshold=score_threshold
|
||||
) # type: ignore # BUG? The reference is to the parent class, but the parent class does not have this parameters
|
||||
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemoryQueryCompletedEvent(
|
||||
query=query,
|
||||
results=results,
|
||||
limit=limit,
|
||||
score_threshold=score_threshold,
|
||||
query_time_ms=(time.time() - start_time) * 1000,
|
||||
source_type="short_term_memory",
|
||||
),
|
||||
)
|
||||
|
||||
return results
|
||||
except Exception as e:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=MemoryQueryFailedEvent(
|
||||
query=query,
|
||||
limit=limit,
|
||||
score_threshold=score_threshold,
|
||||
error=str(e),
|
||||
source_type="short_term_memory",
|
||||
),
|
||||
)
|
||||
raise
|
||||
|
||||
def reset(self) -> None:
|
||||
try:
|
||||
|
||||
@@ -51,6 +51,71 @@ from .llm_events import (
|
||||
LLMStreamChunkEvent,
|
||||
)
|
||||
|
||||
from .memory_events import (
|
||||
MemorySaveStartedEvent,
|
||||
MemorySaveCompletedEvent,
|
||||
MemorySaveFailedEvent,
|
||||
MemoryQueryStartedEvent,
|
||||
MemoryQueryCompletedEvent,
|
||||
MemoryQueryFailedEvent,
|
||||
MemoryRetrievalStartedEvent,
|
||||
MemoryRetrievalCompletedEvent,
|
||||
)
|
||||
|
||||
# events
|
||||
from .event_listener import EventListener
|
||||
from .third_party.agentops_listener import agentops_listener
|
||||
|
||||
__all__ = [
|
||||
"EventListener",
|
||||
"agentops_listener",
|
||||
"CrewAIEventsBus",
|
||||
"crewai_event_bus",
|
||||
"AgentExecutionStartedEvent",
|
||||
"AgentExecutionCompletedEvent",
|
||||
"AgentExecutionErrorEvent",
|
||||
"TaskStartedEvent",
|
||||
"TaskCompletedEvent",
|
||||
"TaskFailedEvent",
|
||||
"TaskEvaluationEvent",
|
||||
"FlowCreatedEvent",
|
||||
"FlowStartedEvent",
|
||||
"FlowFinishedEvent",
|
||||
"FlowPlotEvent",
|
||||
"MethodExecutionStartedEvent",
|
||||
"MethodExecutionFinishedEvent",
|
||||
"MethodExecutionFailedEvent",
|
||||
"LLMCallCompletedEvent",
|
||||
"LLMCallFailedEvent",
|
||||
"LLMCallStartedEvent",
|
||||
"LLMCallType",
|
||||
"LLMStreamChunkEvent",
|
||||
"MemorySaveStartedEvent",
|
||||
"MemorySaveCompletedEvent",
|
||||
"MemorySaveFailedEvent",
|
||||
"MemoryQueryStartedEvent",
|
||||
"MemoryQueryCompletedEvent",
|
||||
"MemoryQueryFailedEvent",
|
||||
"MemoryRetrievalStartedEvent",
|
||||
"MemoryRetrievalCompletedEvent",
|
||||
"EventListener",
|
||||
"agentops_listener",
|
||||
"CrewKickoffStartedEvent",
|
||||
"CrewKickoffCompletedEvent",
|
||||
"CrewKickoffFailedEvent",
|
||||
"CrewTrainStartedEvent",
|
||||
"CrewTrainCompletedEvent",
|
||||
"CrewTrainFailedEvent",
|
||||
"CrewTestStartedEvent",
|
||||
"CrewTestCompletedEvent",
|
||||
"CrewTestFailedEvent",
|
||||
"LLMGuardrailCompletedEvent",
|
||||
"LLMGuardrailStartedEvent",
|
||||
"ToolUsageFinishedEvent",
|
||||
"ToolUsageErrorEvent",
|
||||
"ToolUsageStartedEvent",
|
||||
"ToolExecutionErrorEvent",
|
||||
"ToolSelectionErrorEvent",
|
||||
"ToolUsageEvent",
|
||||
"ToolValidateInputErrorEvent",
|
||||
]
|
||||
|
||||
@@ -12,7 +12,7 @@ class BaseEvent(BaseModel):
|
||||
timestamp: datetime = Field(default_factory=datetime.now)
|
||||
type: str
|
||||
source_fingerprint: Optional[str] = None # UUID string of the source entity
|
||||
source_type: Optional[str] = None # "agent", "task", "crew"
|
||||
source_type: Optional[str] = None # "agent", "task", "crew", "memory", "entity_memory", "short_term_memory", "long_term_memory", "external_memory"
|
||||
fingerprint_metadata: Optional[Dict[str, Any]] = None # Any relevant metadata
|
||||
|
||||
def to_json(self, exclude: set[str] | None = None):
|
||||
|
||||
@@ -65,6 +65,8 @@ from .reasoning_events import (
|
||||
AgentReasoningFailedEvent,
|
||||
)
|
||||
|
||||
from .listeners.memory_listener import MemoryListener
|
||||
|
||||
|
||||
class EventListener(BaseEventListener):
|
||||
_instance = None
|
||||
@@ -91,6 +93,8 @@ class EventListener(BaseEventListener):
|
||||
self._initialized = True
|
||||
self.formatter = ConsoleFormatter(verbose=True)
|
||||
|
||||
MemoryListener(formatter=self.formatter)
|
||||
|
||||
# ----------- CREW EVENTS -----------
|
||||
|
||||
def setup_listeners(self, crewai_event_bus):
|
||||
|
||||
@@ -57,6 +57,17 @@ from .knowledge_events import (
|
||||
KnowledgeSearchQueryFailedEvent,
|
||||
)
|
||||
|
||||
from .memory_events import (
|
||||
MemorySaveStartedEvent,
|
||||
MemorySaveCompletedEvent,
|
||||
MemorySaveFailedEvent,
|
||||
MemoryQueryStartedEvent,
|
||||
MemoryQueryCompletedEvent,
|
||||
MemoryQueryFailedEvent,
|
||||
MemoryRetrievalStartedEvent,
|
||||
MemoryRetrievalCompletedEvent,
|
||||
)
|
||||
|
||||
EventTypes = Union[
|
||||
CrewKickoffStartedEvent,
|
||||
CrewKickoffCompletedEvent,
|
||||
@@ -96,4 +107,12 @@ EventTypes = Union[
|
||||
KnowledgeQueryCompletedEvent,
|
||||
KnowledgeQueryFailedEvent,
|
||||
KnowledgeSearchQueryFailedEvent,
|
||||
MemorySaveStartedEvent,
|
||||
MemorySaveCompletedEvent,
|
||||
MemorySaveFailedEvent,
|
||||
MemoryQueryStartedEvent,
|
||||
MemoryQueryCompletedEvent,
|
||||
MemoryQueryFailedEvent,
|
||||
MemoryRetrievalStartedEvent,
|
||||
MemoryRetrievalCompletedEvent,
|
||||
]
|
||||
|
||||
110
src/crewai/utilities/events/listeners/memory_listener.py
Normal file
110
src/crewai/utilities/events/listeners/memory_listener.py
Normal file
@@ -0,0 +1,110 @@
|
||||
from crewai.utilities.events.base_event_listener import BaseEventListener
|
||||
from crewai.utilities.events.memory_events import (
|
||||
MemoryRetrievalCompletedEvent,
|
||||
MemoryRetrievalStartedEvent,
|
||||
MemoryQueryFailedEvent,
|
||||
MemoryQueryCompletedEvent,
|
||||
MemorySaveStartedEvent,
|
||||
MemorySaveCompletedEvent,
|
||||
MemorySaveFailedEvent,
|
||||
)
|
||||
|
||||
class MemoryListener(BaseEventListener):
|
||||
|
||||
def __init__(self, formatter):
|
||||
super().__init__()
|
||||
self.formatter = formatter
|
||||
self.memory_retrieval_in_progress = False
|
||||
self.memory_save_in_progress = False
|
||||
|
||||
def setup_listeners(self, crewai_event_bus):
|
||||
@crewai_event_bus.on(MemoryRetrievalStartedEvent)
|
||||
def on_memory_retrieval_started(
|
||||
source, event: MemoryRetrievalStartedEvent
|
||||
):
|
||||
if self.memory_retrieval_in_progress:
|
||||
return
|
||||
|
||||
self.memory_retrieval_in_progress = True
|
||||
|
||||
self.formatter.handle_memory_retrieval_started(
|
||||
self.formatter.current_agent_branch,
|
||||
self.formatter.current_crew_tree,
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(MemoryRetrievalCompletedEvent)
|
||||
def on_memory_retrieval_completed(
|
||||
source, event: MemoryRetrievalCompletedEvent
|
||||
):
|
||||
if not self.memory_retrieval_in_progress:
|
||||
return
|
||||
|
||||
self.memory_retrieval_in_progress = False
|
||||
self.formatter.handle_memory_retrieval_completed(
|
||||
self.formatter.current_agent_branch,
|
||||
self.formatter.current_crew_tree,
|
||||
event.memory_content,
|
||||
event.retrieval_time_ms
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryCompletedEvent)
|
||||
def on_memory_query_completed(source, event: MemoryQueryCompletedEvent):
|
||||
if not self.memory_retrieval_in_progress:
|
||||
return
|
||||
|
||||
self.formatter.handle_memory_query_completed(
|
||||
self.formatter.current_agent_branch,
|
||||
event.source_type,
|
||||
event.query_time_ms,
|
||||
self.formatter.current_crew_tree,
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryFailedEvent)
|
||||
def on_memory_query_failed(source, event: MemoryQueryFailedEvent):
|
||||
if not self.memory_retrieval_in_progress:
|
||||
return
|
||||
|
||||
self.formatter.handle_memory_query_failed(
|
||||
self.formatter.current_agent_branch,
|
||||
self.formatter.current_crew_tree,
|
||||
event.error,
|
||||
event.source_type,
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(MemorySaveStartedEvent)
|
||||
def on_memory_save_started(source, event: MemorySaveStartedEvent):
|
||||
if self.memory_save_in_progress:
|
||||
return
|
||||
|
||||
self.memory_save_in_progress = True
|
||||
|
||||
self.formatter.handle_memory_save_started(
|
||||
self.formatter.current_agent_branch,
|
||||
self.formatter.current_crew_tree,
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(MemorySaveCompletedEvent)
|
||||
def on_memory_save_completed(source, event: MemorySaveCompletedEvent):
|
||||
if not self.memory_save_in_progress:
|
||||
return
|
||||
|
||||
self.memory_save_in_progress = False
|
||||
|
||||
self.formatter.handle_memory_save_completed(
|
||||
self.formatter.current_agent_branch,
|
||||
self.formatter.current_crew_tree,
|
||||
event.save_time_ms,
|
||||
event.source_type,
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(MemorySaveFailedEvent)
|
||||
def on_memory_save_failed(source, event: MemorySaveFailedEvent):
|
||||
if not self.memory_save_in_progress:
|
||||
return
|
||||
|
||||
self.formatter.handle_memory_save_failed(
|
||||
self.formatter.current_agent_branch,
|
||||
event.error,
|
||||
event.source_type,
|
||||
self.formatter.current_crew_tree,
|
||||
)
|
||||
78
src/crewai/utilities/events/memory_events.py
Normal file
78
src/crewai/utilities/events/memory_events.py
Normal file
@@ -0,0 +1,78 @@
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from crewai.utilities.events.base_events import BaseEvent
|
||||
|
||||
|
||||
class MemoryQueryStartedEvent(BaseEvent):
|
||||
"""Event emitted when a memory query is started"""
|
||||
|
||||
type: str = "memory_query_started"
|
||||
query: str
|
||||
limit: int
|
||||
score_threshold: Optional[float] = None
|
||||
|
||||
|
||||
class MemoryQueryCompletedEvent(BaseEvent):
|
||||
"""Event emitted when a memory query is completed successfully"""
|
||||
|
||||
type: str = "memory_query_completed"
|
||||
query: str
|
||||
results: Any
|
||||
limit: int
|
||||
score_threshold: Optional[float] = None
|
||||
query_time_ms: float
|
||||
|
||||
|
||||
class MemoryQueryFailedEvent(BaseEvent):
|
||||
"""Event emitted when a memory query fails"""
|
||||
|
||||
type: str = "memory_query_failed"
|
||||
query: str
|
||||
limit: int
|
||||
score_threshold: Optional[float] = None
|
||||
error: str
|
||||
|
||||
|
||||
class MemorySaveStartedEvent(BaseEvent):
|
||||
"""Event emitted when a memory save operation is started"""
|
||||
|
||||
type: str = "memory_save_started"
|
||||
value: Optional[str] = None
|
||||
metadata: Optional[Dict[str, Any]] = None
|
||||
agent_role: Optional[str] = None
|
||||
|
||||
|
||||
class MemorySaveCompletedEvent(BaseEvent):
|
||||
"""Event emitted when a memory save operation is completed successfully"""
|
||||
|
||||
type: str = "memory_save_completed"
|
||||
value: str
|
||||
metadata: Optional[Dict[str, Any]] = None
|
||||
agent_role: Optional[str] = None
|
||||
save_time_ms: float
|
||||
|
||||
|
||||
class MemorySaveFailedEvent(BaseEvent):
|
||||
"""Event emitted when a memory save operation fails"""
|
||||
|
||||
type: str = "memory_save_failed"
|
||||
value: Optional[str] = None
|
||||
metadata: Optional[Dict[str, Any]] = None
|
||||
agent_role: Optional[str] = None
|
||||
error: str
|
||||
|
||||
|
||||
class MemoryRetrievalStartedEvent(BaseEvent):
|
||||
"""Event emitted when memory retrieval for a task prompt starts"""
|
||||
|
||||
type: str = "memory_retrieval_started"
|
||||
task_id: Optional[str] = None
|
||||
|
||||
|
||||
class MemoryRetrievalCompletedEvent(BaseEvent):
|
||||
"""Event emitted when memory retrieval for a task prompt completes successfully"""
|
||||
|
||||
type: str = "memory_retrieval_completed"
|
||||
task_id: Optional[str] = None
|
||||
memory_content: str
|
||||
retrieval_time_ms: float
|
||||
@@ -1454,3 +1454,250 @@ class ConsoleFormatter:
|
||||
)
|
||||
self.print(finish_panel)
|
||||
self.print()
|
||||
|
||||
def handle_memory_retrieval_started(
|
||||
self,
|
||||
agent_branch: Optional[Tree],
|
||||
crew_tree: Optional[Tree],
|
||||
) -> Optional[Tree]:
|
||||
if not self.verbose:
|
||||
return None
|
||||
|
||||
branch_to_use = agent_branch or self.current_lite_agent_branch
|
||||
tree_to_use = branch_to_use or crew_tree
|
||||
|
||||
if branch_to_use is None or tree_to_use is None:
|
||||
if crew_tree is not None:
|
||||
branch_to_use = tree_to_use = crew_tree
|
||||
else:
|
||||
return None
|
||||
|
||||
memory_branch = branch_to_use.add("")
|
||||
self.update_tree_label(
|
||||
memory_branch, "🧠", "Memory Retrieval Started", "blue"
|
||||
)
|
||||
|
||||
self.print(tree_to_use)
|
||||
self.print()
|
||||
return memory_branch
|
||||
|
||||
def handle_memory_retrieval_completed(
|
||||
self,
|
||||
agent_branch: Optional[Tree],
|
||||
crew_tree: Optional[Tree],
|
||||
memory_content: str,
|
||||
retrieval_time_ms: float,
|
||||
) -> None:
|
||||
if not self.verbose:
|
||||
return None
|
||||
|
||||
branch_to_use = self.current_lite_agent_branch or agent_branch
|
||||
tree_to_use = branch_to_use or crew_tree
|
||||
|
||||
if branch_to_use is None and tree_to_use is not None:
|
||||
branch_to_use = tree_to_use
|
||||
|
||||
def add_panel():
|
||||
memory_text = str(memory_content)
|
||||
if len(memory_text) > 500:
|
||||
memory_text = memory_text[:497] + "..."
|
||||
|
||||
memory_panel = Panel(
|
||||
Text(memory_text, style="white"),
|
||||
title="🧠 Retrieved Memory",
|
||||
subtitle=f"Retrieval Time: {retrieval_time_ms:.2f}ms",
|
||||
border_style="green",
|
||||
padding=(1, 2),
|
||||
)
|
||||
self.print(memory_panel)
|
||||
self.print()
|
||||
|
||||
if branch_to_use is None or tree_to_use is None:
|
||||
add_panel()
|
||||
return None
|
||||
|
||||
memory_branch_found = False
|
||||
for child in branch_to_use.children:
|
||||
if "Memory Retrieval Started" in str(child.label):
|
||||
self.update_tree_label(
|
||||
child, "✅", "Memory Retrieval Completed", "green"
|
||||
)
|
||||
memory_branch_found = True
|
||||
break
|
||||
|
||||
if not memory_branch_found:
|
||||
for child in branch_to_use.children:
|
||||
if (
|
||||
"Memory Retrieval" in str(child.label)
|
||||
and "Started" not in str(child.label)
|
||||
and "Completed" not in str(child.label)
|
||||
):
|
||||
self.update_tree_label(
|
||||
child, "✅", "Memory Retrieval Completed", "green"
|
||||
)
|
||||
memory_branch_found = True
|
||||
break
|
||||
|
||||
if not memory_branch_found:
|
||||
memory_branch = branch_to_use.add("")
|
||||
self.update_tree_label(
|
||||
memory_branch, "✅", "Memory Retrieval Completed", "green"
|
||||
)
|
||||
|
||||
self.print(tree_to_use)
|
||||
|
||||
if memory_content:
|
||||
add_panel()
|
||||
|
||||
|
||||
def handle_memory_query_completed(
|
||||
self,
|
||||
agent_branch: Optional[Tree],
|
||||
source_type: str,
|
||||
query_time_ms: float,
|
||||
crew_tree: Optional[Tree],
|
||||
) -> None:
|
||||
if not self.verbose:
|
||||
return None
|
||||
|
||||
branch_to_use = self.current_lite_agent_branch or agent_branch
|
||||
tree_to_use = branch_to_use or crew_tree
|
||||
|
||||
if branch_to_use is None and tree_to_use is not None:
|
||||
branch_to_use = tree_to_use
|
||||
|
||||
if branch_to_use is None:
|
||||
return None
|
||||
|
||||
memory_type = source_type.replace("_", " ").title()
|
||||
|
||||
for child in branch_to_use.children:
|
||||
if "Memory Retrieval" in str(child.label):
|
||||
for child in child.children:
|
||||
sources_branch = child
|
||||
if "Sources Used" in str(child.label):
|
||||
sources_branch.add(f"✅ {memory_type} ({query_time_ms:.2f}ms)")
|
||||
break
|
||||
else:
|
||||
sources_branch = child.add("Sources Used")
|
||||
sources_branch.add(f"✅ {memory_type} ({query_time_ms:.2f}ms)")
|
||||
break
|
||||
|
||||
def handle_memory_query_failed(
|
||||
self,
|
||||
agent_branch: Optional[Tree],
|
||||
crew_tree: Optional[Tree],
|
||||
error: str,
|
||||
source_type: str,
|
||||
) -> None:
|
||||
if not self.verbose:
|
||||
return None
|
||||
|
||||
branch_to_use = self.current_lite_agent_branch or agent_branch
|
||||
tree_to_use = branch_to_use or crew_tree
|
||||
|
||||
if branch_to_use is None and tree_to_use is not None:
|
||||
branch_to_use = tree_to_use
|
||||
|
||||
if branch_to_use is None:
|
||||
return None
|
||||
|
||||
memory_type = source_type.replace("_", " ").title()
|
||||
|
||||
for child in branch_to_use.children:
|
||||
if "Memory Retrieval" in str(child.label):
|
||||
for child in child.children:
|
||||
sources_branch = child
|
||||
if "Sources Used" in str(child.label):
|
||||
sources_branch.add(f"❌ {memory_type} - Error: {error}")
|
||||
break
|
||||
else:
|
||||
sources_branch = child.add("🧠 Sources Used")
|
||||
sources_branch.add(f"❌ {memory_type} - Error: {error}")
|
||||
break
|
||||
|
||||
|
||||
def handle_memory_save_started(
|
||||
self,
|
||||
agent_branch: Optional[Tree],
|
||||
crew_tree: Optional[Tree]
|
||||
) -> None:
|
||||
if not self.verbose:
|
||||
return None
|
||||
|
||||
branch_to_use = agent_branch or self.current_lite_agent_branch
|
||||
tree_to_use = branch_to_use or crew_tree
|
||||
|
||||
if tree_to_use is None:
|
||||
return None
|
||||
|
||||
for child in tree_to_use.children:
|
||||
if "Memory Update" in str(child.label):
|
||||
break
|
||||
else:
|
||||
memory_branch = tree_to_use.add("")
|
||||
self.update_tree_label(
|
||||
memory_branch, "🧠", "Memory Update Overall", "white"
|
||||
)
|
||||
|
||||
self.print(tree_to_use)
|
||||
self.print()
|
||||
|
||||
def handle_memory_save_completed(
|
||||
self,
|
||||
agent_branch: Optional[Tree],
|
||||
crew_tree: Optional[Tree],
|
||||
save_time_ms: float,
|
||||
source_type: str,
|
||||
) -> None:
|
||||
if not self.verbose:
|
||||
return None
|
||||
|
||||
branch_to_use = agent_branch or self.current_lite_agent_branch
|
||||
tree_to_use = branch_to_use or crew_tree
|
||||
|
||||
if tree_to_use is None:
|
||||
return None
|
||||
|
||||
memory_type = source_type.replace("_", " ").title()
|
||||
content = f"✅ {memory_type} Memory Saved ({save_time_ms:.2f}ms)"
|
||||
|
||||
for child in tree_to_use.children:
|
||||
if "Memory Update" in str(child.label):
|
||||
child.add(content)
|
||||
break
|
||||
else:
|
||||
memory_branch = tree_to_use.add("")
|
||||
memory_branch.add(content)
|
||||
|
||||
self.print(tree_to_use)
|
||||
self.print()
|
||||
|
||||
def handle_memory_save_failed(
|
||||
self,
|
||||
agent_branch: Optional[Tree],
|
||||
error: str,
|
||||
source_type: str,
|
||||
crew_tree: Optional[Tree],
|
||||
) -> None:
|
||||
if not self.verbose:
|
||||
return None
|
||||
|
||||
branch_to_use = agent_branch or self.current_lite_agent_branch
|
||||
tree_to_use = branch_to_use or crew_tree
|
||||
|
||||
if branch_to_use is None or tree_to_use is None:
|
||||
return None
|
||||
|
||||
memory_type = source_type.replace("_", " ").title()
|
||||
content = f"❌ {memory_type} Memory Save Failed"
|
||||
for child in branch_to_use.children:
|
||||
if "Memory Update" in str(child.label):
|
||||
child.add(content)
|
||||
break
|
||||
else:
|
||||
memory_branch = branch_to_use.add("")
|
||||
memory_branch.add(content)
|
||||
|
||||
self.print(tree_to_use)
|
||||
self.print()
|
||||
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
1969
tests/cassettes/test_memory_events_are_emitted.yaml
Normal file
1969
tests/cassettes/test_memory_events_are_emitted.yaml
Normal file
File diff suppressed because one or more lines are too long
@@ -5,6 +5,7 @@ import json
|
||||
from concurrent.futures import Future
|
||||
from unittest import mock
|
||||
from unittest.mock import ANY, MagicMock, patch
|
||||
from collections import defaultdict
|
||||
|
||||
import pydantic_core
|
||||
import pytest
|
||||
@@ -40,6 +41,16 @@ from crewai.utilities.events.event_listener import EventListener
|
||||
from crewai.utilities.rpm_controller import RPMController
|
||||
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
|
||||
|
||||
from crewai.utilities.events.memory_events import (
|
||||
MemorySaveStartedEvent,
|
||||
MemorySaveCompletedEvent,
|
||||
MemorySaveFailedEvent,
|
||||
MemoryQueryStartedEvent,
|
||||
MemoryQueryCompletedEvent,
|
||||
MemoryQueryFailedEvent,
|
||||
MemoryRetrievalStartedEvent,
|
||||
MemoryRetrievalCompletedEvent,
|
||||
)
|
||||
|
||||
@pytest.fixture
|
||||
def ceo():
|
||||
@@ -2478,11 +2489,79 @@ def test_using_contextual_memory():
|
||||
memory=True,
|
||||
)
|
||||
|
||||
with patch.object(ContextualMemory, "build_context_for_task") as contextual_mem:
|
||||
with patch.object(ContextualMemory, "build_context_for_task", return_value="") as contextual_mem:
|
||||
crew.kickoff()
|
||||
contextual_mem.assert_called_once()
|
||||
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_memory_events_are_emitted():
|
||||
events = defaultdict(list)
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
@crewai_event_bus.on(MemorySaveStartedEvent)
|
||||
def handle_memory_save_started(source, event):
|
||||
events["MemorySaveStartedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemorySaveCompletedEvent)
|
||||
def handle_memory_save_completed(source, event):
|
||||
events["MemorySaveCompletedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemorySaveFailedEvent)
|
||||
def handle_memory_save_failed(source, event):
|
||||
events["MemorySaveFailedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryStartedEvent)
|
||||
def handle_memory_query_started(source, event):
|
||||
events["MemoryQueryStartedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryCompletedEvent)
|
||||
def handle_memory_query_completed(source, event):
|
||||
events["MemoryQueryCompletedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryFailedEvent)
|
||||
def handle_memory_query_failed(source, event):
|
||||
events["MemoryQueryFailedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemoryRetrievalStartedEvent)
|
||||
def handle_memory_retrieval_started(source, event):
|
||||
events["MemoryRetrievalStartedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemoryRetrievalCompletedEvent)
|
||||
def handle_memory_retrieval_completed(source, event):
|
||||
events["MemoryRetrievalCompletedEvent"].append(event)
|
||||
|
||||
math_researcher = Agent(
|
||||
role="Researcher",
|
||||
goal="You research about math.",
|
||||
backstory="You're an expert in research and you love to learn new things.",
|
||||
allow_delegation=False,
|
||||
)
|
||||
|
||||
task1 = Task(
|
||||
description="Research a topic to teach a kid aged 6 about math.",
|
||||
expected_output="A topic, explanation, angle, and examples.",
|
||||
agent=math_researcher,
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[math_researcher],
|
||||
tasks=[task1],
|
||||
memory=True,
|
||||
)
|
||||
|
||||
crew.kickoff()
|
||||
|
||||
assert len(events["MemorySaveStartedEvent"]) == 6
|
||||
assert len(events["MemorySaveCompletedEvent"]) == 6
|
||||
assert len(events["MemorySaveFailedEvent"]) == 0
|
||||
assert len(events["MemoryQueryStartedEvent"]) == 3
|
||||
assert len(events["MemoryQueryCompletedEvent"]) == 3
|
||||
assert len(events["MemoryQueryFailedEvent"]) == 0
|
||||
assert len(events["MemoryRetrievalStartedEvent"]) == 1
|
||||
assert len(events["MemoryRetrievalCompletedEvent"]) == 1
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_using_contextual_memory_with_long_term_memory():
|
||||
from unittest.mock import patch
|
||||
@@ -2506,7 +2585,7 @@ def test_using_contextual_memory_with_long_term_memory():
|
||||
long_term_memory=LongTermMemory(),
|
||||
)
|
||||
|
||||
with patch.object(ContextualMemory, "build_context_for_task") as contextual_mem:
|
||||
with patch.object(ContextualMemory, "build_context_for_task", return_value="") as contextual_mem:
|
||||
crew.kickoff()
|
||||
contextual_mem.assert_called_once()
|
||||
assert crew.memory is False
|
||||
@@ -2607,7 +2686,7 @@ def test_using_contextual_memory_with_short_term_memory():
|
||||
short_term_memory=ShortTermMemory(),
|
||||
)
|
||||
|
||||
with patch.object(ContextualMemory, "build_context_for_task") as contextual_mem:
|
||||
with patch.object(ContextualMemory, "build_context_for_task", return_value="") as contextual_mem:
|
||||
crew.kickoff()
|
||||
contextual_mem.assert_called_once()
|
||||
assert crew.memory is False
|
||||
@@ -2636,7 +2715,7 @@ def test_disabled_memory_using_contextual_memory():
|
||||
memory=False,
|
||||
)
|
||||
|
||||
with patch.object(ContextualMemory, "build_context_for_task") as contextual_mem:
|
||||
with patch.object(ContextualMemory, "build_context_for_task", return_value="") as contextual_mem:
|
||||
crew.kickoff()
|
||||
contextual_mem.assert_not_called()
|
||||
|
||||
|
||||
@@ -1,5 +1,12 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from unittest.mock import MagicMock, patch, ANY
|
||||
from collections import defaultdict
|
||||
from crewai.utilities.events import crewai_event_bus
|
||||
from crewai.utilities.events.memory_events import (
|
||||
MemorySaveStartedEvent,
|
||||
MemorySaveCompletedEvent,
|
||||
MemoryQueryStartedEvent,
|
||||
MemoryQueryCompletedEvent,
|
||||
)
|
||||
import pytest
|
||||
from mem0.memory.main import Memory
|
||||
|
||||
@@ -10,7 +17,6 @@ from crewai.memory.external.external_memory_item import ExternalMemoryItem
|
||||
from crewai.memory.storage.interface import Storage
|
||||
from crewai.task import Task
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_mem0_memory():
|
||||
mock_memory = MagicMock(spec=Memory)
|
||||
@@ -188,7 +194,8 @@ def test_crew_external_memory_save_using_crew_without_memory_flag(
|
||||
assert mock_method.call_count > 0
|
||||
|
||||
|
||||
def test_external_memory_custom_storage(crew_with_external_memory):
|
||||
@pytest.fixture
|
||||
def custom_storage():
|
||||
class CustomStorage(Storage):
|
||||
def __init__(self):
|
||||
self.memories = []
|
||||
@@ -203,6 +210,9 @@ def test_external_memory_custom_storage(crew_with_external_memory):
|
||||
self.memories = []
|
||||
|
||||
custom_storage = CustomStorage()
|
||||
return custom_storage
|
||||
|
||||
def test_external_memory_custom_storage(custom_storage, crew_with_external_memory):
|
||||
external_memory = ExternalMemory(storage=custom_storage)
|
||||
|
||||
# by ensuring the crew is set, we can test that the storage is used
|
||||
@@ -221,3 +231,101 @@ def test_external_memory_custom_storage(crew_with_external_memory):
|
||||
external_memory.reset()
|
||||
results = external_memory.search("test")
|
||||
assert len(results) == 0
|
||||
|
||||
|
||||
|
||||
def test_external_memory_search_events(custom_storage, external_memory_with_mocked_config):
|
||||
events = defaultdict(list)
|
||||
|
||||
external_memory_with_mocked_config.storage = custom_storage
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
@crewai_event_bus.on(MemoryQueryStartedEvent)
|
||||
def on_search_started(source, event):
|
||||
events["MemoryQueryStartedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryCompletedEvent)
|
||||
def on_search_completed(source, event):
|
||||
events["MemoryQueryCompletedEvent"].append(event)
|
||||
|
||||
external_memory_with_mocked_config.search(
|
||||
query="test value",
|
||||
limit=3,
|
||||
score_threshold=0.35,
|
||||
)
|
||||
|
||||
assert len(events["MemoryQueryStartedEvent"]) == 1
|
||||
assert len(events["MemoryQueryCompletedEvent"]) == 1
|
||||
assert len(events["MemoryQueryFailedEvent"]) == 0
|
||||
|
||||
assert dict(events["MemoryQueryStartedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_query_started',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'external_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'query': 'test value',
|
||||
'limit': 3,
|
||||
'score_threshold': 0.35
|
||||
}
|
||||
|
||||
assert dict(events["MemoryQueryCompletedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_query_completed',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'external_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'query': 'test value',
|
||||
'results': [],
|
||||
'limit': 3,
|
||||
'score_threshold': 0.35,
|
||||
'query_time_ms': ANY
|
||||
}
|
||||
|
||||
|
||||
|
||||
def test_external_memory_save_events(custom_storage, external_memory_with_mocked_config):
|
||||
events = defaultdict(list)
|
||||
|
||||
external_memory_with_mocked_config.storage = custom_storage
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
@crewai_event_bus.on(MemorySaveStartedEvent)
|
||||
def on_save_started(source, event):
|
||||
events["MemorySaveStartedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemorySaveCompletedEvent)
|
||||
def on_save_completed(source, event):
|
||||
events["MemorySaveCompletedEvent"].append(event)
|
||||
|
||||
external_memory_with_mocked_config.save(
|
||||
value="saving value",
|
||||
metadata={"task": "test_task"},
|
||||
agent="test_agent",
|
||||
)
|
||||
|
||||
assert len(events["MemorySaveStartedEvent"]) == 1
|
||||
assert len(events["MemorySaveCompletedEvent"]) == 1
|
||||
assert len(events["MemorySaveFailedEvent"]) == 0
|
||||
|
||||
assert dict(events["MemorySaveStartedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_save_started',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'external_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'value': 'saving value',
|
||||
'metadata': {'task': 'test_task'},
|
||||
'agent_role': "test_agent"
|
||||
}
|
||||
|
||||
assert dict(events["MemorySaveCompletedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_save_completed',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'external_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'value': 'saving value',
|
||||
'metadata': {'task': 'test_task', 'agent': 'test_agent'},
|
||||
'agent_role': "test_agent",
|
||||
'save_time_ms': ANY
|
||||
}
|
||||
|
||||
@@ -1,8 +1,15 @@
|
||||
import pytest
|
||||
|
||||
from unittest.mock import ANY
|
||||
from collections import defaultdict
|
||||
from crewai.utilities.events import crewai_event_bus
|
||||
from crewai.memory.long_term.long_term_memory import LongTermMemory
|
||||
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
|
||||
|
||||
from crewai.utilities.events.memory_events import (
|
||||
MemorySaveStartedEvent,
|
||||
MemorySaveCompletedEvent,
|
||||
MemoryQueryStartedEvent,
|
||||
MemoryQueryCompletedEvent,
|
||||
)
|
||||
|
||||
@pytest.fixture
|
||||
def long_term_memory():
|
||||
@@ -10,6 +17,103 @@ def long_term_memory():
|
||||
return LongTermMemory()
|
||||
|
||||
|
||||
def test_long_term_memory_save_events(long_term_memory):
|
||||
events = defaultdict(list)
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
@crewai_event_bus.on(MemorySaveStartedEvent)
|
||||
def on_save_started(source, event):
|
||||
events["MemorySaveStartedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemorySaveCompletedEvent)
|
||||
def on_save_completed(source, event):
|
||||
events["MemorySaveCompletedEvent"].append(event)
|
||||
|
||||
memory = LongTermMemoryItem(
|
||||
agent="test_agent",
|
||||
task="test_task",
|
||||
expected_output="test_output",
|
||||
datetime="test_datetime",
|
||||
quality=0.5,
|
||||
metadata={"task": "test_task", "quality": 0.5},
|
||||
)
|
||||
long_term_memory.save(memory)
|
||||
|
||||
assert len(events["MemorySaveStartedEvent"]) == 1
|
||||
assert len(events["MemorySaveCompletedEvent"]) == 1
|
||||
assert len(events["MemorySaveFailedEvent"]) == 0
|
||||
|
||||
assert dict(events["MemorySaveStartedEvent"][0]) == {
|
||||
"timestamp": ANY,
|
||||
"type": "memory_save_started",
|
||||
"source_fingerprint": None,
|
||||
"source_type": "long_term_memory",
|
||||
"fingerprint_metadata": None,
|
||||
"value": "test_task",
|
||||
"metadata": {"task": "test_task", "quality": 0.5},
|
||||
"agent_role": "test_agent",
|
||||
}
|
||||
assert dict(events["MemorySaveCompletedEvent"][0]) == {
|
||||
"timestamp": ANY,
|
||||
"type": "memory_save_completed",
|
||||
"source_fingerprint": None,
|
||||
"source_type": "long_term_memory",
|
||||
"fingerprint_metadata": None,
|
||||
"value": "test_task",
|
||||
"metadata": {"task": "test_task", "quality": 0.5, "agent": "test_agent", "expected_output": "test_output"},
|
||||
"agent_role": "test_agent",
|
||||
"save_time_ms": ANY,
|
||||
}
|
||||
|
||||
|
||||
def test_long_term_memory_search_events(long_term_memory):
|
||||
events = defaultdict(list)
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
@crewai_event_bus.on(MemoryQueryStartedEvent)
|
||||
def on_search_started(source, event):
|
||||
events["MemoryQueryStartedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryCompletedEvent)
|
||||
def on_search_completed(source, event):
|
||||
events["MemoryQueryCompletedEvent"].append(event)
|
||||
|
||||
test_query = "test query"
|
||||
|
||||
long_term_memory.search(
|
||||
test_query,
|
||||
latest_n=5
|
||||
)
|
||||
|
||||
assert len(events["MemoryQueryStartedEvent"]) == 1
|
||||
assert len(events["MemoryQueryCompletedEvent"]) == 1
|
||||
assert len(events["MemoryQueryFailedEvent"]) == 0
|
||||
|
||||
assert dict(events["MemoryQueryStartedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_query_started',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'long_term_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'query': 'test query',
|
||||
'limit': 5,
|
||||
'score_threshold': None
|
||||
}
|
||||
|
||||
assert dict(events["MemoryQueryCompletedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_query_completed',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'long_term_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'query': 'test query',
|
||||
'results': None,
|
||||
'limit': 5,
|
||||
'score_threshold': None,
|
||||
'query_time_ms': ANY
|
||||
}
|
||||
|
||||
|
||||
def test_save_and_search(long_term_memory):
|
||||
memory = LongTermMemoryItem(
|
||||
agent="test_agent",
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from unittest.mock import patch
|
||||
|
||||
from unittest.mock import patch, ANY
|
||||
from collections import defaultdict
|
||||
import pytest
|
||||
|
||||
from crewai.agent import Agent
|
||||
@@ -7,6 +7,13 @@ from crewai.crew import Crew
|
||||
from crewai.memory.short_term.short_term_memory import ShortTermMemory
|
||||
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
|
||||
from crewai.task import Task
|
||||
from crewai.utilities.events import crewai_event_bus
|
||||
from crewai.utilities.events.memory_events import (
|
||||
MemorySaveStartedEvent,
|
||||
MemorySaveCompletedEvent,
|
||||
MemoryQueryStartedEvent,
|
||||
MemoryQueryCompletedEvent,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -28,6 +35,98 @@ def short_term_memory():
|
||||
return ShortTermMemory(crew=Crew(agents=[agent], tasks=[task]))
|
||||
|
||||
|
||||
def test_short_term_memory_search_events(short_term_memory):
|
||||
events = defaultdict(list)
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
@crewai_event_bus.on(MemoryQueryStartedEvent)
|
||||
def on_search_started(source, event):
|
||||
events["MemoryQueryStartedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryCompletedEvent)
|
||||
def on_search_completed(source, event):
|
||||
events["MemoryQueryCompletedEvent"].append(event)
|
||||
|
||||
# Call the save method
|
||||
short_term_memory.search(
|
||||
query="test value",
|
||||
limit=3,
|
||||
score_threshold=0.35,
|
||||
)
|
||||
|
||||
assert len(events["MemoryQueryStartedEvent"]) == 1
|
||||
assert len(events["MemoryQueryCompletedEvent"]) == 1
|
||||
assert len(events["MemoryQueryFailedEvent"]) == 0
|
||||
|
||||
assert dict(events["MemoryQueryStartedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_query_started',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'short_term_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'query': 'test value',
|
||||
'limit': 3,
|
||||
'score_threshold': 0.35
|
||||
}
|
||||
|
||||
assert dict(events["MemoryQueryCompletedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_query_completed',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'short_term_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'query': 'test value',
|
||||
'results': [],
|
||||
'limit': 3,
|
||||
'score_threshold': 0.35,
|
||||
'query_time_ms': ANY
|
||||
}
|
||||
|
||||
|
||||
def test_short_term_memory_save_events(short_term_memory):
|
||||
events = defaultdict(list)
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
@crewai_event_bus.on(MemorySaveStartedEvent)
|
||||
def on_save_started(source, event):
|
||||
events["MemorySaveStartedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemorySaveCompletedEvent)
|
||||
def on_save_completed(source, event):
|
||||
events["MemorySaveCompletedEvent"].append(event)
|
||||
|
||||
short_term_memory.save(
|
||||
value="test value",
|
||||
metadata={"task": "test_task"},
|
||||
agent="test_agent",
|
||||
)
|
||||
|
||||
assert len(events["MemorySaveStartedEvent"]) == 1
|
||||
assert len(events["MemorySaveCompletedEvent"]) == 1
|
||||
assert len(events["MemorySaveFailedEvent"]) == 0
|
||||
|
||||
assert dict(events["MemorySaveStartedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_save_started',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'short_term_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'value': 'test value',
|
||||
'metadata': {'task': 'test_task'},
|
||||
'agent_role': "test_agent"
|
||||
}
|
||||
|
||||
assert dict(events["MemorySaveCompletedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_save_completed',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'short_term_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'value': 'test value',
|
||||
'metadata': {'task': 'test_task', 'agent': 'test_agent'},
|
||||
'agent_role': "test_agent",
|
||||
'save_time_ms': ANY
|
||||
}
|
||||
|
||||
def test_save_and_search(short_term_memory):
|
||||
memory = ShortTermMemoryItem(
|
||||
data="""test value test value test value test value test value test value
|
||||
|
||||
Reference in New Issue
Block a user