Add Elasticsearch integration for RAG storage

Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
Devin AI
2025-04-23 05:27:53 +00:00
parent 2e4c97661a
commit 6c08e6062a
10 changed files with 1019 additions and 24 deletions

View File

@@ -0,0 +1,91 @@
"""Integration test for Elasticsearch with CrewAI."""
import os
import unittest
import pytest
from crewai import Agent, Crew, Task
@pytest.mark.skipif(
    os.getenv("RUN_ELASTICSEARCH_TESTS") != "true",
    reason="Elasticsearch tests require RUN_ELASTICSEARCH_TESTS=true",
)
class TestElasticsearchIntegration(unittest.TestCase):
    """End-to-end checks that a crew runs with Elasticsearch configured.

    The whole class is skipped unless the ``RUN_ELASTICSEARCH_TESTS``
    environment variable is exactly ``"true"``.
    """

    def test_crew_with_elasticsearch_memory(self):
        """Run a two-agent crew whose ``memory_config`` names Elasticsearch.

        The only assertion is that ``kickoff()`` returns something other
        than ``None``.
        """
        research_agent = Agent(
            role="Researcher",
            goal="Research a topic",
            backstory="You are a researcher who loves to find information.",
        )
        writing_agent = Agent(
            role="Writer",
            goal="Write about a topic",
            backstory="You are a writer who loves to write about topics.",
        )

        gather_task = Task(
            description="Research about AI",
            expected_output="Information about AI",
            agent=research_agent,
        )
        # The writing task receives the research task as its context.
        compose_task = Task(
            description="Write about AI",
            expected_output="Article about AI",
            agent=writing_agent,
            context=[gather_task],
        )

        team = Crew(
            agents=[research_agent, writing_agent],
            tasks=[gather_task, compose_task],
            memory_config={"provider": "elasticsearch"},
        )

        outcome = team.kickoff()
        self.assertIsNotNone(outcome)

    def test_crew_with_elasticsearch_knowledge(self):
        """Run a single-agent crew backed by an Elasticsearch knowledge store.

        A ``Knowledge`` object is built from one string source with
        ``storage_provider="elasticsearch"`` and handed to the agent. The
        only assertion is that ``kickoff()`` returns something other than
        ``None``.
        """
        # Imported lazily, so these modules are only loaded when the test runs.
        from crewai.knowledge import Knowledge
        from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource

        ai_summary = "AI is a field of computer science that focuses on creating machines that can perform tasks that typically require human intelligence."
        source = StringKnowledgeSource(content=ai_summary, metadata={"topic": "AI"})
        knowledge_base = Knowledge(
            collection_name="test",
            sources=[source],
            storage_provider="elasticsearch",
        )

        expert = Agent(
            role="AI Expert",
            goal="Explain AI",
            backstory="You are an AI expert who loves to explain AI concepts.",
            knowledge=[knowledge_base],
        )
        explain_task = Task(
            description="Explain what AI is",
            expected_output="Explanation of AI",
            agent=expert,
        )

        team = Crew(agents=[expert], tasks=[explain_task])

        outcome = team.kickoff()
        self.assertIsNotNone(outcome)

View File

@@ -0,0 +1,92 @@
"""Test Elasticsearch knowledge storage functionality."""
import os
import unittest
from unittest.mock import MagicMock, patch
import pytest
from crewai.knowledge.storage.elasticsearch_knowledge_storage import ElasticsearchKnowledgeStorage
@pytest.mark.skipif(
    os.environ.get("RUN_ELASTICSEARCH_TESTS") != "true",
    reason="Elasticsearch tests require RUN_ELASTICSEARCH_TESTS=true"
)
class TestElasticsearchKnowledgeStorage(unittest.TestCase):
    """Unit tests for ``ElasticsearchKnowledgeStorage`` using a mocked client.

    The ``Elasticsearch`` name inside the storage module is patched so that
    constructing it yields ``self.es_mock``; the tests issue no real requests
    themselves. The whole class is skipped unless ``RUN_ELASTICSEARCH_TESTS``
    is ``"true"``.
    """

    def setUp(self):
        """Patch the Elasticsearch client and build an initialised storage."""
        self.es_mock = MagicMock()
        # Report "index missing" so that initialisation creates the index.
        self.es_mock.indices.exists.return_value = False

        self.embedder_mock = MagicMock()
        self.embedder_mock.embed_documents.return_value = [[0.1, 0.2, 0.3]]

        self.es_patcher = patch(
            "crewai.knowledge.storage.elasticsearch_knowledge_storage.Elasticsearch",
            return_value=self.es_mock,
        )
        self.es_class_mock = self.es_patcher.start()
        # Register the undo right away: unittest skips tearDown when setUp
        # raises, but cleanups still run, so a failure in the lines below
        # cannot leak the patch into other tests.
        self.addCleanup(self.es_patcher.stop)

        self.storage = ElasticsearchKnowledgeStorage(
            embedder_config=self.embedder_mock,
            collection_name="test",
        )
        self.storage.initialize_knowledge_storage()

    def test_initialization(self):
        """Initialisation builds one client and creates the index once."""
        self.es_class_mock.assert_called_once()
        self.es_mock.indices.create.assert_called_once()

    def test_save(self):
        """Saving two documents indexes and embeds each of them once."""
        self.storage.save(["Test document 1", "Test document 2"], {"source": "test"})

        self.assertEqual(self.es_mock.index.call_count, 2)
        self.assertEqual(self.embedder_mock.embed_documents.call_count, 2)

    def test_search(self):
        """Search maps an Elasticsearch hit to id/context/metadata/score."""
        self.es_mock.search.return_value = {
            "hits": {
                "hits": [
                    {
                        "_id": "test_id",
                        "_score": 1.5,  # Score between 1-2 (Elasticsearch range)
                        "_source": {
                            "text": "Test document",
                            "metadata": {"source": "test"},
                        },
                    }
                ]
            }
        }

        results = self.storage.search(["test query"])

        self.es_mock.search.assert_called_once()
        self.assertEqual(len(results), 1)
        hit = results[0]
        self.assertEqual(hit["id"], "test_id")
        self.assertEqual(hit["context"], "Test document")
        self.assertEqual(hit["metadata"], {"source": "test"})
        # Adjusted to 0-1 range; compared approximately because the score is
        # a float produced by arithmetic in the storage layer.
        self.assertAlmostEqual(hit["score"], 0.5)

    def test_reset(self):
        """Reset deletes the existing index and creates it again."""
        # Pretend the index now exists so that reset has something to delete.
        self.es_mock.indices.exists.return_value = True

        self.storage.reset()

        self.es_mock.indices.delete.assert_called_once()
        # One create from setUp's initialisation plus one from reset.
        self.assertEqual(self.es_mock.indices.create.call_count, 2)

