Compare commits

...

3 Commits

Author SHA1 Message Date
Devin AI
5c39e9c444 Move os import to top-level imports
- Follow Python best practices by placing import at module level
- Remove redundant inline import from __init__ method

Co-Authored-By: João <joao@crewai.com>
2025-11-19 07:37:06 +00:00
Devin AI
1d95940aca Fix RAGStorage to use custom path for ChromaDB persist_directory
- Add path normalization to convert relative paths to absolute paths
- Override ChromaDB persist_directory when custom path is provided
- Treat empty/whitespace paths as None (use default behavior)
- Add comprehensive tests for custom path functionality
- Test ShortTermMemory and EntityMemory with custom paths
- Test default behavior remains unchanged
- Test path isolation between different storage instances
- Test edge cases (empty string, whitespace, relative paths)

Fixes #3947

Co-Authored-By: João <joao@crewai.com>
2025-11-19 07:31:47 +00:00
Greyson LaLonde
d160f0874a chore: don't fail on cleanup error
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled
2025-11-19 01:28:25 -05:00
6 changed files with 303 additions and 20 deletions

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import logging
import os
import traceback
from typing import TYPE_CHECKING, Any, cast
import warnings
@@ -47,7 +48,11 @@ class RAGStorage(BaseRAGStorage):
self._client: BaseClient | None = None
self.allow_reset = allow_reset
self.path = path
if path and path.strip():
self.path = os.path.abspath(path.strip())
else:
self.path = None
warnings.filterwarnings(
"ignore",
@@ -96,6 +101,10 @@ class RAGStorage(BaseRAGStorage):
ChromaEmbeddingFunctionWrapper, embedding_function
)
)
if self.path:
config.settings.persist_directory = self.path
self._client = create_client(config)
def _get_client(self) -> BaseClient:

View File

@@ -13,7 +13,7 @@ load_result = load_dotenv(override=True)
@pytest.fixture(autouse=True)
def setup_test_environment():
"""Set up test environment with a temporary directory for SQLite storage."""
with tempfile.TemporaryDirectory() as temp_dir:
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as temp_dir:
# Create the directory with proper permissions
storage_dir = Path(temp_dir) / "crewai_test_storage"
storage_dir.mkdir(parents=True, exist_ok=True)

View File

