mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-08 07:38:29 +00:00
Compare commits
2 Commits
devin/1745
...
devin/1746
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
83791b3c62 | ||
|
|
70b7148698 |
@@ -6,11 +6,12 @@ import shutil
|
||||
import uuid
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import numpy as np
|
||||
from chromadb.api import ClientAPI
|
||||
|
||||
from crewai.memory.storage.base_rag_storage import BaseRAGStorage
|
||||
from crewai.utilities import EmbeddingConfigurator
|
||||
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH
|
||||
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH, MEMORY_CHUNK_SIZE, MEMORY_CHUNK_OVERLAP
|
||||
from crewai.utilities.paths import db_storage_path
|
||||
|
||||
|
||||
@@ -138,15 +139,57 @@ class RAGStorage(BaseRAGStorage):
|
||||
logging.error(f"Error during {self.type} search: {str(e)}")
|
||||
return []
|
||||
|
||||
def _generate_embedding(self, text: str, metadata: Dict[str, Any]) -> None: # type: ignore
|
||||
def _chunk_text(self, text: str) -> List[str]:
    """
    Split text into overlapping, fixed-width character windows.

    Each window is at most MEMORY_CHUNK_SIZE characters wide and consecutive
    windows start MEMORY_CHUNK_SIZE - MEMORY_CHUNK_OVERLAP characters apart,
    so neighbouring chunks share MEMORY_CHUNK_OVERLAP characters.

    Args:
        text: Input text to chunk.

    Returns:
        List[str]: The chunks in order. An empty list if ``text`` is empty,
        and a single-element list if ``text`` already fits in one chunk.

    Raises:
        ValueError: If MEMORY_CHUNK_OVERLAP is not smaller than
            MEMORY_CHUNK_SIZE (the window would never advance).
    """
    if not text:
        return []

    if len(text) <= MEMORY_CHUNK_SIZE:
        return [text]

    stride = MEMORY_CHUNK_SIZE - MEMORY_CHUNK_OVERLAP
    if stride <= 0:
        # A zero step makes range() raise an opaque error and a negative step
        # would silently drop the whole text; fail loudly instead.
        raise ValueError(
            "MEMORY_CHUNK_OVERLAP must be smaller than MEMORY_CHUNK_SIZE"
        )

    chunks: List[str] = []
    text_length = len(text)
    for start in range(0, text_length, stride):
        end = start + MEMORY_CHUNK_SIZE
        chunks.append(text[start:end])
        if end >= text_length:
            # This window already reaches the end of the text. Any further
            # start index would only yield a tail that is fully contained in
            # this chunk, i.e. a redundant duplicate document.
            break

    return chunks
