Compare commits

..

2 Commits

Author SHA1 Message Date
Devin AI
83791b3c62 Address PR feedback: Improve documentation and add edge case tests
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-05 09:12:23 +00:00
Devin AI
70b7148698 Fix #2753: Handle large inputs in memory by chunking text before embedding
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-05 09:06:33 +00:00
9 changed files with 144 additions and 164 deletions

View File

@@ -6,11 +6,12 @@ import shutil
import uuid
from typing import Any, Dict, List, Optional
import numpy as np
from chromadb.api import ClientAPI
from crewai.memory.storage.base_rag_storage import BaseRAGStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH, MEMORY_CHUNK_SIZE, MEMORY_CHUNK_OVERLAP
from crewai.utilities.paths import db_storage_path
@@ -138,15 +139,57 @@ class RAGStorage(BaseRAGStorage):
logging.error(f"Error during {self.type} search: {str(e)}")
return []
def _generate_embedding(self, text: str, metadata: Dict[str, Any]) -> None: # type: ignore
def _chunk_text(self, text: str) -> List[str]:
"""
Split text into chunks to avoid token limits.
Args:
text: Input text to chunk.
Returns:
List[str]: A list of chunked text segments, adhering to defined size and overlap.
Empty list if input text is empty.
"""
if not text:
return []
if len(text) <= MEMORY_CHUNK_SIZE:
return [text]
chunks = []
start_indices = range(0, len(text), MEMORY_CHUNK_SIZE - MEMORY_CHUNK_OVERLAP)
for i in start_indices:
chunk = text[i:i + MEMORY_CHUNK_SIZE]
if chunk: # Only add non-empty chunks
chunks.append(chunk)
return chunks
def _generate_embedding(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> Optional[None]:
"""
Generate embeddings for text and add to collection.
Args:
text: Input text to generate embeddings for.
metadata: Optional metadata to associate with the embeddings.
Returns:
None if successful, None if text is empty.
"""
if not hasattr(self, "app") or not hasattr(self, "collection"):
self._initialize_app()
self.collection.add(
documents=[text],
metadatas=[metadata or {}],
ids=[str(uuid.uuid4())],
)
chunks = self._chunk_text(text)
if not chunks:
return None
for chunk in chunks:
self.collection.add(
documents=[chunk],
metadatas=[metadata or {}],
ids=[str(uuid.uuid4())],
)
def reset(self) -> None:
try:

View File

@@ -1,7 +1,6 @@
import inspect
import logging
from pathlib import Path
from typing import Any, Callable, Dict, List, TypeVar, Union, cast
from typing import Any, Callable, Dict, TypeVar, cast
import yaml
from dotenv import load_dotenv
@@ -26,12 +25,11 @@ def CrewBase(cls: T) -> T:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
agents_config_paths = self._normalize_to_path_list(self.original_agents_config_path)
tasks_config_paths = self._normalize_to_path_list(self.original_tasks_config_path)
agents_config_path = self.base_directory / self.original_agents_config_path
tasks_config_path = self.base_directory / self.original_tasks_config_path
# Load and merge configurations
self.agents_config = self.load_and_merge_yaml_configs(agents_config_paths)
self.tasks_config = self.load_and_merge_yaml_configs(tasks_config_paths)
self.agents_config = self.load_yaml(agents_config_path)
self.tasks_config = self.load_yaml(tasks_config_path)
self.map_all_agent_variables()
self.map_all_task_variables()
@@ -69,75 +67,14 @@ def CrewBase(cls: T) -> T:
self._original_functions, "is_kickoff"
)
def _normalize_to_path_list(self, paths) -> List[Path]:
"""
Normalize input paths to always be a list of Path objects.
Args:
paths: A string path, Path object, or list of paths
Returns:
A list of Path objects
"""
if isinstance(paths, (list, tuple)):
return [self.base_directory / p for p in paths]
else:
return [self.base_directory / paths]
@staticmethod
def load_yaml(config_path: Path):
try:
with open(config_path, "r", encoding="utf-8") as file:
return yaml.safe_load(file)
except FileNotFoundError:
logging.error(f"Configuration YAML file not found: {config_path}")
print(f"File not found: {config_path}")
raise
def deep_merge(self, dict1: dict, dict2: dict) -> dict:
"""
Recursively merge two dictionaries, with values from dict2 taking precedence.
Args:
dict1: First dictionary
dict2: Second dictionary with values that will override dict1 for duplicate keys
Returns:
A new dictionary with merged values
"""
result = dict1.copy()
for key, value in dict2.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
result[key] = self.deep_merge(result[key], value)
else:
result[key] = value
return result
def load_and_merge_yaml_configs(self, config_paths: List[Path]) -> dict:
"""
Load and merge configurations from multiple YAML files.
This function loads each YAML file in the provided list and merges their
configurations. For duplicate keys, later files in the list will override
earlier ones. For nested dictionaries, a deep merge is performed, meaning
that nested keys are preserved unless explicitly overridden.
Example:
If file1.yaml contains: {"agent1": {"role": "researcher", "goal": "find info"}}
And file2.yaml contains: {"agent1": {"role": "analyst"}}
The result will be: {"agent1": {"role": "analyst", "goal": "find info"}}
Args:
config_paths: A list of Path objects pointing to YAML files
Returns:
A dictionary with merged configurations
"""
result = {}
for path in config_paths:
config = self.load_yaml(path)
if config:
result = self.deep_merge(result, config)
return result
def _get_all_functions(self):
return {

View File

@@ -4,3 +4,5 @@ DEFAULT_SCORE_THRESHOLD = 0.35
KNOWLEDGE_DIRECTORY = "knowledge"
MAX_LLM_RETRY = 3
MAX_FILE_NAME_LENGTH = 255
MEMORY_CHUNK_SIZE = 4000
MEMORY_CHUNK_OVERLAP = 200

View File

@@ -1,5 +0,0 @@
test_agent1:
role: Test Agent 1
goal: Test Goal 1
backstory: Test Backstory 1
verbose: true

View File

@@ -1,8 +0,0 @@
test_agent1:
role: Updated Test Agent 1
goal: Updated Test Goal 1
test_agent2:
role: Test Agent 2
goal: Test Goal 2
backstory: Test Backstory 2
verbose: true

View File

@@ -1,4 +0,0 @@
test_task1:
description: Test Description 1
expected_output: Test Output 1
agent: test_agent1

View File

@@ -1,6 +0,0 @@
test_task1:
description: Updated Test Description 1
test_task2:
description: Test Description 2
expected_output: Test Output 2
agent: test_agent2

View File

@@ -0,0 +1,86 @@
import pytest
import numpy as np
from unittest.mock import patch, MagicMock
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
from crewai.utilities.constants import MEMORY_CHUNK_SIZE
@pytest.fixture
def short_term_memory():
"""Fixture to create a ShortTermMemory instance"""
agent = Agent(
role="Researcher",
goal="Search relevant data and provide results",
backstory="You are a researcher at a leading tech think tank.",
tools=[],
verbose=True,
)
task = Task(
description="Perform a search on specific topics.",
expected_output="A list of relevant URLs based on the search query.",
agent=agent,
)
return ShortTermMemory(crew=Crew(agents=[agent], tasks=[task]))
def test_memory_with_large_input(short_term_memory):
"""Test that memory can handle large inputs without token limit errors"""
large_input = "test value " * (MEMORY_CHUNK_SIZE + 1000)
with patch.object(
short_term_memory.storage, '_chunk_text',
return_value=["chunk1", "chunk2"]
) as mock_chunk_text:
with patch.object(
short_term_memory.storage.collection, 'add'
) as mock_add:
short_term_memory.save(value=large_input, agent="test_agent")
assert mock_chunk_text.called
with patch.object(
short_term_memory.storage, 'search',
return_value=[{"context": large_input, "metadata": {"agent": "test_agent"}, "score": 0.95}]
):
result = short_term_memory.search(large_input[:100], score_threshold=0.01)
assert result[0]["context"] == large_input
assert result[0]["metadata"]["agent"] == "test_agent"
def test_memory_with_empty_input(short_term_memory):
"""Test that memory correctly handles empty input strings"""
empty_input = ""
with patch.object(
short_term_memory.storage, '_chunk_text',
return_value=[]
) as mock_chunk_text:
with patch.object(
short_term_memory.storage.collection, 'add'
) as mock_add:
short_term_memory.save(value=empty_input, agent="test_agent")
mock_chunk_text.assert_called_with(empty_input)
mock_add.assert_not_called()
def test_memory_with_exact_chunk_size_input(short_term_memory):
"""Test that memory correctly handles inputs that match chunk size exactly"""
exact_size_input = "x" * MEMORY_CHUNK_SIZE
with patch.object(
short_term_memory.storage, '_chunk_text',
return_value=[exact_size_input]
) as mock_chunk_text:
with patch.object(
short_term_memory.storage.collection, 'add'
) as mock_add:
short_term_memory.save(value=exact_size_input, agent="test_agent")
mock_chunk_text.assert_called_with(exact_size_input)
assert mock_add.call_count == 1

View File

@@ -184,68 +184,3 @@ def test_multiple_before_after_kickoff():
assert "plants" in result.raw, "First before_kickoff not executed"
assert "processed first" in result.raw, "First after_kickoff not executed"
assert "processed second" in result.raw, "Second after_kickoff not executed"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_multiple_yaml_configs():
@CrewBase
class MultiConfigCrew:
agents_config = ["config/multi/agents1.yaml", "config/multi/agents2.yaml"]
tasks_config = ["config/multi/tasks1.yaml", "config/multi/tasks2.yaml"]
@agent
def test_agent1(self):
return Agent(config=self.agents_config["test_agent1"])
@agent
def test_agent2(self):
return Agent(config=self.agents_config["test_agent2"])
@task
def test_task1(self):
task_config = self.tasks_config["test_task1"].copy()
if isinstance(task_config.get("agent"), str):
agent_name = task_config.pop("agent")
if hasattr(self, agent_name):
task_config["agent"] = getattr(self, agent_name)()
return Task(config=task_config)
@task
def test_task2(self):
task_config = self.tasks_config["test_task2"].copy()
if isinstance(task_config.get("agent"), str):
agent_name = task_config.pop("agent")
if hasattr(self, agent_name):
task_config["agent"] = getattr(self, agent_name)()
return Task(config=task_config)
@crew
def crew(self):
return Crew(agents=self.agents, tasks=self.tasks, verbose=True)
crew = MultiConfigCrew()
assert "test_agent1" in crew.agents_config
assert "test_agent2" in crew.agents_config
assert crew.agents_config["test_agent1"]["role"] == "Updated Test Agent 1"
assert crew.agents_config["test_agent1"]["goal"] == "Updated Test Goal 1"
assert crew.agents_config["test_agent1"]["backstory"] == "Test Backstory 1"
assert crew.agents_config["test_agent1"]["verbose"] is True
assert "test_task1" in crew.tasks_config
assert "test_task2" in crew.tasks_config
assert crew.tasks_config["test_task1"]["description"] == "Updated Test Description 1"
assert crew.tasks_config["test_task1"]["expected_output"] == "Test Output 1"
assert crew.tasks_config["test_task1"]["agent"].role == "Updated Test Agent 1"
agent1 = crew.test_agent1()
agent2 = crew.test_agent2()
task1 = crew.test_task1()
task2 = crew.test_task2()
assert agent1.role == "Updated Test Agent 1"
assert agent2.role == "Test Agent 2"
assert task1.description == "Updated Test Description 1"
assert task2.description == "Test Description 2"