@@ -144,9 +144,8 @@ class TestAgentEvaluator:
mock_crew.tasks.append(task)
events = {}
started_event = threading.Event()
completed_event = threading.Event()
task_completed_event = threading.Event()
results_condition = threading.Condition()
results_ready = False
agent_evaluator = AgentEvaluator(
agents=[agent], evaluators=[GoalAlignmentEvaluator()]
@@ -156,13 +155,11 @@ class TestAgentEvaluator:
async def capture_started(source, event):
if event.agent_id == str(agent.id):
events["started"] = event
started_event.set()
@crewai_event_bus.on(AgentEvaluationCompletedEvent)
async def capture_completed(source, event):
if event.agent_id == str(agent.id):
events["completed"] = event
completed_event.set()
@crewai_event_bus.on(AgentEvaluationFailedEvent)
def capture_failed(source, event):
@@ -170,17 +167,20 @@ class TestAgentEvaluator:
@crewai_event_bus.on(TaskCompletedEvent)
async def on_task_completed(source, event):
# TaskCompletedEvent fires AFTER evaluation results are stored
nonlocal results_ready
if event.task and event.task.id == task.id:
task_completed_event.set()
while not agent_evaluator.get_evaluation_results().get(agent.role):
pass
with results_condition:
results_ready = True
results_condition.notify()
mock_crew.kickoff()
assert started_event.wait(timeout=5), "Timeout waiting for started event"
assert completed_event.wait(timeout=5), "Timeout waiting for completed event"
assert task_completed_event.wait(timeout=5), (
"Timeout waiting for task completion"
)
with results_condition:
assert results_condition.wait_for(
lambda: results_ready, timeout=5
), "Timeout waiting for evaluation results"
assert events.keys() == {"started", "completed"}
assert events["started"].agent_id == str(agent.id)

View File

@@ -0,0 +1,270 @@
"""Tests for RAGStorage custom path functionality."""
import os
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.storage.rag_storage import RAGStorage
from crewai.task import Task
@pytest.fixture
def crew():
"""Fixture to create a simple Crew instance."""
agent = Agent(
role="Researcher",
goal="Search relevant data",
backstory="You are a researcher.",
tools=[],
)
task = Task(
description="Perform a search.",
expected_output="A list of results.",
agent=agent,
)
return Crew(agents=[agent], tasks=[task])
@pytest.fixture
def fake_embedder_config():
"""Fixture to provide a fake embedder config that doesn't hit the network."""
def fake_embedding_function(texts):
"""Fake embedding function that returns constant vectors."""
if isinstance(texts, str):
texts = [texts]
return [[0.1] * 384 for _ in texts]
return fake_embedding_function
class TestRAGStorageCustomPath:
"""Test suite for RAGStorage custom path functionality."""
def test_rag_storage_with_custom_path_normalizes_to_absolute(self, crew, tmp_path):
"""Test that RAGStorage normalizes relative paths to absolute paths."""
relative_path = "relative/test/path"
with patch("crewai.memory.storage.rag_storage.build_embedder") as mock_embedder:
mock_embedder.return_value = lambda texts: [[0.1] * 384 for _ in texts]
storage = RAGStorage(
type="short_term",
embedder_config={"provider": "openai"},
crew=crew,
path=relative_path,
)
assert storage.path is not None
assert os.path.isabs(storage.path)
assert storage.path.endswith(relative_path)
def test_rag_storage_with_empty_string_path_uses_default(self, crew):
"""Test that empty string path falls back to default behavior."""
with patch("crewai.memory.storage.rag_storage.build_embedder") as mock_embedder:
mock_embedder.return_value = lambda texts: [[0.1] * 384 for _ in texts]
storage = RAGStorage(
type="short_term",
embedder_config={"provider": "openai"},
crew=crew,
path="",
)
assert storage.path is None
def test_rag_storage_with_whitespace_path_uses_default(self, crew):
"""Test that whitespace-only path falls back to default behavior."""
with patch("crewai.memory.storage.rag_storage.build_embedder") as mock_embedder:
mock_embedder.return_value = lambda texts: [[0.1] * 384 for _ in texts]
storage = RAGStorage(
type="short_term",
embedder_config={"provider": "openai"},
crew=crew,
path=" ",
)
assert storage.path is None
def test_rag_storage_with_none_path_uses_default(self, crew):
"""Test that None path uses default behavior."""
with patch("crewai.memory.storage.rag_storage.build_embedder") as mock_embedder:
mock_embedder.return_value = lambda texts: [[0.1] * 384 for _ in texts]
storage = RAGStorage(
type="short_term",
embedder_config={"provider": "openai"},
crew=crew,
path=None,
)
assert storage.path is None
def test_rag_storage_sets_persist_directory_when_path_provided(self, crew, tmp_path):
"""Test that RAGStorage sets ChromaDB persist_directory when path is provided."""
custom_path = str(tmp_path / "custom_storage")
with patch("crewai.memory.storage.rag_storage.build_embedder") as mock_embedder, \
patch("crewai.memory.storage.rag_storage.create_client") as mock_create_client:
mock_embedder.return_value = lambda texts: [[0.1] * 384 for _ in texts]
mock_create_client.return_value = MagicMock()
storage = RAGStorage(
type="short_term",
embedder_config={"provider": "openai"},
crew=crew,
path=custom_path,
)
assert mock_create_client.called
config = mock_create_client.call_args[0][0]
assert config.settings.persist_directory == os.path.abspath(custom_path)
def test_rag_storage_does_not_override_persist_directory_when_no_path(self, crew):
"""Test that RAGStorage doesn't override persist_directory when path is None."""
with patch("crewai.memory.storage.rag_storage.build_embedder") as mock_embedder, \
patch("crewai.memory.storage.rag_storage.create_client") as mock_create_client:
mock_embedder.return_value = lambda texts: [[0.1] * 384 for _ in texts]
mock_create_client.return_value = MagicMock()
storage = RAGStorage(
type="short_term",
embedder_config={"provider": "openai"},
crew=crew,
path=None,
)
assert mock_create_client.called
config = mock_create_client.call_args[0][0]
assert "CrewAI" in config.settings.persist_directory or "crewai" in config.settings.persist_directory.lower()
class TestShortTermMemoryCustomPath:
"""Test suite for ShortTermMemory with custom path."""
def test_short_term_memory_with_custom_path(self, crew, tmp_path):
"""Test that ShortTermMemory accepts and uses custom path."""
custom_path = str(tmp_path / "short_term_storage")
with patch("crewai.memory.storage.rag_storage.build_embedder") as mock_embedder, \
patch("crewai.memory.storage.rag_storage.create_client") as mock_create_client:
mock_embedder.return_value = lambda texts: [[0.1] * 384 for _ in texts]
mock_client = MagicMock()
mock_create_client.return_value = mock_client
memory = ShortTermMemory(
crew=crew,
embedder_config={"provider": "openai"},
path=custom_path,
)
assert memory.storage.path == os.path.abspath(custom_path)
assert mock_create_client.called
config = mock_create_client.call_args[0][0]
assert config.settings.persist_directory == os.path.abspath(custom_path)
def test_short_term_memory_default_behavior_unchanged(self, crew):
"""Test that ShortTermMemory default behavior (no path) is unchanged."""
with patch("crewai.memory.storage.rag_storage.build_embedder") as mock_embedder, \
patch("crewai.memory.storage.rag_storage.create_client") as mock_create_client:
mock_embedder.return_value = lambda texts: [[0.1] * 384 for _ in texts]
mock_client = MagicMock()
mock_create_client.return_value = mock_client
memory = ShortTermMemory(
crew=crew,
embedder_config={"provider": "openai"},
)
assert memory.storage.path is None
class TestEntityMemoryCustomPath:
"""Test suite for EntityMemory with custom path."""
def test_entity_memory_with_custom_path(self, crew, tmp_path):
"""Test that EntityMemory accepts and uses custom path."""
custom_path = str(tmp_path / "entity_storage")
with patch("crewai.memory.storage.rag_storage.build_embedder") as mock_embedder, \
patch("crewai.memory.storage.rag_storage.create_client") as mock_create_client:
mock_embedder.return_value = lambda texts: [[0.1] * 384 for _ in texts]
mock_client = MagicMock()
mock_create_client.return_value = mock_client
memory = EntityMemory(
crew=crew,
embedder_config={"provider": "openai"},
path=custom_path,
)
assert memory.storage.path == os.path.abspath(custom_path)
assert mock_create_client.called
config = mock_create_client.call_args[0][0]
assert config.settings.persist_directory == os.path.abspath(custom_path)
def test_entity_memory_default_behavior_unchanged(self, crew):
"""Test that EntityMemory default behavior (no path) is unchanged."""
with patch("crewai.memory.storage.rag_storage.build_embedder") as mock_embedder, \
patch("crewai.memory.storage.rag_storage.create_client") as mock_create_client:
mock_embedder.return_value = lambda texts: [[0.1] * 384 for _ in texts]
mock_client = MagicMock()
mock_create_client.return_value = mock_client
memory = EntityMemory(
crew=crew,
embedder_config={"provider": "openai"},
)
assert memory.storage.path is None
class TestPathIsolation:
"""Test suite for verifying path isolation between different storage instances."""
def test_different_paths_create_different_storage_instances(self, crew, tmp_path):
"""Test that different paths result in isolated storage."""
path1 = str(tmp_path / "storage1")
path2 = str(tmp_path / "storage2")
with patch("crewai.memory.storage.rag_storage.build_embedder") as mock_embedder, \
patch("crewai.memory.storage.rag_storage.create_client") as mock_create_client:
mock_embedder.return_value = lambda texts: [[0.1] * 384 for _ in texts]
mock_create_client.return_value = MagicMock()
storage1 = RAGStorage(
type="short_term",
embedder_config={"provider": "openai"},
crew=crew,
path=path1,
)
storage2 = RAGStorage(
type="short_term",
embedder_config={"provider": "openai"},
crew=crew,
path=path2,
)
assert storage1.path != storage2.path
assert storage1.path == os.path.abspath(path1)
assert storage2.path == os.path.abspath(path2)
assert mock_create_client.call_count == 2
config1 = mock_create_client.call_args_list[0][0][0]
config2 = mock_create_client.call_args_list[1][0][0]
assert config1.settings.persist_directory != config2.settings.persist_directory

View File

@@ -647,6 +647,7 @@ def test_handle_streaming_tool_calls_no_tools(mock_emit):
@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.mark.skip(reason="Highly flaky on ci")
def test_llm_call_when_stop_is_unsupported(caplog):
llm = LLM(model="o1-mini", stop=["stop"], is_litellm=True)
with caplog.at_level(logging.INFO):
@@ -657,6 +658,7 @@ def test_llm_call_when_stop_is_unsupported(caplog):
@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.mark.skip(reason="Highly flaky on ci")
def test_llm_call_when_stop_is_unsupported_when_additional_drop_params_is_provided(
caplog,
):
@@ -664,7 +666,6 @@ def test_llm_call_when_stop_is_unsupported_when_additional_drop_params_is_provid
model="o1-mini",
stop=["stop"],
additional_drop_params=["another_param"],
is_litellm=True,
)
with caplog.at_level(logging.INFO):
result = llm.call("What is the capital of France?")

View File

@@ -273,12 +273,15 @@ def another_simple_tool():
def test_internal_crew_with_mcp():
from crewai_tools import MCPServerAdapter
from crewai_tools.adapters.mcp_adapter import ToolCollection
from crewai_tools.adapters.tool_collection import ToolCollection
mock = Mock(spec=MCPServerAdapter)
mock.tools = ToolCollection([simple_tool, another_simple_tool])
with patch("crewai_tools.MCPServerAdapter", return_value=mock) as adapter_mock:
mock_adapter = Mock()
mock_adapter.tools = ToolCollection([simple_tool, another_simple_tool])
with (
patch("crewai_tools.MCPServerAdapter", return_value=mock_adapter) as adapter_mock,
patch("crewai.llm.LLM.__new__", return_value=Mock()),
):
crew = InternalCrewWithMCP()
assert crew.reporting_analyst().tools == [simple_tool, another_simple_tool]
assert crew.researcher().tools == [simple_tool]