Compare commits

..

2 Commits

Author SHA1 Message Date
Devin AI
c1c64d9b8b Enhance: Implement code quality improvements from review feedback
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-15 18:07:42 +00:00
Devin AI
4a32d9310c Fix: Add --active flag to uv sync command to fix virtual environment detection
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-15 17:56:48 +00:00
5 changed files with 107 additions and 141 deletions

View File

@@ -1,19 +1,37 @@
import logging
import subprocess
from typing import List
import click
UV_COMMAND = "uv"
SYNC_COMMAND = "sync"
ACTIVE_FLAG = "--active"
def install_crew(proxy_options: list[str]) -> None:
logger = logging.getLogger(__name__)
def install_crew(proxy_options: List[str]) -> None:
"""
Install the crew by running the UV command to lock and install.
Args:
proxy_options (List[str]): List of proxy-related command options.
Note:
Uses --active flag to ensure proper virtual environment detection.
"""
if not isinstance(proxy_options, list):
raise ValueError("proxy_options must be a list")
try:
command = ["uv", "sync"] + proxy_options
command = [UV_COMMAND, SYNC_COMMAND, ACTIVE_FLAG] + proxy_options
logger.debug(f"Executing command: {' '.join(command)}")
subprocess.run(command, check=True, capture_output=False, text=True)
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while running the crew: {e}", err=True)
click.echo(e.output, err=True)
if e.output is not None:
click.echo(e.output, err=True)
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)

View File