|
||||
|
||||
def _generate_embedding(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> None:
    """
    Chunk text and add every chunk to the collection as its own document.

    The storage is initialised on demand: if the instance has no ``app`` or
    no ``collection`` attribute yet, ``_initialize_app()`` is called first.
    The text is never added un-chunked, so an oversized input is only ever
    stored as pieces produced by ``_chunk_text``.

    Args:
        text: Input text to store.
        metadata: Optional metadata attached to every chunk of ``text``.
            An empty dict is used when it is missing or empty.

    Returns:
        None. Nothing is added when ``text`` produces no chunks (e.g. it is
        empty).
    """
    if not hasattr(self, "app") or not hasattr(self, "collection"):
        self._initialize_app()

    chunks = self._chunk_text(text)
    if not chunks:
        return None

    for chunk in chunks:
        # One document per chunk, each under a fresh random id.
        self.collection.add(
            documents=[chunk],
            metadatas=[metadata or {}],
            ids=[str(uuid.uuid4())],
        )
    return None
|
||||
|
||||
def reset(self) -> None:
|
||||
try:
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import inspect
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, List, TypeVar, Union, cast
|
||||
from typing import Any, Callable, Dict, TypeVar, cast
|
||||
|
||||
import yaml
|
||||
from dotenv import load_dotenv
|
||||
@@ -26,12 +25,11 @@ def CrewBase(cls: T) -> T:
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
agents_config_paths = self._normalize_to_path_list(self.original_agents_config_path)
|
||||
tasks_config_paths = self._normalize_to_path_list(self.original_tasks_config_path)
|
||||
agents_config_path = self.base_directory / self.original_agents_config_path
|
||||
tasks_config_path = self.base_directory / self.original_tasks_config_path
|
||||
|
||||
# Load and merge configurations
|
||||
self.agents_config = self.load_and_merge_yaml_configs(agents_config_paths)
|
||||
self.tasks_config = self.load_and_merge_yaml_configs(tasks_config_paths)
|
||||
self.agents_config = self.load_yaml(agents_config_path)
|
||||
self.tasks_config = self.load_yaml(tasks_config_path)
|
||||
|
||||
self.map_all_agent_variables()
|
||||
self.map_all_task_variables()
|
||||
@@ -69,75 +67,14 @@ def CrewBase(cls: T) -> T:
|
||||
self._original_functions, "is_kickoff"
|
||||
)
|
||||
|
||||
def _normalize_to_path_list(self, paths) -> List[Path]:
|
||||
"""
|
||||
Normalize input paths to always be a list of Path objects.
|
||||
|
||||
Args:
|
||||
paths: A string path, Path object, or list of paths
|
||||
|
||||
Returns:
|
||||
A list of Path objects
|
||||
"""
|
||||
if isinstance(paths, (list, tuple)):
|
||||
return [self.base_directory / p for p in paths]
|
||||
else:
|
||||
return [self.base_directory / paths]
|
||||
|
||||
@staticmethod
|
||||
def load_yaml(config_path: Path):
|
||||
try:
|
||||
with open(config_path, "r", encoding="utf-8") as file:
|
||||
return yaml.safe_load(file)
|
||||
except FileNotFoundError:
|
||||
logging.error(f"Configuration YAML file not found: {config_path}")
|
||||
print(f"File not found: {config_path}")
|
||||
raise
|
||||
|
||||
def deep_merge(self, dict1: dict, dict2: dict) -> dict:
    """
    Recursively merge two dictionaries, with values from dict2 taking precedence.

    When a key holds a dictionary on both sides the two dictionaries are
    merged recursively; in every other case the value from dict2 replaces
    the one from dict1. The result shares no mutable state with either
    input, so changing it afterwards never alters dict1 or dict2.

    Args:
        dict1: First dictionary
        dict2: Second dictionary with values that will override dict1 for duplicate keys

    Returns:
        A new, fully independent dictionary with merged values
    """
    import copy  # local import: keeps this block self-contained

    # A shallow copy would leave nested dicts/lists aliased to dict1.
    merged = copy.deepcopy(dict1)
    for key, value in dict2.items():
        current = merged.get(key)
        if isinstance(current, dict) and isinstance(value, dict):
            merged[key] = self.deep_merge(current, value)
        else:
            # Copy so the result does not alias values owned by dict2.
            merged[key] = copy.deepcopy(value)
    return merged
|
||||
|
||||
def load_and_merge_yaml_configs(self, config_paths: List[Path]) -> dict:
    """
    Load several YAML files and fold them into a single configuration.

    Files are processed in list order. Each one is read with ``load_yaml``
    and, unless it loads as something falsy (for example an empty file),
    combined into the running result with ``deep_merge``. Later files
    therefore win on duplicate keys, while nested keys they do not mention
    are kept.

    Example:
        If file1.yaml contains: {"agent1": {"role": "researcher", "goal": "find info"}}
        And file2.yaml contains: {"agent1": {"role": "analyst"}}
        The result will be: {"agent1": {"role": "analyst", "goal": "find info"}}

    Args:
        config_paths: A list of Path objects pointing to YAML files

    Returns:
        A dictionary with merged configurations; empty when no file
        contributed anything.
    """
    merged: dict = {}
    for config_file in config_paths:
        loaded = self.load_yaml(config_file)
        if not loaded:
            # Nothing to contribute from this file.
            continue
        merged = self.deep_merge(merged, loaded)
    return merged
