Add ChatMessageHistory feature for multi-round dialogues in REST sessions (Issue #2284)

Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
Devin AI
2025-03-05 03:24:57 +00:00
parent 00eede0d5d
commit cb1e3a13ef
7 changed files with 522 additions and 1 deletions

View File

@@ -567,6 +567,81 @@ my_crew.reset_memories(command_type = 'all') # Resets all the memory
- 🫡 **Enhanced Personalization:** Memory enables agents to remember user preferences and historical interactions, leading to personalized experiences. - 🫡 **Enhanced Personalization:** Memory enables agents to remember user preferences and historical interactions, leading to personalized experiences.
- 🧠 **Improved Problem Solving:** Access to a rich memory store aids agents in making more informed decisions, drawing on past learnings and contextual insights. - 🧠 **Improved Problem Solving:** Access to a rich memory store aids agents in making more informed decisions, drawing on past learnings and contextual insights.
## Chat Message History
The `ChatMessageHistory` class provides a way to store and retrieve chat messages with roles (human, AI, system), similar to Langchain's ChatMessageHistory. This feature is particularly useful for maintaining conversation context across multiple interactions within a single REST session.
### Basic Usage
```python
from crewai.memory import ChatMessageHistory, MessageRole
# Create a chat message history
chat_history = ChatMessageHistory()
# Add messages
chat_history.add_human_message("Hello, how are you?")
chat_history.add_ai_message("I'm doing well, thank you!")
chat_history.add_system_message("System message")
# Get messages
messages = chat_history.get_messages()
# Get messages as dictionaries (useful for serialization)
messages_dict = chat_history.get_messages_as_dict()
# Search for messages
results = chat_history.search("specific topic")
# Clear messages
chat_history.clear()
```
### Using with REST APIs
For REST API applications, you can store the chat history between requests:
```python
# In your API endpoint
def chat_endpoint(request):
# Get or create a session
session_id = request.session_id
# Get or create chat history for this session
if session_id in session_storage:
chat_history_dict = session_storage[session_id]
chat_history = ChatMessageHistory()
# Restore previous messages
for msg in chat_history_dict:
if msg["role"] == "human":
chat_history.add_human_message(msg["content"], msg.get("metadata", {}))
elif msg["role"] == "ai":
chat_history.add_ai_message(msg["content"], msg.get("metadata", {}))
elif msg["role"] == "system":
chat_history.add_system_message(msg["content"], msg.get("metadata", {}))
else:
chat_history = ChatMessageHistory()
# Add the new message from the request
chat_history.add_human_message(request.message)
# Process with CrewAI
crew = Crew(agents=[...], tasks=[...], memory=True)
result = crew.kickoff(inputs={"chat_history": chat_history.get_messages_as_dict()})
# Add the response to the chat history
chat_history.add_ai_message(str(result))
# Store the updated chat history
session_storage[session_id] = chat_history.get_messages_as_dict()
# Return the response
return {"response": str(result)}
```
This allows for maintaining conversation context across multiple API calls within a single session.
## Conclusion ## Conclusion
Integrating CrewAI's memory system into your projects is straightforward. By leveraging the provided memory components and configurations, Integrating CrewAI's memory system into your projects is straightforward. By leveraging the provided memory components and configurations,

View File

@@ -0,0 +1,48 @@
from crewai import Agent, Task, Crew
from crewai.memory import ChatMessageHistory, MessageRole
# Create a chat message history
chat_history = ChatMessageHistory()
# Add some messages
chat_history.add_human_message("Hello, I need help with a research task.")
chat_history.add_ai_message("I'd be happy to help! What topic are you interested in?")
chat_history.add_human_message("I'm interested in renewable energy technologies.")
# Create an agent with access to the chat history
researcher = Agent(
role="Renewable Energy Researcher",
goal="Provide accurate and up-to-date information on renewable energy technologies",
backstory="You are an expert in renewable energy with years of research experience.",
verbose=True,
)
# Create a task that uses the chat history
research_task = Task(
description=(
"Review the conversation history and provide a detailed response about "
"renewable energy technologies, addressing any specific questions or interests."
),
expected_output="A comprehensive response about renewable energy technologies.",
agent=researcher,
)
# Create a crew with memory enabled
crew = Crew(
agents=[researcher],
tasks=[research_task],
verbose=True,
memory=True,
)
# Pass the chat history to the crew
# In a real REST API scenario, you would store and retrieve this between requests
crew_result = crew.kickoff(inputs={"chat_history": chat_history.get_messages_as_dict()})
# Add the crew's response to the chat history
chat_history.add_ai_message(str(crew_result))
# Print the full conversation history
print("\nFull Conversation History:")
for message in chat_history.get_messages():
print(f"{message.role.value.capitalize()}: {message.content}")

View File

@@ -2,5 +2,15 @@ from .entity.entity_memory import EntityMemory
from .long_term.long_term_memory import LongTermMemory from .long_term.long_term_memory import LongTermMemory
from .short_term.short_term_memory import ShortTermMemory from .short_term.short_term_memory import ShortTermMemory
from .user.user_memory import UserMemory from .user.user_memory import UserMemory
from .chat_history.chat_message_history import ChatMessageHistory
from .chat_history.chat_message import ChatMessage, MessageRole
__all__ = ["UserMemory", "EntityMemory", "LongTermMemory", "ShortTermMemory"] __all__ = [
"UserMemory",
"EntityMemory",
"LongTermMemory",
"ShortTermMemory",
"ChatMessageHistory",
"ChatMessage",
"MessageRole",
]

View File

@@ -0,0 +1,4 @@
from crewai.memory.chat_history.chat_message import ChatMessage, MessageRole
from crewai.memory.chat_history.chat_message_history import ChatMessageHistory
__all__ = ["ChatMessage", "MessageRole", "ChatMessageHistory"]

View File

@@ -0,0 +1,53 @@
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional
class MessageRole(str, Enum):
"""Enum for message roles in a chat."""
HUMAN = "human"
AI = "ai"
SYSTEM = "system"
class ChatMessage:
"""
Represents a single message in a chat history.
Attributes:
role: The role of the message sender (human, ai, or system).
content: The content of the message.
timestamp: When the message was created.
metadata: Additional information about the message.
"""
def __init__(
self,
role: MessageRole,
content: str,
timestamp: Optional[datetime] = None,
metadata: Optional[Dict[str, Any]] = None,
):
self.role = role
self.content = content
self.timestamp = timestamp or datetime.now()
self.metadata = metadata or {}
def to_dict(self) -> Dict[str, Any]:
"""Convert the message to a dictionary."""
return {
"role": self.role.value,
"content": self.content,
"timestamp": self.timestamp.isoformat(),
"metadata": self.metadata,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ChatMessage":
"""Create a message from a dictionary."""
return cls(
role=MessageRole(data["role"]),
content=data["content"],
timestamp=datetime.fromisoformat(data["timestamp"]),
metadata=data.get("metadata", {}),
)

View File

@@ -0,0 +1,180 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from pydantic import PrivateAttr
from crewai.memory.chat_history.chat_message import ChatMessage, MessageRole
from crewai.memory.memory import Memory
from crewai.memory.storage.rag_storage import RAGStorage
class ChatMessageHistory(Memory):
"""
ChatMessageHistory class for storing and retrieving chat messages.
This class allows for maintaining conversation context across multiple
interactions within a single session, similar to Langchain's ChatMessageHistory.
Attributes:
messages: A list of ChatMessage objects representing the conversation history.
"""
_memory_provider: Optional[str] = PrivateAttr()
_messages: List[ChatMessage] = PrivateAttr(default_factory=list)
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
if crew and hasattr(crew, "memory_config") and crew.memory_config is not None:
memory_provider = crew.memory_config.get("provider")
else:
memory_provider = None
if memory_provider == "mem0":
try:
from crewai.memory.storage.mem0_storage import Mem0Storage
except ImportError:
raise ImportError(
"Mem0 is not installed. Please install it with `pip install mem0ai`."
)
storage = Mem0Storage(type="chat_history", crew=crew)
else:
storage = (
storage
if storage
else RAGStorage(
type="chat_history",
embedder_config=embedder_config,
crew=crew,
path=path,
)
)
super().__init__(storage=storage)
self._memory_provider = memory_provider
self._messages = []
def add_message(
self,
role: MessageRole,
content: str,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""
Add a message to the chat history.
Args:
role: The role of the message sender (human, ai, or system).
content: The content of the message.
metadata: Additional information about the message.
agent: The agent associated with the message.
"""
message = ChatMessage(role=role, content=content, metadata=metadata)
self._messages.append(message)
# Save to storage for persistence and retrieval
metadata = metadata or {}
if agent:
metadata["agent"] = agent
# Add role and timestamp to metadata
metadata["role"] = role.value
metadata["timestamp"] = message.timestamp.isoformat()
super().save(value=content, metadata=metadata, agent=agent)
def add_human_message(
self,
content: str,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""Add a human message to the chat history."""
self.add_message(MessageRole.HUMAN, content, metadata, agent)
def add_ai_message(
self,
content: str,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""Add an AI message to the chat history."""
self.add_message(MessageRole.AI, content, metadata, agent)
def add_system_message(
self,
content: str,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""Add a system message to the chat history."""
self.add_message(MessageRole.SYSTEM, content, metadata, agent)
def get_messages(self) -> List[ChatMessage]:
"""Get all messages in the chat history."""
return self._messages
def get_messages_as_dict(self) -> List[Dict[str, Any]]:
"""Get all messages in the chat history as dictionaries."""
return [message.to_dict() for message in self._messages]
def clear(self) -> None:
"""Clear all messages from the chat history."""
self._messages = []
self.reset()
def reset(self) -> None:
"""Reset the storage."""
try:
self.storage.reset()
except Exception as e:
raise Exception(
f"An error occurred while resetting the chat message history: {e}"
)
def search(
self,
query: str,
limit: int = 5,
score_threshold: float = 0.35,
) -> List[Dict[str, Any]]:
"""
Search for messages in the chat history.
Args:
query: The search query.
limit: The maximum number of results to return.
score_threshold: The minimum similarity score for results.
Returns:
A list of dictionaries containing the search results.
"""
results = self.storage.search(
query=query, limit=limit, score_threshold=score_threshold
)
# Convert the search results to ChatMessage objects
messages = []
for result in results:
try:
role = result["metadata"].get("role", "ai")
content = result["context"]
timestamp = result["metadata"].get("timestamp")
if timestamp:
timestamp = datetime.fromisoformat(timestamp)
else:
timestamp = datetime.now()
metadata = {k: v for k, v in result["metadata"].items()
if k not in ["role", "timestamp"]}
message = ChatMessage(
role=MessageRole(role),
content=content,
timestamp=timestamp,
metadata=metadata,
)
messages.append(message.to_dict())
except Exception:
# Skip invalid messages
continue
return messages

View File

@@ -0,0 +1,151 @@
import pytest
from datetime import datetime
from crewai.memory.chat_history.chat_message_history import ChatMessageHistory
from crewai.memory.chat_history.chat_message import ChatMessage, MessageRole
@pytest.fixture
def chat_message_history():
"""Fixture to create a ChatMessageHistory instance"""
return ChatMessageHistory()
def test_add_and_get_messages(chat_message_history):
"""Test adding messages and retrieving them."""
# Add messages
chat_message_history.add_human_message("Hello, how are you?")
chat_message_history.add_ai_message("I'm doing well, thank you!")
chat_message_history.add_system_message("System message")
# Get messages
messages = chat_message_history.get_messages()
# Verify messages
assert len(messages) == 3
assert messages[0].role == MessageRole.HUMAN
assert messages[0].content == "Hello, how are you?"
assert messages[1].role == MessageRole.AI
assert messages[1].content == "I'm doing well, thank you!"
assert messages[2].role == MessageRole.SYSTEM
assert messages[2].content == "System message"
def test_get_messages_as_dict(chat_message_history):
"""Test getting messages as dictionaries."""
# Add messages
chat_message_history.add_human_message("Hello")
chat_message_history.add_ai_message("Hi there")
# Get messages as dict
messages_dict = chat_message_history.get_messages_as_dict()
# Verify messages
assert len(messages_dict) == 2
assert messages_dict[0]["role"] == "human"
assert messages_dict[0]["content"] == "Hello"
assert messages_dict[1]["role"] == "ai"
assert messages_dict[1]["content"] == "Hi there"
assert "timestamp" in messages_dict[0]
assert "metadata" in messages_dict[0]
def test_clear_messages(chat_message_history):
"""Test clearing messages."""
# Add messages
chat_message_history.add_human_message("Hello")
chat_message_history.add_ai_message("Hi there")
# Verify messages were added
assert len(chat_message_history.get_messages()) == 2
# Clear messages
chat_message_history.clear()
# Verify messages were cleared
assert len(chat_message_history.get_messages()) == 0
def test_search_messages(chat_message_history, monkeypatch):
"""Test searching for messages."""
# Add messages with specific content
chat_message_history.add_human_message(
"I need information about machine learning algorithms"
)
chat_message_history.add_ai_message(
"Machine learning algorithms include decision trees, neural networks, and SVMs"
)
chat_message_history.add_human_message(
"Tell me more about neural networks"
)
# Mock storage search results
mock_search_results = [
{
"context": "Machine learning algorithms include decision trees, neural networks, and SVMs",
"metadata": {
"role": "ai",
"timestamp": "2023-01-01T00:00:00"
}
}
]
# Monkeypatch the storage.search method
def mock_storage_search(*args, **kwargs):
return mock_search_results
monkeypatch.setattr(chat_message_history.storage, "search", mock_storage_search)
# Search for messages about neural networks
results = chat_message_history.search("neural networks")
# Verify search results
assert len(results) > 0
assert any("neural networks" in result["content"] for result in results)
def test_message_with_metadata(chat_message_history):
"""Test adding and retrieving messages with metadata."""
# Add message with metadata
metadata = {"user_id": "123", "session_id": "abc"}
chat_message_history.add_human_message(
"Hello with metadata", metadata=metadata
)
# Get messages
messages = chat_message_history.get_messages()
# Verify metadata
assert len(messages) == 1
assert messages[0].metadata["user_id"] == "123"
assert messages[0].metadata["session_id"] == "abc"
def test_chat_message_to_from_dict():
"""Test converting ChatMessage to and from dictionary."""
# Create a message
timestamp = datetime.now()
message = ChatMessage(
role=MessageRole.HUMAN,
content="Test message",
timestamp=timestamp,
metadata={"key": "value"}
)
# Convert to dict
message_dict = message.to_dict()
# Verify dict
assert message_dict["role"] == "human"
assert message_dict["content"] == "Test message"
assert message_dict["timestamp"] == timestamp.isoformat()
assert message_dict["metadata"] == {"key": "value"}
# Convert back to ChatMessage
new_message = ChatMessage.from_dict(message_dict)
# Verify new message
assert new_message.role == MessageRole.HUMAN
assert new_message.content == "Test message"
assert new_message.timestamp.isoformat() == timestamp.isoformat()
assert new_message.metadata == {"key": "value"}