View File

@@ -0,0 +1,91 @@
"""Test Elasticsearch storage functionality."""
import os
import unittest
from unittest.mock import MagicMock, patch
import pytest
from crewai.memory.storage.elasticsearch_storage import ElasticsearchStorage
@pytest.mark.skipif(
    os.environ.get("RUN_ELASTICSEARCH_TESTS") != "true",
    reason="Elasticsearch tests require RUN_ELASTICSEARCH_TESTS=true"
)
class TestElasticsearchStorage(unittest.TestCase):
    """Unit tests for the memory ``ElasticsearchStorage`` using a mocked client.

    The ``Elasticsearch`` name inside the storage module is patched so that
    constructing it yields ``self.es_mock``; the tests issue no real requests
    themselves. The whole class is skipped unless ``RUN_ELASTICSEARCH_TESTS``
    is ``"true"``.
    """

    def setUp(self):
        """Patch the Elasticsearch client and build the storage under test."""
        self.es_mock = MagicMock()
        # Report "index missing" so that construction creates the index.
        self.es_mock.indices.exists.return_value = False

        self.embedder_mock = MagicMock()
        self.embedder_mock.embed_documents.return_value = [[0.1, 0.2, 0.3]]

        self.es_patcher = patch(
            "crewai.memory.storage.elasticsearch_storage.Elasticsearch",
            return_value=self.es_mock,
        )
        self.es_class_mock = self.es_patcher.start()
        # Register the undo right away: unittest skips tearDown when setUp
        # raises, but cleanups still run, so a failing constructor below
        # cannot leak the patch into other tests.
        self.addCleanup(self.es_patcher.stop)

        self.storage = ElasticsearchStorage(
            type="test",
            embedder_config=self.embedder_mock,
        )

    def test_initialization(self):
        """Construction builds one client and creates the index once."""
        self.es_class_mock.assert_called_once()
        self.es_mock.indices.create.assert_called_once()

    def test_save(self):
        """Saving one document indexes it and embeds its text once."""
        self.storage.save("Test document", {"source": "test"})

        self.es_mock.index.assert_called_once()
        self.embedder_mock.embed_documents.assert_called_once_with(["Test document"])

    def test_search(self):
        """Search maps an Elasticsearch hit to id/context/metadata/score."""
        self.es_mock.search.return_value = {
            "hits": {
                "hits": [
                    {
                        "_id": "test_id",
                        "_score": 1.5,  # Score between 1-2 (Elasticsearch range)
                        "_source": {
                            "text": "Test document",
                            "metadata": {"source": "test"},
                        },
                    }
                ]
            }
        }

        results = self.storage.search("test query")

        self.es_mock.search.assert_called_once()
        self.assertEqual(len(results), 1)
        hit = results[0]
        self.assertEqual(hit["id"], "test_id")
        self.assertEqual(hit["context"], "Test document")
        self.assertEqual(hit["metadata"], {"source": "test"})
        # Adjusted to 0-1 range; compared approximately because the score is
        # a float produced by arithmetic in the storage layer.
        self.assertAlmostEqual(hit["score"], 0.5)

    def test_reset(self):
        """Reset deletes the existing index and creates it again."""
        # Pretend the index now exists so that reset has something to delete.
        self.es_mock.indices.exists.return_value = True

        self.storage.reset()

        self.es_mock.indices.delete.assert_called_once()
        # One create from construction in setUp plus one from reset.
        self.assertEqual(self.es_mock.indices.create.call_count, 2)