@@ -6,12 +6,11 @@ import shutil
import uuid
from typing import Any, Dict, List, Optional
import numpy as np
from chromadb.api import ClientAPI
from crewai.memory.storage.base_rag_storage import BaseRAGStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH, MEMORY_CHUNK_SIZE, MEMORY_CHUNK_OVERLAP
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH
from crewai.utilities.paths import db_storage_path
@@ -139,57 +138,15 @@ class RAGStorage(BaseRAGStorage):
logging.error(f"Error during {self.type} search: {str(e)}")
return []
def _chunk_text(self, text: str) -> List[str]:
"""
Split text into chunks to avoid token limits.
Args:
text: Input text to chunk.
Returns:
List[str]: A list of chunked text segments, adhering to defined size and overlap.
Empty list if input text is empty.
"""
if not text:
return []
if len(text) <= MEMORY_CHUNK_SIZE:
return [text]
chunks = []
start_indices = range(0, len(text), MEMORY_CHUNK_SIZE - MEMORY_CHUNK_OVERLAP)
for i in start_indices:
chunk = text[i:i + MEMORY_CHUNK_SIZE]
if chunk: # Only add non-empty chunks
chunks.append(chunk)
return chunks
def _generate_embedding(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> Optional[None]:
"""
Generate embeddings for text and add to collection.
Args:
text: Input text to generate embeddings for.
metadata: Optional metadata to associate with the embeddings.
Returns:
None if successful, None if text is empty.
"""
def _generate_embedding(self, text: str, metadata: Dict[str, Any]) -> None: # type: ignore
if not hasattr(self, "app") or not hasattr(self, "collection"):
self._initialize_app()
chunks = self._chunk_text(text)
if not chunks:
return None
for chunk in chunks:
self.collection.add(
documents=[chunk],
metadatas=[metadata or {}],
ids=[str(uuid.uuid4())],
)
self.collection.add(
documents=[text],
metadatas=[metadata or {}],
ids=[str(uuid.uuid4())],
)
def reset(self) -> None:
try:

View File

@@ -4,5 +4,3 @@ DEFAULT_SCORE_THRESHOLD = 0.35
KNOWLEDGE_DIRECTORY = "knowledge"
MAX_LLM_RETRY = 3
MAX_FILE_NAME_LENGTH = 255
MEMORY_CHUNK_SIZE = 4000
MEMORY_CHUNK_OVERLAP = 200

View File

@@ -0,0 +1,79 @@
from unittest import mock
from typing import List, Any
import pytest
import subprocess
from crewai.cli.install_crew import install_crew, UV_COMMAND, SYNC_COMMAND, ACTIVE_FLAG
@pytest.mark.parametrize(
"proxy_options,expected_command",
[
([], [UV_COMMAND, SYNC_COMMAND, ACTIVE_FLAG]),
(
["--index-url", "https://custom-pypi.org/simple"],
[UV_COMMAND, SYNC_COMMAND, ACTIVE_FLAG, "--index-url", "https://custom-pypi.org/simple"],
),
],
)
@mock.patch("subprocess.run")
def test_install_crew_with_options(
mock_subprocess: mock.MagicMock, proxy_options: List[str], expected_command: List[str]
) -> None:
"""Test that install_crew correctly passes options to the command."""
install_crew(proxy_options)
mock_subprocess.assert_called_once_with(
expected_command, check=True, capture_output=False, text=True
)
@mock.patch("subprocess.run")
@mock.patch("click.echo")
def test_install_crew_with_subprocess_error(
mock_echo: mock.MagicMock, mock_subprocess: mock.MagicMock
) -> None:
"""Test that install_crew handles subprocess errors correctly."""
error = subprocess.CalledProcessError(1, f"{UV_COMMAND} {SYNC_COMMAND} {ACTIVE_FLAG}")
error.output = "Error output"
mock_subprocess.side_effect = error
install_crew([])
assert mock_echo.call_count == 2
mock_echo.assert_any_call(f"An error occurred while running the crew: {error}", err=True)
mock_echo.assert_any_call("Error output", err=True)
@mock.patch("subprocess.run")
@mock.patch("click.echo")
def test_install_crew_with_subprocess_error_empty_output(
mock_echo: mock.MagicMock, mock_subprocess: mock.MagicMock
) -> None:
"""Test that install_crew handles subprocess errors with empty output correctly."""
error = subprocess.CalledProcessError(1, f"{UV_COMMAND} {SYNC_COMMAND} {ACTIVE_FLAG}")
error.output = None
mock_subprocess.side_effect = error
install_crew([])
mock_echo.assert_called_once_with(f"An error occurred while running the crew: {error}", err=True)
@mock.patch("subprocess.run")
@mock.patch("click.echo")
def test_install_crew_with_generic_exception(
mock_echo: mock.MagicMock, mock_subprocess: mock.MagicMock
) -> None:
"""Test that install_crew handles generic exceptions correctly."""
error = Exception("Generic error")
mock_subprocess.side_effect = error
install_crew([])
mock_echo.assert_called_once_with(f"An unexpected error occurred: {error}", err=True)
def test_install_crew_with_invalid_proxy_options() -> None:
"""Test that install_crew validates the proxy_options parameter."""
with pytest.raises(ValueError, match="proxy_options must be a list"):
install_crew("not a list") # type: ignore

View File

@@ -1,86 +0,0 @@
import pytest
import numpy as np
from unittest.mock import patch, MagicMock
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
from crewai.utilities.constants import MEMORY_CHUNK_SIZE
@pytest.fixture
def short_term_memory():
"""Fixture to create a ShortTermMemory instance"""
agent = Agent(
role="Researcher",
goal="Search relevant data and provide results",
backstory="You are a researcher at a leading tech think tank.",
tools=[],
verbose=True,
)
task = Task(
description="Perform a search on specific topics.",
expected_output="A list of relevant URLs based on the search query.",
agent=agent,
)
return ShortTermMemory(crew=Crew(agents=[agent], tasks=[task]))
def test_memory_with_large_input(short_term_memory):
"""Test that memory can handle large inputs without token limit errors"""
large_input = "test value " * (MEMORY_CHUNK_SIZE + 1000)
with patch.object(
short_term_memory.storage, '_chunk_text',
return_value=["chunk1", "chunk2"]
) as mock_chunk_text:
with patch.object(
short_term_memory.storage.collection, 'add'
) as mock_add:
short_term_memory.save(value=large_input, agent="test_agent")
assert mock_chunk_text.called
with patch.object(
short_term_memory.storage, 'search',
return_value=[{"context": large_input, "metadata": {"agent": "test_agent"}, "score": 0.95}]
):
result = short_term_memory.search(large_input[:100], score_threshold=0.01)
assert result[0]["context"] == large_input
assert result[0]["metadata"]["agent"] == "test_agent"
def test_memory_with_empty_input(short_term_memory):
"""Test that memory correctly handles empty input strings"""
empty_input = ""
with patch.object(
short_term_memory.storage, '_chunk_text',
return_value=[]
) as mock_chunk_text:
with patch.object(
short_term_memory.storage.collection, 'add'
) as mock_add:
short_term_memory.save(value=empty_input, agent="test_agent")
mock_chunk_text.assert_called_with(empty_input)
mock_add.assert_not_called()
def test_memory_with_exact_chunk_size_input(short_term_memory):
"""Test that memory correctly handles inputs that match chunk size exactly"""
exact_size_input = "x" * MEMORY_CHUNK_SIZE
with patch.object(
short_term_memory.storage, '_chunk_text',
return_value=[exact_size_input]
) as mock_chunk_text:
with patch.object(
short_term_memory.storage.collection, 'add'
) as mock_add:
short_term_memory.save(value=exact_size_input, agent="test_agent")
mock_chunk_text.assert_called_with(exact_size_input)
assert mock_add.call_count == 1