Compare commits

...

2 Commits

Author SHA1 Message Date
Lucas Gomide
df4754301a docs: add docs about Memory Events 2025-07-02 12:04:17 -03:00
Lucas Gomide
ae57e5723c feat: add console logging for memory system usage (#3103) 2025-07-02 11:00:26 -04:00
5 changed files with 581 additions and 15 deletions

View File

@@ -255,6 +255,17 @@ CrewAI provides a wide range of events that you can listen for:
- **LLMCallFailedEvent**: Emitted when an LLM call fails
- **LLMStreamChunkEvent**: Emitted for each chunk received during streaming LLM responses
### Memory Events
- **MemoryQueryStartedEvent**: Emitted when a memory query is started. Contains the query, limit, and optional score threshold.
- **MemoryQueryCompletedEvent**: Emitted when a memory query is completed successfully. Contains the query, results, limit, score threshold, and query execution time.
- **MemoryQueryFailedEvent**: Emitted when a memory query fails. Contains the query, limit, score threshold, and error message.
- **MemorySaveStartedEvent**: Emitted when a memory save operation is started. Contains the value to be saved, metadata, and optional agent role.
- **MemorySaveCompletedEvent**: Emitted when a memory save operation is completed successfully. Contains the saved value, metadata, agent role, and save execution time.
- **MemorySaveFailedEvent**: Emitted when a memory save operation fails. Contains the value, metadata, agent role, and error message.
- **MemoryRetrievalStartedEvent**: Emitted when memory retrieval for a task prompt starts. Contains the optional task ID.
- **MemoryRetrievalCompletedEvent**: Emitted when memory retrieval for a task prompt completes successfully. Contains the task ID, memory content, and retrieval execution time.
## Event Handler Structure
Each event handler receives two parameters:

View File

@@ -9,7 +9,7 @@ icon: database
The CrewAI framework provides a sophisticated memory system designed to significantly enhance AI agent capabilities. CrewAI offers **three distinct memory approaches** that serve different use cases:
1. **Basic Memory System** - Built-in short-term, long-term, and entity memory
2. **User Memory** - User-specific memory with Mem0 integration (legacy approach)
2. **User Memory** - User-specific memory with Mem0 integration (legacy approach)
3. **External Memory** - Standalone external memory providers (new approach)
## Memory System Components
@@ -62,7 +62,7 @@ By default, CrewAI uses the `appdirs` library to determine storage locations fol
```
~/Library/Application Support/CrewAI/{project_name}/
├── knowledge/ # Knowledge base ChromaDB files
├── short_term_memory/ # Short-term memory ChromaDB files
├── short_term_memory/ # Short-term memory ChromaDB files
├── long_term_memory/ # Long-term memory ChromaDB files
├── entities/ # Entity memory ChromaDB files
└── long_term_memory_storage.db # SQLite database
@@ -252,7 +252,7 @@ chroma_path = os.path.join(storage_path, "knowledge")
if os.path.exists(chroma_path):
client = chromadb.PersistentClient(path=chroma_path)
collections = client.list_collections()
print("ChromaDB Collections:")
for collection in collections:
print(f" - {collection.name}: {collection.count()} documents")
@@ -269,7 +269,7 @@ crew = Crew(agents=[...], tasks=[...], memory=True)
# Reset specific memory types
crew.reset_memories(command_type='short') # Short-term memory
crew.reset_memories(command_type='long') # Long-term memory
crew.reset_memories(command_type='long') # Long-term memory
crew.reset_memories(command_type='entity') # Entity memory
crew.reset_memories(command_type='knowledge') # Knowledge storage
```
@@ -596,7 +596,7 @@ providers_to_test = [
{
"name": "Ollama",
"config": {
"provider": "ollama",
"provider": "ollama",
"config": {"model": "mxbai-embed-large"}
}
}
@@ -604,7 +604,7 @@ providers_to_test = [
for provider in providers_to_test:
print(f"\nTesting {provider['name']} embeddings...")
# Create crew with specific embedder
crew = Crew(
agents=[...],
@@ -612,7 +612,7 @@ for provider in providers_to_test:
memory=True,
embedder=provider['config']
)
# Run your test and measure performance
result = crew.kickoff()
print(f"{provider['name']} completed successfully")
@@ -655,17 +655,17 @@ import time
def test_embedding_performance(embedder_config, test_text="This is a test document"):
start_time = time.time()
crew = Crew(
agents=[...],
tasks=[...],
memory=True,
embedder=embedder_config
)
# Simulate memory operation
crew.kickoff()
end_time = time.time()
return end_time - start_time
@@ -676,7 +676,7 @@ openai_time = test_embedding_performance({
})
ollama_time = test_embedding_performance({
"provider": "ollama",
"provider": "ollama",
"config": {"model": "mxbai-embed-large"}
})
@@ -783,7 +783,7 @@ os.environ["MEM0_API_KEY"] = "your-api-key"
# Create external memory instance
external_memory = ExternalMemory(
embedder_config={
"provider": "mem0",
"provider": "mem0",
"config": {"user_id": "U-123"}
}
)
@@ -808,8 +808,8 @@ class CustomStorage(Storage):
def save(self, value, metadata=None, agent=None):
self.memories.append({
"value": value,
"metadata": metadata,
"value": value,
"metadata": metadata,
"agent": agent
})
@@ -986,7 +986,201 @@ crew = Crew(
- 🫡 **Enhanced Personalization:** Memory enables agents to remember user preferences and historical interactions, leading to personalized experiences.
- 🧠 **Improved Problem Solving:** Access to a rich memory store aids agents in making more informed decisions, drawing on past learnings and contextual insights.
## Memory Events
CrewAI's event system provides powerful insights into memory operations. By leveraging memory events, you can monitor, debug, and optimize your memory system's performance and behavior.
### Available Memory Events
CrewAI emits the following memory-related events:
| Event | Description | Key Properties |
| :---- | :---------- | :------------- |
| **MemoryQueryStartedEvent** | Emitted when a memory query begins | `query`, `limit`, `score_threshold` |
| **MemoryQueryCompletedEvent** | Emitted when a memory query completes successfully | `query`, `results`, `limit`, `score_threshold`, `query_time_ms` |
| **MemoryQueryFailedEvent** | Emitted when a memory query fails | `query`, `limit`, `score_threshold`, `error` |
| **MemorySaveStartedEvent** | Emitted when a memory save operation begins | `value`, `metadata`, `agent_role` |
| **MemorySaveCompletedEvent** | Emitted when a memory save operation completes successfully | `value`, `metadata`, `agent_role`, `save_time_ms` |
| **MemorySaveFailedEvent** | Emitted when a memory save operation fails | `value`, `metadata`, `agent_role`, `error` |
| **MemoryRetrievalStartedEvent** | Emitted when memory retrieval for a task prompt starts | `task_id` |
| **MemoryRetrievalCompletedEvent** | Emitted when memory retrieval completes successfully | `task_id`, `memory_content`, `retrieval_time_ms` |
### Practical Applications
#### 1. Memory Performance Monitoring
Track memory operation timing to optimize your application:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
MemoryQueryCompletedEvent,
MemorySaveCompletedEvent
)
import time
class MemoryPerformanceMonitor(BaseEventListener):
def __init__(self):
super().__init__()
self.query_times = []
self.save_times = []
def setup_listeners(self, crewai_event_bus):
@crewai_event_bus.on(MemoryQueryCompletedEvent)
def on_memory_query_completed(source, event: MemoryQueryCompletedEvent):
self.query_times.append(event.query_time_ms)
print(f"Memory query completed in {event.query_time_ms:.2f}ms. Query: '{event.query}'")
print(f"Average query time: {sum(self.query_times)/len(self.query_times):.2f}ms")
@crewai_event_bus.on(MemorySaveCompletedEvent)
def on_memory_save_completed(source, event: MemorySaveCompletedEvent):
self.save_times.append(event.save_time_ms)
print(f"Memory save completed in {event.save_time_ms:.2f}ms")
print(f"Average save time: {sum(self.save_times)/len(self.save_times):.2f}ms")
# Create an instance of your listener
memory_monitor = MemoryPerformanceMonitor()
```
#### 2. Memory Content Logging
Log memory operations for debugging and insights:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
MemorySaveStartedEvent,
MemoryQueryStartedEvent,
MemoryRetrievalCompletedEvent
)
import logging
# Configure logging
logger = logging.getLogger('memory_events')
class MemoryLogger(BaseEventListener):
def setup_listeners(self, crewai_event_bus):
@crewai_event_bus.on(MemorySaveStartedEvent)
def on_memory_save_started(source, event: MemorySaveStartedEvent):
if event.agent_role:
logger.info(f"Agent '{event.agent_role}' saving memory: {event.value[:50]}...")
else:
logger.info(f"Saving memory: {event.value[:50]}...")
@crewai_event_bus.on(MemoryQueryStartedEvent)
def on_memory_query_started(source, event: MemoryQueryStartedEvent):
logger.info(f"Memory query started: '{event.query}' (limit: {event.limit})")
@crewai_event_bus.on(MemoryRetrievalCompletedEvent)
def on_memory_retrieval_completed(source, event: MemoryRetrievalCompletedEvent):
if event.task_id:
logger.info(f"Memory retrieved for task {event.task_id} in {event.retrieval_time_ms:.2f}ms")
else:
logger.info(f"Memory retrieved in {event.retrieval_time_ms:.2f}ms")
logger.debug(f"Memory content: {event.memory_content}")
# Create an instance of your listener
memory_logger = MemoryLogger()
```
#### 3. Error Tracking and Notifications
Capture and respond to memory errors:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
MemorySaveFailedEvent,
MemoryQueryFailedEvent
)
import logging
from typing import Optional
# Configure logging
logger = logging.getLogger('memory_errors')
class MemoryErrorTracker(BaseEventListener):
def __init__(self, notify_email: Optional[str] = None):
super().__init__()
self.notify_email = notify_email
self.error_count = 0
def setup_listeners(self, crewai_event_bus):
@crewai_event_bus.on(MemorySaveFailedEvent)
def on_memory_save_failed(source, event: MemorySaveFailedEvent):
self.error_count += 1
agent_info = f"Agent '{event.agent_role}'" if event.agent_role else "Unknown agent"
error_message = f"Memory save failed: {event.error}. {agent_info}"
logger.error(error_message)
if self.notify_email and self.error_count % 5 == 0:
self._send_notification(error_message)
@crewai_event_bus.on(MemoryQueryFailedEvent)
def on_memory_query_failed(source, event: MemoryQueryFailedEvent):
self.error_count += 1
error_message = f"Memory query failed: {event.error}. Query: '{event.query}'"
logger.error(error_message)
if self.notify_email and self.error_count % 5 == 0:
self._send_notification(error_message)
def _send_notification(self, message):
# Implement your notification system (email, Slack, etc.)
print(f"[NOTIFICATION] Would send to {self.notify_email}: {message}")
# Create an instance of your listener
error_tracker = MemoryErrorTracker(notify_email="admin@example.com")
```
### Integrating with Analytics Platforms
Memory events can be forwarded to analytics and monitoring platforms to track performance metrics, detect anomalies, and visualize memory usage patterns:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
MemoryQueryCompletedEvent,
MemorySaveCompletedEvent
)
class MemoryAnalyticsForwarder(BaseEventListener):
def __init__(self, analytics_client):
super().__init__()
self.client = analytics_client
def setup_listeners(self, crewai_event_bus):
@crewai_event_bus.on(MemoryQueryCompletedEvent)
def on_memory_query_completed(source, event: MemoryQueryCompletedEvent):
# Forward query metrics to analytics platform
self.client.track_metric({
"event_type": "memory_query",
"query": event.query,
"duration_ms": event.query_time_ms,
"result_count": len(event.results) if hasattr(event.results, "__len__") else 0,
"timestamp": event.timestamp
})
@crewai_event_bus.on(MemorySaveCompletedEvent)
def on_memory_save_completed(source, event: MemorySaveCompletedEvent):
# Forward save metrics to analytics platform
self.client.track_metric({
"event_type": "memory_save",
"agent_role": event.agent_role,
"duration_ms": event.save_time_ms,
"timestamp": event.timestamp
})
```
### Best Practices for Memory Event Listeners
1. **Keep handlers lightweight**: Avoid complex processing in event handlers to prevent performance impacts
2. **Use appropriate logging levels**: Use INFO for normal operations, DEBUG for details, ERROR for issues
3. **Batch metrics when possible**: Accumulate metrics before sending to external systems
4. **Handle exceptions gracefully**: Ensure your event handlers don't crash due to unexpected data
5. **Consider memory consumption**: Be mindful of storing large amounts of event data
## Conclusion
Integrating CrewAI's memory system into your projects is straightforward. By leveraging the provided memory components and configurations,
Integrating CrewAI's memory system into your projects is straightforward. By leveraging the provided memory components and configurations,
you can quickly empower your agents with the ability to remember, reason, and learn from their interactions, unlocking new levels of intelligence and capability.

View File

@@ -65,6 +65,8 @@ from .reasoning_events import (
AgentReasoningFailedEvent,
)
from .listeners.memory_listener import MemoryListener
class EventListener(BaseEventListener):
_instance = None
@@ -91,6 +93,8 @@ class EventListener(BaseEventListener):
self._initialized = True
self.formatter = ConsoleFormatter(verbose=True)
MemoryListener(formatter=self.formatter)
# ----------- CREW EVENTS -----------
def setup_listeners(self, crewai_event_bus):

View File

@@ -0,0 +1,110 @@
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.memory_events import (
MemoryRetrievalCompletedEvent,
MemoryRetrievalStartedEvent,
MemoryQueryFailedEvent,
MemoryQueryCompletedEvent,
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
)
class MemoryListener(BaseEventListener):
def __init__(self, formatter):
super().__init__()
self.formatter = formatter
self.memory_retrieval_in_progress = False
self.memory_save_in_progress = False
def setup_listeners(self, crewai_event_bus):
@crewai_event_bus.on(MemoryRetrievalStartedEvent)
def on_memory_retrieval_started(
source, event: MemoryRetrievalStartedEvent
):
if self.memory_retrieval_in_progress:
return
self.memory_retrieval_in_progress = True
self.formatter.handle_memory_retrieval_started(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(MemoryRetrievalCompletedEvent)
def on_memory_retrieval_completed(
source, event: MemoryRetrievalCompletedEvent
):
if not self.memory_retrieval_in_progress:
return
self.memory_retrieval_in_progress = False
self.formatter.handle_memory_retrieval_completed(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
event.memory_content,
event.retrieval_time_ms
)
@crewai_event_bus.on(MemoryQueryCompletedEvent)
def on_memory_query_completed(source, event: MemoryQueryCompletedEvent):
if not self.memory_retrieval_in_progress:
return
self.formatter.handle_memory_query_completed(
self.formatter.current_agent_branch,
event.source_type,
event.query_time_ms,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(MemoryQueryFailedEvent)
def on_memory_query_failed(source, event: MemoryQueryFailedEvent):
if not self.memory_retrieval_in_progress:
return
self.formatter.handle_memory_query_failed(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
event.error,
event.source_type,
)
@crewai_event_bus.on(MemorySaveStartedEvent)
def on_memory_save_started(source, event: MemorySaveStartedEvent):
if self.memory_save_in_progress:
return
self.memory_save_in_progress = True
self.formatter.handle_memory_save_started(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(MemorySaveCompletedEvent)
def on_memory_save_completed(source, event: MemorySaveCompletedEvent):
if not self.memory_save_in_progress:
return
self.memory_save_in_progress = False
self.formatter.handle_memory_save_completed(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
event.save_time_ms,
event.source_type,
)
@crewai_event_bus.on(MemorySaveFailedEvent)
def on_memory_save_failed(source, event: MemorySaveFailedEvent):
if not self.memory_save_in_progress:
return
self.formatter.handle_memory_save_failed(
self.formatter.current_agent_branch,
event.error,
event.source_type,
self.formatter.current_crew_tree,
)

View File

@@ -1454,3 +1454,250 @@ class ConsoleFormatter:
)
self.print(finish_panel)
self.print()
def handle_memory_retrieval_started(
self,
agent_branch: Optional[Tree],
crew_tree: Optional[Tree],
) -> Optional[Tree]:
if not self.verbose:
return None
branch_to_use = agent_branch or self.current_lite_agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None or tree_to_use is None:
if crew_tree is not None:
branch_to_use = tree_to_use = crew_tree
else:
return None
memory_branch = branch_to_use.add("")
self.update_tree_label(
memory_branch, "🧠", "Memory Retrieval Started", "blue"
)
self.print(tree_to_use)
self.print()
return memory_branch
def handle_memory_retrieval_completed(
self,
agent_branch: Optional[Tree],
crew_tree: Optional[Tree],
memory_content: str,
retrieval_time_ms: float,
) -> None:
if not self.verbose:
return None
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None and tree_to_use is not None:
branch_to_use = tree_to_use
def add_panel():
memory_text = str(memory_content)
if len(memory_text) > 500:
memory_text = memory_text[:497] + "..."
memory_panel = Panel(
Text(memory_text, style="white"),
title="🧠 Retrieved Memory",
subtitle=f"Retrieval Time: {retrieval_time_ms:.2f}ms",
border_style="green",
padding=(1, 2),
)
self.print(memory_panel)
self.print()
if branch_to_use is None or tree_to_use is None:
add_panel()
return None
memory_branch_found = False
for child in branch_to_use.children:
if "Memory Retrieval Started" in str(child.label):
self.update_tree_label(
child, "", "Memory Retrieval Completed", "green"
)
memory_branch_found = True
break
if not memory_branch_found:
for child in branch_to_use.children:
if (
"Memory Retrieval" in str(child.label)
and "Started" not in str(child.label)
and "Completed" not in str(child.label)
):
self.update_tree_label(
child, "", "Memory Retrieval Completed", "green"
)
memory_branch_found = True
break
if not memory_branch_found:
memory_branch = branch_to_use.add("")
self.update_tree_label(
memory_branch, "", "Memory Retrieval Completed", "green"
)
self.print(tree_to_use)
if memory_content:
add_panel()
def handle_memory_query_completed(
self,
agent_branch: Optional[Tree],
source_type: str,
query_time_ms: float,
crew_tree: Optional[Tree],
) -> None:
if not self.verbose:
return None
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None and tree_to_use is not None:
branch_to_use = tree_to_use
if branch_to_use is None:
return None
memory_type = source_type.replace("_", " ").title()
for child in branch_to_use.children:
if "Memory Retrieval" in str(child.label):
for child in child.children:
sources_branch = child
if "Sources Used" in str(child.label):
sources_branch.add(f"{memory_type} ({query_time_ms:.2f}ms)")
break
else:
sources_branch = child.add("Sources Used")
sources_branch.add(f"{memory_type} ({query_time_ms:.2f}ms)")
break
def handle_memory_query_failed(
self,
agent_branch: Optional[Tree],
crew_tree: Optional[Tree],
error: str,
source_type: str,
) -> None:
if not self.verbose:
return None
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None and tree_to_use is not None:
branch_to_use = tree_to_use
if branch_to_use is None:
return None
memory_type = source_type.replace("_", " ").title()
for child in branch_to_use.children:
if "Memory Retrieval" in str(child.label):
for child in child.children:
sources_branch = child
if "Sources Used" in str(child.label):
sources_branch.add(f"{memory_type} - Error: {error}")
break
else:
sources_branch = child.add("🧠 Sources Used")
sources_branch.add(f"{memory_type} - Error: {error}")
break
def handle_memory_save_started(
self,
agent_branch: Optional[Tree],
crew_tree: Optional[Tree]
) -> None:
if not self.verbose:
return None
branch_to_use = agent_branch or self.current_lite_agent_branch
tree_to_use = branch_to_use or crew_tree
if tree_to_use is None:
return None
for child in tree_to_use.children:
if "Memory Update" in str(child.label):
break
else:
memory_branch = tree_to_use.add("")
self.update_tree_label(
memory_branch, "🧠", "Memory Update Overall", "white"
)
self.print(tree_to_use)
self.print()
def handle_memory_save_completed(
self,
agent_branch: Optional[Tree],
crew_tree: Optional[Tree],
save_time_ms: float,
source_type: str,
) -> None:
if not self.verbose:
return None
branch_to_use = agent_branch or self.current_lite_agent_branch
tree_to_use = branch_to_use or crew_tree
if tree_to_use is None:
return None
memory_type = source_type.replace("_", " ").title()
content = f"{memory_type} Memory Saved ({save_time_ms:.2f}ms)"
for child in tree_to_use.children:
if "Memory Update" in str(child.label):
child.add(content)
break
else:
memory_branch = tree_to_use.add("")
memory_branch.add(content)
self.print(tree_to_use)
self.print()
def handle_memory_save_failed(
self,
agent_branch: Optional[Tree],
error: str,
source_type: str,
crew_tree: Optional[Tree],
) -> None:
if not self.verbose:
return None
branch_to_use = agent_branch or self.current_lite_agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None or tree_to_use is None:
return None
memory_type = source_type.replace("_", " ").title()
content = f"{memory_type} Memory Save Failed"
for child in branch_to_use.children:
if "Memory Update" in str(child.label):
child.add(content)
break
else:
memory_branch = branch_to_use.add("")
memory_branch.add(content)
self.print(tree_to_use)
self.print()