|
||||
|
||||
def _get_all_functions(self):
|
||||
return {
|
||||
|
||||
@@ -4,3 +4,5 @@ DEFAULT_SCORE_THRESHOLD = 0.35
|
||||
KNOWLEDGE_DIRECTORY = "knowledge"
|
||||
MAX_LLM_RETRY = 3
|
||||
MAX_FILE_NAME_LENGTH = 255
|
||||
# Width, in characters, of one memory chunk: the chunker compares it against
# len(text) and uses it as the slice width of every window.
MEMORY_CHUNK_SIZE = 4000
|
||||
# Number of characters shared by consecutive chunks. The chunker advances by
# MEMORY_CHUNK_SIZE - MEMORY_CHUNK_OVERLAP, so this must stay below the size.
MEMORY_CHUNK_OVERLAP = 200
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
test_agent1:
|
||||
role: Test Agent 1
|
||||
goal: Test Goal 1
|
||||
backstory: Test Backstory 1
|
||||
verbose: true
|
||||
@@ -1,8 +0,0 @@
|
||||
test_agent1:
|
||||
role: Updated Test Agent 1
|
||||
goal: Updated Test Goal 1
|
||||
test_agent2:
|
||||
role: Test Agent 2
|
||||
goal: Test Goal 2
|
||||
backstory: Test Backstory 2
|
||||
verbose: true
|
||||
@@ -1,4 +0,0 @@
|
||||
test_task1:
|
||||
description: Test Description 1
|
||||
expected_output: Test Output 1
|
||||
agent: test_agent1
|
||||
@@ -1,6 +0,0 @@
|
||||
test_task1:
|
||||
description: Updated Test Description 1
|
||||
test_task2:
|
||||
description: Test Description 2
|
||||
expected_output: Test Output 2
|
||||
agent: test_agent2
|
||||
86
tests/memory/large_input_memory_test.py
Normal file
86
tests/memory/large_input_memory_test.py
Normal file
@@ -0,0 +1,86 @@
|
||||
import pytest
|
||||
import numpy as np
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from crewai.memory.short_term.short_term_memory import ShortTermMemory
|
||||
from crewai.agent import Agent
|
||||
from crewai.crew import Crew
|
||||
from crewai.task import Task
|
||||
from crewai.utilities.constants import MEMORY_CHUNK_SIZE
|
||||
|
||||
|
||||
@pytest.fixture
def short_term_memory():
    """Provide a ShortTermMemory backed by a crew of one agent and one task."""
    researcher = Agent(
        role="Researcher",
        goal="Search relevant data and provide results",
        backstory="You are a researcher at a leading tech think tank.",
        tools=[],
        verbose=True,
    )
    search_task = Task(
        description="Perform a search on specific topics.",
        expected_output="A list of relevant URLs based on the search query.",
        agent=researcher,
    )
    owning_crew = Crew(agents=[researcher], tasks=[search_task])
    return ShortTermMemory(crew=owning_crew)
|
||||
|
||||
|
||||
def test_memory_with_large_input(short_term_memory):
    """Saving a value far larger than one chunk goes through the chunker."""
    storage = short_term_memory.storage
    oversized_value = "test value " * (MEMORY_CHUNK_SIZE + 1000)

    # Stub the chunker and the collection write so nothing is really embedded.
    chunk_patch = patch.object(
        storage, "_chunk_text", return_value=["chunk1", "chunk2"]
    )
    add_patch = patch.object(storage.collection, "add")
    with chunk_patch as chunker, add_patch:
        short_term_memory.save(value=oversized_value, agent="test_agent")

    assert chunker.called

    stored_hit = {
        "context": oversized_value,
        "metadata": {"agent": "test_agent"},
        "score": 0.95,
    }
    with patch.object(storage, "search", return_value=[stored_hit]):
        hits = short_term_memory.search(oversized_value[:100], score_threshold=0.01)

    assert hits[0]["context"] == oversized_value
    assert hits[0]["metadata"]["agent"] == "test_agent"
|
||||
|
||||
|
||||
def test_memory_with_empty_input(short_term_memory):
    """Saving an empty string consults the chunker but writes nothing."""
    storage = short_term_memory.storage
    blank_value = ""

    chunk_patch = patch.object(storage, "_chunk_text", return_value=[])
    add_patch = patch.object(storage.collection, "add")
    with chunk_patch as chunker, add_patch as adder:
        short_term_memory.save(value=blank_value, agent="test_agent")

    chunker.assert_called_with(blank_value)
    adder.assert_not_called()
|
||||
|
||||
|
||||
def test_memory_with_exact_chunk_size_input(short_term_memory):
    """A value exactly one chunk wide results in a single collection write."""
    storage = short_term_memory.storage
    boundary_value = "x" * MEMORY_CHUNK_SIZE

    chunk_patch = patch.object(
        storage, "_chunk_text", return_value=[boundary_value]
    )
    add_patch = patch.object(storage.collection, "add")
    with chunk_patch as chunker, add_patch as adder:
        short_term_memory.save(value=boundary_value, agent="test_agent")

    chunker.assert_called_with(boundary_value)
    assert adder.call_count == 1
|
||||
@@ -184,68 +184,3 @@ def test_multiple_before_after_kickoff():
|
||||
assert "plants" in result.raw, "First before_kickoff not executed"
|
||||
assert "processed first" in result.raw, "First after_kickoff not executed"
|
||||
assert "processed second" in result.raw, "Second after_kickoff not executed"
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
def test_multiple_yaml_configs():
    """Lists of agent/task YAML files are merged, later files overriding earlier ones."""

    def build_task(owner, task_name):
        # Work on a copy of the task settings; a string agent reference is
        # swapped for the agent the crew exposes under that name, if any.
        settings = owner.tasks_config[task_name].copy()
        if isinstance(settings.get("agent"), str):
            agent_name = settings.pop("agent")
            if hasattr(owner, agent_name):
                settings["agent"] = getattr(owner, agent_name)()
        return Task(config=settings)

    @CrewBase
    class MultiConfigCrew:
        agents_config = ["config/multi/agents1.yaml", "config/multi/agents2.yaml"]
        tasks_config = ["config/multi/tasks1.yaml", "config/multi/tasks2.yaml"]

        @agent
        def test_agent1(self):
            return Agent(config=self.agents_config["test_agent1"])

        @agent
        def test_agent2(self):
            return Agent(config=self.agents_config["test_agent2"])

        @task
        def test_task1(self):
            return build_task(self, "test_task1")

        @task
        def test_task2(self):
            return build_task(self, "test_task2")

        @crew
        def crew(self):
            return Crew(agents=self.agents, tasks=self.tasks, verbose=True)

    multi_crew = MultiConfigCrew()

    # Both agents are present after merging the two agent files.
    assert "test_agent1" in multi_crew.agents_config
    assert "test_agent2" in multi_crew.agents_config

    # Keys from the second file override; keys only in the first survive.
    merged_agent1 = multi_crew.agents_config["test_agent1"]
    assert merged_agent1["role"] == "Updated Test Agent 1"
    assert merged_agent1["goal"] == "Updated Test Goal 1"
    assert merged_agent1["backstory"] == "Test Backstory 1"
    assert merged_agent1["verbose"] is True

    assert "test_task1" in multi_crew.tasks_config
    assert "test_task2" in multi_crew.tasks_config

    merged_task1 = multi_crew.tasks_config["test_task1"]
    assert merged_task1["description"] == "Updated Test Description 1"
    assert merged_task1["expected_output"] == "Test Output 1"
    assert merged_task1["agent"].role == "Updated Test Agent 1"

    first_agent = multi_crew.test_agent1()
    second_agent = multi_crew.test_agent2()
    first_task = multi_crew.test_task1()
    second_task = multi_crew.test_task2()

    assert first_agent.role == "Updated Test Agent 1"
    assert second_agent.role == "Test Agent 2"
    assert first_task.description == "Updated Test Description 1"
    assert second_task.description == "Test Description 2"
|
||||
|
||||
Reference in New Issue
Block a user