Compare commits

...

2 Commits

Author SHA1 Message Date
Devin AI
f568565786 Fix import sorting in example and test files
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-05 03:26:44 +00:00
Devin AI
cb1e3a13ef Add ChatMessageHistory feature for multi-round dialogues in REST sessions (Issue #2284)
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-05 03:24:57 +00:00
7 changed files with 523 additions and 1 deletions

View File

@@ -567,6 +567,81 @@ my_crew.reset_memories(command_type = 'all') # Resets all the memory
- 🫡 **Enhanced Personalization:** Memory enables agents to remember user preferences and historical interactions, leading to personalized experiences.
- 🧠 **Improved Problem Solving:** Access to a rich memory store aids agents in making more informed decisions, drawing on past learnings and contextual insights.
## Chat Message History
The `ChatMessageHistory` class provides a way to store and retrieve chat messages with roles (human, AI, system), similar to Langchain's ChatMessageHistory. This feature is particularly useful for maintaining conversation context across multiple interactions within a single REST session.
### Basic Usage
```python
from crewai.memory import ChatMessageHistory, MessageRole
# Create a chat message history
chat_history = ChatMessageHistory()
# Add messages
chat_history.add_human_message("Hello, how are you?")
chat_history.add_ai_message("I'm doing well, thank you!")
chat_history.add_system_message("System message")
# Get messages
messages = chat_history.get_messages()
# Get messages as dictionaries (useful for serialization)
messages_dict = chat_history.get_messages_as_dict()
# Search for messages
results = chat_history.search("specific topic")
# Clear messages
chat_history.clear()
```
### Using with REST APIs
For REST API applications, you can store the chat history between requests:
```python
# In your API endpoint
def chat_endpoint(request):
# Get or create a session
session_id = request.session_id
# Get or create chat history for this session
if session_id in session_storage:
chat_history_dict = session_storage[session_id]
chat_history = ChatMessageHistory()
# Restore previous messages
for msg in chat_history_dict:
if msg["role"] == "human":
chat_history.add_human_message(msg["content"], msg.get("metadata", {}))
elif msg["role"] == "ai":
chat_history.add_ai_message(msg["content"], msg.get("metadata", {}))
elif msg["role"] == "system":
chat_history.add_system_message(msg["content"], msg.get("metadata", {}))
else:
chat_history = ChatMessageHistory()
# Add the new message from the request
chat_history.add_human_message(request.message)
# Process with CrewAI
crew = Crew(agents=[...], tasks=[...], memory=True)
result = crew.kickoff(inputs={"chat_history": chat_history.get_messages_as_dict()})
# Add the response to the chat history
chat_history.add_ai_message(str(result))
# Store the updated chat history
session_storage[session_id] = chat_history.get_messages_as_dict()
# Return the response
return {"response": str(result)}
```
This allows for maintaining conversation context across multiple API calls within a single session.
## Conclusion
Integrating CrewAI's memory system into your projects is straightforward. By leveraging the provided memory components and configurations,

View File

@@ -0,0 +1,48 @@
from crewai import Agent, Crew, Task
from crewai.memory import ChatMessageHistory, MessageRole
# Create a chat message history
chat_history = ChatMessageHistory()
# Add some messages
chat_history.add_human_message("Hello, I need help with a research task.")
chat_history.add_ai_message("I'd be happy to help! What topic are you interested in?")
chat_history.add_human_message("I'm interested in renewable energy technologies.")
# Create an agent with access to the chat history
researcher = Agent(
role="Renewable Energy Researcher",
goal="Provide accurate and up-to-date information on renewable energy technologies",
backstory="You are an expert in renewable energy with years of research experience.",
verbose=True,
)
# Create a task that uses the chat history
research_task = Task(
description=(
"Review the conversation history and provide a detailed response about "
"renewable energy technologies, addressing any specific questions or interests."
),
expected_output="A comprehensive response about renewable energy technologies.",
agent=researcher,
)
# Create a crew with memory enabled
crew = Crew(
agents=[researcher],
tasks=[research_task],
verbose=True,
memory=True,
)
# Pass the chat history to the crew
# In a real REST API scenario, you would store and retrieve this between requests
crew_result = crew.kickoff(inputs={"chat_history": chat_history.get_messages_as_dict()})
# Add the crew's response to the chat history
chat_history.add_ai_message(str(crew_result))
# Print the full conversation history
print("\nFull Conversation History:")
for message in chat_history.get_messages():
print(f"{message.role.value.capitalize()}: {message.content}")

View File

@@ -2,5 +2,15 @@ from .entity.entity_memory import EntityMemory
from .long_term.long_term_memory import LongTermMemory
from .short_term.short_term_memory import ShortTermMemory
from .user.user_memory import UserMemory
from .chat_history.chat_message_history import ChatMessageHistory
from .chat_history.chat_message import ChatMessage, MessageRole
__all__ = ["UserMemory", "EntityMemory", "LongTermMemory", "ShortTermMemory"]
__all__ = [
"UserMemory",
"EntityMemory",
"LongTermMemory",
"ShortTermMemory",
"ChatMessageHistory",
"ChatMessage",
"MessageRole",
]

View File

@@ -0,0 +1,4 @@
from crewai.memory.chat_history.chat_message import ChatMessage, MessageRole
from crewai.memory.chat_history.chat_message_history import ChatMessageHistory
__all__ = ["ChatMessage", "MessageRole", "ChatMessageHistory"]

View File

@@ -0,0 +1,53 @@
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional
class MessageRole(str, Enum):
"""Enum for message roles in a chat."""
HUMAN = "human"
AI = "ai"
SYSTEM = "system"
class ChatMessage:
"""
Represents a single message in a chat history.
Attributes:
role: The role of the message sender (human, ai, or system).
content: The content of the message.
timestamp: When the message was created.
metadata: Additional information about the message.
"""
def __init__(
self,
role: MessageRole,
content: str,
timestamp: Optional[datetime] = None,
metadata: Optional[Dict[str, Any]] = None,
):
self.role = role
self.content = content
self.timestamp = timestamp or datetime.now()
self.metadata = metadata or {}
def to_dict(self) -> Dict[str, Any]:
"""Convert the message to a dictionary."""
return {
"role": self.role.value,
"content": self.content,
"timestamp": self.timestamp.isoformat(),
"metadata": self.metadata,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ChatMessage":
"""Create a message from a dictionary."""
return cls(
role=MessageRole(data["role"]),
content=data["content"],
timestamp=datetime.fromisoformat(data["timestamp"]),
metadata=data.get("metadata", {}),
)

View File

@@ -0,0 +1,180 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from pydantic import PrivateAttr
from crewai.memory.chat_history.chat_message import ChatMessage, MessageRole
from crewai.memory.memory import Memory
from crewai.memory.storage.rag_storage import RAGStorage
class ChatMessageHistory(Memory):
"""
ChatMessageHistory class for storing and retrieving chat messages.
This class allows for maintaining conversation context across multiple
interactions within a single session, similar to Langchain's ChatMessageHistory.
Attributes:
messages: A list of ChatMessage objects representing the conversation history.
"""
_memory_provider: Optional[str] = PrivateAttr()
_messages: List[ChatMessage] = PrivateAttr(default_factory=list)
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
if crew and hasattr(crew, "memory_config") and crew.memory_config is not None:
memory_provider = crew.memory_config.get("provider")
else:
memory_provider = None
if memory_provider == "mem0":
try:
from crewai.memory.storage.mem0_storage import Mem0Storage
except ImportError:
raise ImportError(
"Mem0 is not installed. Please install it with `pip install mem0ai`."
)
storage = Mem0Storage(type="chat_history", crew=crew)
else:
storage = (
storage
if storage
else RAGStorage(
type="chat_history",
embedder_config=embedder_config,
crew=crew,
path=path,
)
)
super().__init__(storage=storage)
self._memory_provider = memory_provider
self._messages = []
def add_message(
self,
role: MessageRole,
content: str,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""
Add a message to the chat history.
Args:
role: The role of the message sender (human, ai, or system).
content: The content of the message.
metadata: Additional information about the message.
agent: The agent associated with the message.
"""
message = ChatMessage(role=role, content=content, metadata=metadata)
self._messages.append(message)
# Save to storage for persistence and retrieval
metadata = metadata or {}
if agent:
metadata["agent"] = agent
# Add role and timestamp to metadata
metadata["role"] = role.value
metadata["timestamp"] = message.timestamp.isoformat()
super().save(value=content, metadata=metadata, agent=agent)
def add_human_message(
self,
content: str,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""Add a human message to the chat history."""
self.add_message(MessageRole.HUMAN, content, metadata, agent)
def add_ai_message(
self,
content: str,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""Add an AI message to the chat history."""
self.add_message(MessageRole.AI, content, metadata, agent)
def add_system_message(
self,
content: str,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""Add a system message to the chat history."""
self.add_message(MessageRole.SYSTEM, content, metadata, agent)
def get_messages(self) -> List[ChatMessage]:
"""Get all messages in the chat history."""
return self._messages
def get_messages_as_dict(self) -> List[Dict[str, Any]]:
"""Get all messages in the chat history as dictionaries."""
return [message.to_dict() for message in self._messages]
def clear(self) -> None:
"""Clear all messages from the chat history."""
self._messages = []
self.reset()
def reset(self) -> None:
"""Reset the storage."""
try:
self.storage.reset()
except Exception as e:
raise Exception(
f"An error occurred while resetting the chat message history: {e}"
)
def search(
self,
query: str,
limit: int = 5,
score_threshold: float = 0.35,
) -> List[Dict[str, Any]]:
"""
Search for messages in the chat history.
Args:
query: The search query.
limit: The maximum number of results to return.
score_threshold: The minimum similarity score for results.
Returns:
A list of dictionaries containing the search results.
"""
results = self.storage.search(
query=query, limit=limit, score_threshold=score_threshold
)
# Convert the search results to ChatMessage objects
messages = []
for result in results:
try:
role = result["metadata"].get("role", "ai")
content = result["context"]
timestamp = result["metadata"].get("timestamp")
if timestamp:
timestamp = datetime.fromisoformat(timestamp)
else:
timestamp = datetime.now()
metadata = {k: v for k, v in result["metadata"].items()
if k not in ["role", "timestamp"]}
message = ChatMessage(
role=MessageRole(role),
content=content,
timestamp=timestamp,
metadata=metadata,
)
messages.append(message.to_dict())
except Exception:
# Skip invalid messages
continue
return messages

View File

@@ -0,0 +1,152 @@
from datetime import datetime
import pytest
from crewai.memory.chat_history.chat_message import ChatMessage, MessageRole
from crewai.memory.chat_history.chat_message_history import ChatMessageHistory
@pytest.fixture
def chat_message_history():
"""Fixture to create a ChatMessageHistory instance"""
return ChatMessageHistory()
def test_add_and_get_messages(chat_message_history):
"""Test adding messages and retrieving them."""
# Add messages
chat_message_history.add_human_message("Hello, how are you?")
chat_message_history.add_ai_message("I'm doing well, thank you!")
chat_message_history.add_system_message("System message")
# Get messages
messages = chat_message_history.get_messages()
# Verify messages
assert len(messages) == 3
assert messages[0].role == MessageRole.HUMAN
assert messages[0].content == "Hello, how are you?"
assert messages[1].role == MessageRole.AI
assert messages[1].content == "I'm doing well, thank you!"
assert messages[2].role == MessageRole.SYSTEM
assert messages[2].content == "System message"
def test_get_messages_as_dict(chat_message_history):
"""Test getting messages as dictionaries."""
# Add messages
chat_message_history.add_human_message("Hello")
chat_message_history.add_ai_message("Hi there")
# Get messages as dict
messages_dict = chat_message_history.get_messages_as_dict()
# Verify messages
assert len(messages_dict) == 2
assert messages_dict[0]["role"] == "human"
assert messages_dict[0]["content"] == "Hello"
assert messages_dict[1]["role"] == "ai"
assert messages_dict[1]["content"] == "Hi there"
assert "timestamp" in messages_dict[0]
assert "metadata" in messages_dict[0]
def test_clear_messages(chat_message_history):
"""Test clearing messages."""
# Add messages
chat_message_history.add_human_message("Hello")
chat_message_history.add_ai_message("Hi there")
# Verify messages were added
assert len(chat_message_history.get_messages()) == 2
# Clear messages
chat_message_history.clear()
# Verify messages were cleared
assert len(chat_message_history.get_messages()) == 0
def test_search_messages(chat_message_history, monkeypatch):
"""Test searching for messages."""
# Add messages with specific content
chat_message_history.add_human_message(
"I need information about machine learning algorithms"
)
chat_message_history.add_ai_message(
"Machine learning algorithms include decision trees, neural networks, and SVMs"
)
chat_message_history.add_human_message(
"Tell me more about neural networks"
)
# Mock storage search results
mock_search_results = [
{
"context": "Machine learning algorithms include decision trees, neural networks, and SVMs",
"metadata": {
"role": "ai",
"timestamp": "2023-01-01T00:00:00"
}
}
]
# Monkeypatch the storage.search method
def mock_storage_search(*args, **kwargs):
return mock_search_results
monkeypatch.setattr(chat_message_history.storage, "search", mock_storage_search)
# Search for messages about neural networks
results = chat_message_history.search("neural networks")
# Verify search results
assert len(results) > 0
assert any("neural networks" in result["content"] for result in results)
def test_message_with_metadata(chat_message_history):
"""Test adding and retrieving messages with metadata."""
# Add message with metadata
metadata = {"user_id": "123", "session_id": "abc"}
chat_message_history.add_human_message(
"Hello with metadata", metadata=metadata
)
# Get messages
messages = chat_message_history.get_messages()
# Verify metadata
assert len(messages) == 1
assert messages[0].metadata["user_id"] == "123"
assert messages[0].metadata["session_id"] == "abc"
def test_chat_message_to_from_dict():
"""Test converting ChatMessage to and from dictionary."""
# Create a message
timestamp = datetime.now()
message = ChatMessage(
role=MessageRole.HUMAN,
content="Test message",
timestamp=timestamp,
metadata={"key": "value"}
)
# Convert to dict
message_dict = message.to_dict()
# Verify dict
assert message_dict["role"] == "human"
assert message_dict["content"] == "Test message"
assert message_dict["timestamp"] == timestamp.isoformat()
assert message_dict["metadata"] == {"key": "value"}
# Convert back to ChatMessage
new_message = ChatMessage.from_dict(message_dict)
# Verify new message
assert new_message.role == MessageRole.HUMAN
assert new_message.content == "Test message"
assert new_message.timestamp.isoformat() == timestamp.isoformat()
assert new_message.metadata == {"key": "value"}