Compare commits


2 Commits

5 changed files with 198 additions and 351 deletions

View File

@@ -1,349 +0,0 @@
---
title: 'Event Listeners'
description: 'Tap into CrewAI events to build custom integrations and monitoring'
---
# Event Listeners
CrewAI provides a powerful event system that allows you to listen for and react to various events that occur during the execution of your Crew. This feature enables you to build custom integrations, monitoring solutions, logging systems, or any other functionality that needs to be triggered based on CrewAI's internal events.
## How It Works
CrewAI uses an event bus architecture to emit events throughout the execution lifecycle. The event system is built on the following components:
1. **CrewAIEventsBus**: A singleton event bus that manages event registration and emission
2. **CrewEvent**: Base class for all events in the system
3. **BaseEventListener**: Abstract base class for creating custom event listeners
When specific actions occur in CrewAI (like a Crew starting execution, an Agent completing a task, or a tool being used), the system emits corresponding events. You can register handlers for these events to execute custom code when they occur.
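The bus-plus-handlers flow described above can be sketched with a simplified stand-in. This is not CrewAI's actual API — `MiniEventBus` and `CrewStarted` are hypothetical names used only to illustrate the register-then-emit pattern:

```python
from collections import defaultdict

# Simplified stand-in for the architecture described above; the real
# CrewAIEventsBus lives in crewai.utilities.events and has a richer API.
class MiniEventBus:
    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event_type):
        """Decorator that registers a handler for an event type."""
        def decorator(fn):
            self._handlers[event_type].append(fn)
            return fn
        return decorator

    def emit(self, source, event):
        # Call every handler registered for this event's type
        for fn in self._handlers[type(event)]:
            fn(source, event)

class CrewStarted:  # hypothetical stand-in event class
    def __init__(self, crew_name):
        self.crew_name = crew_name

bus = MiniEventBus()
seen = []

@bus.on(CrewStarted)
def on_started(source, event):
    seen.append(event.crew_name)

bus.emit(source=None, event=CrewStarted("research_crew"))
```

Handlers registered via the decorator fire whenever a matching event is emitted, which is the same contract the real bus exposes.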
## Creating a Custom Event Listener
To create a custom event listener, you need to:
1. Create a class that inherits from `BaseEventListener`
2. Implement the `setup_listeners` method
3. Register handlers for the events you're interested in
4. Create an instance of your listener in the appropriate file
Here's a simple example of a custom event listener class:
```python
from crewai.utilities.events import (
    CrewKickoffStartedEvent,
    CrewKickoffCompletedEvent,
    AgentExecutionCompletedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener

class MyCustomListener(BaseEventListener):
    def __init__(self):
        super().__init__()

    def setup_listeners(self, crewai_event_bus):
        @crewai_event_bus.on(CrewKickoffStartedEvent)
        def on_crew_started(source, event):
            print(f"Crew '{event.crew_name}' has started execution!")

        @crewai_event_bus.on(CrewKickoffCompletedEvent)
        def on_crew_completed(source, event):
            print(f"Crew '{event.crew_name}' has completed execution!")
            print(f"Output: {event.output}")

        @crewai_event_bus.on(AgentExecutionCompletedEvent)
        def on_agent_execution_completed(source, event):
            print(f"Agent '{event.agent.role}' completed task")
            print(f"Output: {event.output}")
```
## Properly Registering Your Listener
Simply defining your listener class isn't enough. You need to create an instance of it and ensure it's imported in your application. This ensures that:
1. The event handlers are registered with the event bus
2. The listener instance remains in memory (not garbage collected)
3. The listener is active when events are emitted
### Option 1: Import and Instantiate in Your Crew or Flow Implementation
The most important thing is to create an instance of your listener in the file where your Crew or Flow is defined and executed:
#### For Crew-based Applications
Create and import your listener at the top of your Crew implementation file:
```python
# In your crew.py file
from crewai import Agent, Crew, Task
from my_listeners import MyCustomListener

# Create an instance of your listener
my_listener = MyCustomListener()

class MyCustomCrew:
    # Your crew implementation...

    def crew(self):
        return Crew(
            agents=[...],
            tasks=[...],
            # ...
        )
```
#### For Flow-based Applications
Create and import your listener at the top of your Flow implementation file:
```python
# In your main.py or flow.py file
from crewai.flow import Flow, listen, start
from my_listeners import MyCustomListener

# Create an instance of your listener
my_listener = MyCustomListener()

class MyCustomFlow(Flow):
    # Your flow implementation...

    @start()
    def first_step(self):
        # ...
```
This ensures that your listener is loaded and active when your Crew or Flow is executed.
### Option 2: Create a Package for Your Listeners
For a more structured approach, especially if you have multiple listeners:
1. Create a package for your listeners:
```
my_project/
├── listeners/
│   ├── __init__.py
│   ├── my_custom_listener.py
│   └── another_listener.py
```
2. In `my_custom_listener.py`, define your listener class and create an instance:
```python
# my_custom_listener.py
from crewai.utilities.events.base_event_listener import BaseEventListener
# ... import events ...

class MyCustomListener(BaseEventListener):
    # ... implementation ...

# Create an instance of your listener
my_custom_listener = MyCustomListener()
```
3. In `__init__.py`, import the listener instances to ensure they're loaded:
```python
# __init__.py
from .my_custom_listener import my_custom_listener
from .another_listener import another_listener
# Optionally export them if you need to access them elsewhere
__all__ = ['my_custom_listener', 'another_listener']
```
4. Import your listeners package in your Crew or Flow file:
```python
# In your crew.py or flow.py file
import my_project.listeners  # This loads all your listeners

class MyCustomCrew:
    # Your crew implementation...
```
This is exactly how CrewAI's built-in `agentops_listener` is registered. In the CrewAI codebase, you'll find:
```python
# src/crewai/utilities/events/third_party/__init__.py
from .agentops_listener import agentops_listener
```
This ensures the `agentops_listener` is loaded when the `crewai.utilities.events` package is imported.
## Available Event Types
CrewAI provides a wide range of events that you can listen for:
### Crew Events
- **CrewKickoffStartedEvent**: Emitted when a Crew starts execution
- **CrewKickoffCompletedEvent**: Emitted when a Crew completes execution
- **CrewKickoffFailedEvent**: Emitted when a Crew fails to complete execution
- **CrewTestStartedEvent**: Emitted when a Crew starts testing
- **CrewTestCompletedEvent**: Emitted when a Crew completes testing
- **CrewTestFailedEvent**: Emitted when a Crew fails to complete testing
- **CrewTrainStartedEvent**: Emitted when a Crew starts training
- **CrewTrainCompletedEvent**: Emitted when a Crew completes training
- **CrewTrainFailedEvent**: Emitted when a Crew fails to complete training
### Agent Events
- **AgentExecutionStartedEvent**: Emitted when an Agent starts executing a task
- **AgentExecutionCompletedEvent**: Emitted when an Agent completes executing a task
- **AgentExecutionErrorEvent**: Emitted when an Agent encounters an error during execution
### Task Events
- **TaskStartedEvent**: Emitted when a Task starts execution
- **TaskCompletedEvent**: Emitted when a Task completes execution
- **TaskFailedEvent**: Emitted when a Task fails to complete execution
- **TaskEvaluationEvent**: Emitted when a Task is evaluated
### Tool Usage Events
- **ToolUsageStartedEvent**: Emitted when a tool execution is started
- **ToolUsageFinishedEvent**: Emitted when a tool execution is completed
- **ToolUsageErrorEvent**: Emitted when a tool execution encounters an error
- **ToolValidateInputErrorEvent**: Emitted when a tool input validation encounters an error
- **ToolExecutionErrorEvent**: Emitted when a tool execution encounters an error
- **ToolSelectionErrorEvent**: Emitted when there's an error selecting a tool
### Flow Events
- **FlowCreatedEvent**: Emitted when a Flow is created
- **FlowStartedEvent**: Emitted when a Flow starts execution
- **FlowFinishedEvent**: Emitted when a Flow completes execution
- **FlowPlotEvent**: Emitted when a Flow is plotted
- **MethodExecutionStartedEvent**: Emitted when a Flow method starts execution
- **MethodExecutionFinishedEvent**: Emitted when a Flow method completes execution
- **MethodExecutionFailedEvent**: Emitted when a Flow method fails to complete execution
### LLM Events
- **LLMCallStartedEvent**: Emitted when an LLM call starts
- **LLMCallCompletedEvent**: Emitted when an LLM call completes
- **LLMCallFailedEvent**: Emitted when an LLM call fails
## Event Handler Structure
Each event handler receives two parameters:
1. **source**: The object that emitted the event
2. **event**: The event instance, containing event-specific data
The structure of the event object depends on the event type, but all events inherit from `CrewEvent` and include:
- **timestamp**: The time when the event was emitted
- **type**: A string identifier for the event type
Additional fields vary by event type. For example, `CrewKickoffCompletedEvent` includes `crew_name` and `output` fields.
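The two-parameter handler shape and the base fields can be illustrated with a runnable sketch. `FakeKickoffCompletedEvent` below is a hypothetical stand-in, not part of CrewAI — real events subclass `CrewEvent` and are delivered by the bus rather than constructed by hand:

```python
from datetime import datetime

# Hypothetical stand-in for a CrewAI event; real events subclass CrewEvent
# and carry these base fields plus event-specific ones.
class FakeKickoffCompletedEvent:
    def __init__(self, crew_name, output):
        self.timestamp = datetime.now()        # base field: emission time
        self.type = "crew_kickoff_completed"   # base field: event type id
        self.crew_name = crew_name             # event-specific field
        self.output = output                   # event-specific field

def on_crew_completed(source, event):
    # source: the emitting object (unused here); event: the event instance
    return f"[{event.type}] crew={event.crew_name} output={event.output}"

evt = FakeKickoffCompletedEvent("research_crew", "done")
line = on_crew_completed(source=None, event=evt)
```

A handler can therefore rely on `timestamp` and `type` being present on every event, and duck-type the rest per event class.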
## Real-World Example: Integration with AgentOps
CrewAI includes an example of a third-party integration with [AgentOps](https://github.com/AgentOps-AI/agentops), a monitoring and observability platform for AI agents. Here's how it's implemented:
```python
from typing import Optional

from crewai.utilities.events import (
    CrewKickoffCompletedEvent,
    ToolUsageErrorEvent,
    ToolUsageStartedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crew_events import CrewKickoffStartedEvent
from crewai.utilities.events.task_events import TaskEvaluationEvent

try:
    import agentops
    AGENTOPS_INSTALLED = True
except ImportError:
    AGENTOPS_INSTALLED = False

class AgentOpsListener(BaseEventListener):
    tool_event: Optional["agentops.ToolEvent"] = None
    session: Optional["agentops.Session"] = None

    def __init__(self):
        super().__init__()

    def setup_listeners(self, crewai_event_bus):
        if not AGENTOPS_INSTALLED:
            return

        @crewai_event_bus.on(CrewKickoffStartedEvent)
        def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent):
            self.session = agentops.init()
            for agent in source.agents:
                if self.session:
                    self.session.create_agent(
                        name=agent.role,
                        agent_id=str(agent.id),
                    )

        @crewai_event_bus.on(CrewKickoffCompletedEvent)
        def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent):
            if self.session:
                self.session.end_session(
                    end_state="Success",
                    end_state_reason="Finished Execution",
                )

        @crewai_event_bus.on(ToolUsageStartedEvent)
        def on_tool_usage_started(source, event: ToolUsageStartedEvent):
            self.tool_event = agentops.ToolEvent(name=event.tool_name)
            if self.session:
                self.session.record(self.tool_event)

        @crewai_event_bus.on(ToolUsageErrorEvent)
        def on_tool_usage_error(source, event: ToolUsageErrorEvent):
            agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event)
```
This listener initializes an AgentOps session when a Crew starts, registers agents with AgentOps, tracks tool usage, and ends the session when the Crew completes.
The AgentOps listener is registered in CrewAI's event system through the import in `src/crewai/utilities/events/third_party/__init__.py`:
```python
from .agentops_listener import agentops_listener
```
This ensures the `agentops_listener` is loaded when the `crewai.utilities.events` package is imported.
## Advanced Usage: Scoped Handlers
For temporary event handling (useful for testing or specific operations), you can use the `scoped_handlers` context manager:
```python
from crewai.utilities.events import crewai_event_bus, CrewKickoffStartedEvent

with crewai_event_bus.scoped_handlers():
    @crewai_event_bus.on(CrewKickoffStartedEvent)
    def temp_handler(source, event):
        print("This handler only exists within this context")

    # Do something that emits events

# Outside the context, the temporary handler is removed
```
## Use Cases
Event listeners can be used for a variety of purposes:
1. **Logging and Monitoring**: Track the execution of your Crew and log important events
2. **Analytics**: Collect data about your Crew's performance and behavior
3. **Debugging**: Set up temporary listeners to debug specific issues
4. **Integration**: Connect CrewAI with external systems like monitoring platforms, databases, or notification services
5. **Custom Behavior**: Trigger custom actions based on specific events
## Best Practices
1. **Keep Handlers Light**: Event handlers should be lightweight and avoid blocking operations
2. **Error Handling**: Include proper error handling in your event handlers to prevent exceptions from affecting the main execution
3. **Cleanup**: If your listener allocates resources, ensure they're properly cleaned up
4. **Selective Listening**: Only listen for events you actually need to handle
5. **Testing**: Test your event listeners in isolation to ensure they behave as expected
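One way to follow the error-handling practice above is to wrap handlers in a small decorator that logs failures instead of letting them propagate into the main execution. This is a generic sketch, not a CrewAI API — `safe_handler` is a hypothetical helper:

```python
import functools
import logging

logger = logging.getLogger("listeners")

def safe_handler(fn):
    """Wrap an event handler so its exceptions are logged, not raised."""
    @functools.wraps(fn)
    def wrapper(source, event):
        try:
            return fn(source, event)
        except Exception:
            # Log the full traceback but never break the emitting code path
            logger.exception("Event handler %s failed", fn.__name__)
            return None
    return wrapper

@safe_handler
def flaky_handler(source, event):
    raise RuntimeError("boom")

flaky_handler(None, None)  # logs the error instead of propagating it
```

The same wrapper can be applied inside `setup_listeners` to each decorated handler, keeping failures in monitoring code from affecting the Crew itself.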
By leveraging CrewAI's event system, you can extend its functionality and integrate it seamlessly with your existing infrastructure.

View File

@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Any, Dict, Optional
from pydantic import PrivateAttr
@@ -52,10 +53,34 @@ class ShortTermMemory(Memory):
        metadata: Optional[Dict[str, Any]] = None,
        agent: Optional[str] = None,
    ) -> None:
        """
        Save a memory item to the storage.

        Args:
            value: The data to save.
            metadata: Optional metadata to associate with the memory.
            agent: Optional agent identifier.

        Raises:
            ValueError: If the item's timestamp is in the future.
        """
        import logging

        item = ShortTermMemoryItem(data=value, metadata=metadata, agent=agent)
        if item.timestamp > datetime.now():
            raise ValueError("Cannot save memory item with future timestamp")
        logging.debug(f"Saving memory item with timestamp: {item.timestamp}")

        if self._memory_provider == "mem0":
            item.data = f"Remember the following insights from Agent run: {item.data}"

        # Include timestamp in metadata
        if item.metadata is None:
            item.metadata = {}
        item.metadata["timestamp"] = item.timestamp.isoformat()

        super().save(value=item.data, metadata=item.metadata, agent=item.agent)

    def search(

View File

@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Any, Dict, Optional
@@ -7,7 +8,11 @@ class ShortTermMemoryItem:
        data: Any,
        agent: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        timestamp: Optional[datetime] = None,
    ):
        if timestamp is not None and timestamp > datetime.now():
            raise ValueError("Timestamp cannot be in the future")
        self.data = data
        self.agent = agent
        self.metadata = metadata if metadata is not None else {}
        self.timestamp = timestamp if timestamp is not None else datetime.now()

View File

@@ -114,13 +114,32 @@ class RAGStorage(BaseRAGStorage):
limit: int = 3,
filter: Optional[dict] = None,
score_threshold: float = 0.35,
recency_weight: float = 0.3,
time_decay_days: float = 1.0,
) -> List[Any]:
"""
Search for entries in the storage based on semantic similarity and recency.
Args:
query: The search query string.
limit: Maximum number of results to return.
filter: Optional filter to apply to the search.
score_threshold: Minimum score threshold for results.
recency_weight: Weight given to recency vs. semantic similarity (0.0-1.0).
Higher values prioritize recent memories more strongly.
time_decay_days: Number of days over which recency factor decays to zero.
Smaller values make older memories lose relevance faster.
Returns:
List of search results, each containing id, metadata, context, and score.
Results are sorted by combined semantic similarity and recency score.
"""
if not hasattr(self, "app"):
self._initialize_app()
try:
with suppress_logging():
response = self.collection.query(query_texts=query, n_results=limit)
response = self.collection.query(query_texts=query, n_results=limit * 2) # Get more results to allow for recency filtering
results = []
for i in range(len(response["ids"][0])):
@@ -130,10 +149,27 @@ class RAGStorage(BaseRAGStorage):
                    "context": response["documents"][0][i],
                    "score": response["distances"][0][i],
                }

                # Apply recency boost if a timestamp exists in the metadata
                if "timestamp" in result["metadata"]:
                    try:
                        from datetime import datetime

                        timestamp = datetime.fromisoformat(result["metadata"]["timestamp"])
                        now = datetime.now()
                        # Calculate recency factor (newer = higher score)
                        time_diff_seconds = (now - timestamp).total_seconds()
                        recency_factor = max(0, 1 - (time_diff_seconds / (time_decay_days * 24 * 60 * 60)))
                        # Blend the similarity score with the recency factor
                        result["score"] = result["score"] * (1 - recency_weight) + recency_factor * recency_weight
                    except (ValueError, TypeError):
                        pass  # If timestamp parsing fails, use the original score

                if result["score"] >= score_threshold:
                    results.append(result)

            # Sort by adjusted score (higher is better)
            results.sort(key=lambda x: x["score"], reverse=True)
            return results[:limit]  # Return only the requested number of results
        except Exception as e:
            logging.error(f"Error during {self.type} search: {str(e)}")
            return []
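The recency-weighting arithmetic in this hunk can be checked in isolation. The sketch below re-implements the same linear-decay blend as a standalone function; `recency_adjusted_score` is a hypothetical name for illustration, not part of the codebase:

```python
from datetime import datetime, timedelta

def recency_adjusted_score(similarity: float, timestamp: datetime,
                           now: datetime, recency_weight: float = 0.3,
                           time_decay_days: float = 1.0) -> float:
    """Blend semantic similarity with a linear recency factor.

    The recency factor is 1.0 for a memory saved right now and decays
    linearly to 0.0 once the memory is `time_decay_days` days old.
    """
    age_seconds = (now - timestamp).total_seconds()
    recency = max(0.0, 1.0 - age_seconds / (time_decay_days * 24 * 60 * 60))
    return similarity * (1 - recency_weight) + recency * recency_weight

now = datetime(2024, 1, 2, 12, 0)
# A 5-minute-old memory keeps almost its full recency bonus...
fresh = recency_adjusted_score(0.8, now - timedelta(minutes=5), now)
# ...while a 2-day-old memory (past the decay window) gets none
stale = recency_adjusted_score(0.8, now - timedelta(days=2), now)
```

With the defaults, the stale memory's score collapses to `0.8 * 0.7 = 0.56`, so a fresher memory with the same similarity always ranks above it.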

View File

@@ -0,0 +1,130 @@
from datetime import datetime, timedelta
from unittest.mock import patch

import pytest

from crewai.agent import Agent
from crewai.crew import Crew
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.memory.storage.rag_storage import RAGStorage
from crewai.task import Task


@pytest.fixture
def short_term_memory():
    """Fixture to create a ShortTermMemory instance"""
    agent = Agent(
        role="Tutor",
        goal="Teach programming concepts",
        backstory="You are a programming tutor helping students learn.",
        tools=[],
        verbose=True,
    )
    task = Task(
        description="Explain programming concepts to students.",
        expected_output="Clear explanations of programming concepts.",
        agent=agent,
    )
    return ShortTermMemory(crew=Crew(agents=[agent], tasks=[task]))


def test_memory_prioritizes_recent_topic(short_term_memory):
    """Test that memory retrieval prioritizes the most recent topic in a conversation."""
    # First topic: Python variables
    topic1_data = "Variables in Python are dynamically typed. You can assign any value to a variable without declaring its type."
    topic1_timestamp = datetime.now() - timedelta(minutes=10)  # Older memory

    # Second topic: Python abstract classes
    topic2_data = "Abstract classes in Python are created using the ABC module. They cannot be instantiated and are used as a blueprint for other classes."
    topic2_timestamp = datetime.now()  # More recent memory

    # Mock search results to simulate what would be returned by RAGStorage
    mock_results = [
        {
            "id": "2",
            "metadata": {
                "agent": "Tutor",
                "topic": "python_abstract_classes",
                "timestamp": topic2_timestamp.isoformat(),
            },
            "context": topic2_data,
            "score": 0.85,  # Higher score due to recency boost
        },
        {
            "id": "1",
            "metadata": {
                "agent": "Tutor",
                "topic": "python_variables",
                "timestamp": topic1_timestamp.isoformat(),
            },
            "context": topic1_data,
            "score": 0.75,  # Lower score due to being older
        },
    ]

    # Mock the search method to return our predefined results
    with patch.object(RAGStorage, "search", return_value=mock_results):
        # Query that could match both topics but should prioritize the more recent one
        query = "Can you give me another example of that?"

        # Search with recency consideration
        results = short_term_memory.search(query)

        # Verify that the most recent topic (abstract classes) is prioritized
        assert len(results) > 0, "No search results returned"

        # The first result should be about abstract classes (the more recent topic)
        assert "abstract classes" in results[0]["context"].lower(), "Recent topic (abstract classes) not prioritized"

        # If there are multiple results, the older topic should also be returned
        # but with lower priority and a lower score
        if len(results) > 1:
            assert "variables" in results[1]["context"].lower(), "Older topic should be second"
            assert results[0]["score"] > results[1]["score"], "Recent topic should have higher score"


def test_future_timestamp_validation():
    """Test that ShortTermMemoryItem raises ValueError for future timestamps."""
    # Setup agent and task for memory
    agent = Agent(
        role="Tutor",
        goal="Teach programming concepts",
        backstory="You are a programming tutor helping students learn.",
        tools=[],
        verbose=True,
    )
    task = Task(
        description="Explain programming concepts to students.",
        expected_output="Clear explanations of programming concepts.",
        agent=agent,
    )

    # Create a future timestamp
    future_timestamp = datetime.now() + timedelta(days=1)

    # Test constructor validation
    with pytest.raises(ValueError, match="Timestamp cannot be in the future"):
        ShortTermMemoryItem(data="Test data", timestamp=future_timestamp)

    # Test save method validation
    memory = ShortTermMemory(crew=Crew(agents=[agent], tasks=[task]))
    future_data = "Test data with future timestamp"

    # Patch datetime inside the item module so the ShortTermMemoryItem created
    # internally by save() gets a future timestamp, then assert save() rejects it
    with patch("crewai.memory.short_term.short_term_memory_item.datetime") as mock_datetime:
        # Set up the mock to return our future timestamp when now() is called
        mock_datetime.now.return_value = future_timestamp
        with pytest.raises(ValueError, match="Cannot save memory item with future timestamp"):
            memory.save(value=future_data)