Compare commits


4 Commits

7 changed files with 19 additions and 225 deletions

View File

@@ -1,28 +1,15 @@
 from typing import Any, Dict, Optional
-import threading
-from threading import local
 from pydantic import BaseModel, PrivateAttr

-_thread_local = local()

 class CacheHandler(BaseModel):
     """Callback handler for tool usage."""

     _cache: Dict[str, Any] = PrivateAttr(default_factory=dict)

-    def _get_lock(self):
-        """Get a thread-local lock to avoid pickling issues."""
-        if not hasattr(_thread_local, "cache_lock"):
-            _thread_local.cache_lock = threading.Lock()
-        return _thread_local.cache_lock

     def add(self, tool, input, output):
-        with self._get_lock():
-            self._cache[f"{tool}-{input}"] = output
+        self._cache[f"{tool}-{input}"] = output

     def read(self, tool, input) -> Optional[str]:
-        with self._get_lock():
-            return self._cache.get(f"{tool}-{input}")
+        return self._cache.get(f"{tool}-{input}")
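For context on what this hunk reverts: lock objects cannot be pickled or deep-copied, which is why the removed _get_lock workaround kept them in thread-local state instead of on the model. A standalone snippet (not from this repository) reproducing the underlying limitation:

import copy
import pickle
import threading


class WithLock:
    def __init__(self):
        # A bare Lock holds OS-level state and does not support pickling.
        self.lock = threading.Lock()


obj = WithLock()
for fn in (pickle.dumps, copy.deepcopy):
    try:
        fn(obj)
    except TypeError as e:
        print(e)  # cannot pickle '_thread.lock' object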

View File

@@ -88,7 +88,7 @@ class Crew(BaseModel):
     _rpm_controller: RPMController = PrivateAttr()
     _logger: Logger = PrivateAttr()
     _file_handler: FileHandler = PrivateAttr()
-    _cache_handler: InstanceOf[CacheHandler] = PrivateAttr()
+    _cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
     _short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr()
     _long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr()
     _entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr()
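A hedged note on default=CacheHandler(): Pydantic deep-copies non-trivial defaults per instance, so this should behave like default_factory=CacheHandler rather than sharing one handler across all crews, and that deep copy only works because the un-picklable lock was removed in the previous file. A standalone sketch checking that assumption (Pydantic v2 semantics):

from typing import Any, Dict
from pydantic import BaseModel, PrivateAttr


class Handler(BaseModel):
    _cache: Dict[str, Any] = PrivateAttr(default_factory=dict)


class Owner(BaseModel):
    # The default instance is deep-copied for each Owner, not shared.
    _handler: Handler = PrivateAttr(default=Handler())


a, b = Owner(), Owner()
print(a._handler is b._handler)  # expected: False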

View File

@@ -14,13 +14,13 @@ class Knowledge(BaseModel):
     Knowledge is a collection of sources and setup for the vector store to save and query relevant context.
     Args:
         sources: List[BaseKnowledgeSource] = Field(default_factory=list)
-        storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
+        storage: Optional[KnowledgeStorage] = Field(default=None)
         embedder_config: Optional[Dict[str, Any]] = None
     """

     sources: List[BaseKnowledgeSource] = Field(default_factory=list)
     model_config = ConfigDict(arbitrary_types_allowed=True)
-    storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
+    storage: Optional[KnowledgeStorage] = Field(default=None)
     embedder_config: Optional[Dict[str, Any]] = None
     collection_name: Optional[str] = None
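Since storage no longer defaults to an eagerly constructed KnowledgeStorage, callers that want persistence must now supply one. A minimal sketch of the new contract, with a stub standing in for KnowledgeStorage:

from typing import Optional
from pydantic import BaseModel, ConfigDict, Field


class StubStorage:  # stand-in for KnowledgeStorage
    def save(self, chunks):
        print(f"saved {len(chunks)} chunks")


class Knowledge(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)
    storage: Optional[StubStorage] = Field(default=None)


print(Knowledge().storage)                       # None: nothing is created implicitly
print(Knowledge(storage=StubStorage()).storage)  # caller-supplied storage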

View File

@@ -22,7 +22,7 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
         default_factory=list, description="The path to the file"
     )
     content: Dict[Path, str] = Field(init=False, default_factory=dict)
-    storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
+    storage: Optional[KnowledgeStorage] = Field(default=None)
     safe_file_paths: List[Path] = Field(default_factory=list)

     @field_validator("file_path", "file_paths", mode="before")
@@ -62,7 +62,10 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
     def _save_documents(self):
         """Save the documents to the storage."""
-        self.storage.save(self.chunks)
+        if self.storage:
+            self.storage.save(self.chunks)
+        else:
+            raise ValueError("No storage found to save documents.")

     def convert_to_path(self, path: Union[Path, str]) -> Path:
         """Convert a path to a Path object."""

View File

@@ -16,7 +16,7 @@ class BaseKnowledgeSource(BaseModel, ABC):
     chunk_embeddings: List[np.ndarray] = Field(default_factory=list)
     model_config = ConfigDict(arbitrary_types_allowed=True)
-    storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
+    storage: Optional[KnowledgeStorage] = Field(default=None)
     metadata: Dict[str, Any] = Field(default_factory=dict)  # Currently unused
     collection_name: Optional[str] = Field(default=None)
@@ -46,4 +46,7 @@ class BaseKnowledgeSource(BaseModel, ABC):
         Save the documents to the storage.
         This method should be called after the chunks and embeddings are generated.
         """
-        self.storage.save(self.chunks)
+        if self.storage:
+            self.storage.save(self.chunks)
+        else:
+            raise ValueError("No storage found to save documents.")

View File

@@ -4,15 +4,11 @@ import asyncio
 import json
 import os
 import platform
-import threading
 import warnings
 from contextlib import contextmanager
 from importlib.metadata import version
-from threading import local
 from typing import TYPE_CHECKING, Any, Optional

-_thread_local = local()

 @contextmanager
 def suppress_warnings():
@@ -80,20 +76,12 @@ class Telemetry:
                 raise  # Re-raise the exception to not interfere with system signals
             self.ready = False

-    def _get_lock(self):
-        """Get a thread-local lock to avoid pickling issues."""
-        if not hasattr(_thread_local, "telemetry_lock"):
-            _thread_local.telemetry_lock = threading.Lock()
-        return _thread_local.telemetry_lock

     def set_tracer(self):
         if self.ready and not self.trace_set:
             try:
-                with self._get_lock():
-                    if not self.trace_set:  # Double-check to avoid race condition
-                        with suppress_warnings():
-                            trace.set_tracer_provider(self.provider)
-                            self.trace_set = True
+                with suppress_warnings():
+                    trace.set_tracer_provider(self.provider)
+                    self.trace_set = True
             except Exception:
                 self.ready = False
                 self.trace_set = False
@@ -102,8 +90,7 @@ class Telemetry:
         if not self.ready:
             return
         try:
-            with self._get_lock():
-                operation()
+            operation()
         except Exception:
             pass
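Dropping the double-checked lock here appears to lean on OpenTelemetry's own one-time guard: trace.set_tracer_provider refuses to override an already-set global provider. A standalone snippet checking that assumption (requires opentelemetry-sdk; behavior per current opentelemetry-api):

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider

first = TracerProvider()
trace.set_tracer_provider(first)
# A second call logs "Overriding of current TracerProvider is not allowed"
# and leaves the first provider in place, so a racing duplicate call is
# harmless even without the removed lock.
trace.set_tracer_provider(TracerProvider())
print(trace.get_tracer_provider() is first)  # expected: True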

View File

@@ -1,186 +0,0 @@
-import asyncio
-import time
-from concurrent.futures import ThreadPoolExecutor, as_completed
-
-import pytest
-from unittest.mock import patch
-
-from crewai import Agent, Crew, Task
-
-
-class MockLLM:
-    """Mock LLM for testing."""
-
-    def __init__(self, model="gpt-3.5-turbo", **kwargs):
-        self.model = model
-        self.stop = None
-        self.timeout = None
-        self.temperature = None
-        self.top_p = None
-        self.n = None
-        self.max_completion_tokens = None
-        self.max_tokens = None
-        self.presence_penalty = None
-        self.frequency_penalty = None
-        self.logit_bias = None
-        self.response_format = None
-        self.seed = None
-        self.logprobs = None
-        self.top_logprobs = None
-        self.base_url = None
-        self.api_version = None
-        self.api_key = None
-        self.callbacks = []
-        self.context_window_size = 8192
-        self.kwargs = {}
-        for key, value in kwargs.items():
-            setattr(self, key, value)
-
-    def complete(self, prompt, **kwargs):
-        """Mock completion method."""
-        return f"Mock response for: {prompt[:20]}..."
-
-    def chat_completion(self, messages, **kwargs):
-        """Mock chat completion method."""
-        return {"choices": [{"message": {"content": "Mock response"}}]}
-
-    def function_call(self, messages, functions, **kwargs):
-        """Mock function call method."""
-        return {
-            "choices": [
-                {
-                    "message": {
-                        "content": "Mock response",
-                        "function_call": {
-                            "name": "test_function",
-                            "arguments": '{"arg1": "value1"}'
-                        }
-                    }
-                }
-            ]
-        }
-
-    def supports_stop_words(self):
-        """Mock supports_stop_words method."""
-        return False
-
-    def supports_function_calling(self):
-        """Mock supports_function_calling method."""
-        return True
-
-    def get_context_window_size(self):
-        """Mock get_context_window_size method."""
-        return self.context_window_size
-
-    def call(self, messages, callbacks=None):
-        """Mock call method."""
-        return "Mock response from call method"
-
-    def set_callbacks(self, callbacks):
-        """Mock set_callbacks method."""
-        self.callbacks = callbacks
-
-    def set_env_callbacks(self):
-        """Mock set_env_callbacks method."""
-        pass
-
-
-def create_test_crew():
-    """Create a simple test crew for concurrency testing."""
-    with patch("crewai.agent.LLM", MockLLM):
-        agent = Agent(
-            role="Test Agent",
-            goal="Test concurrent execution",
-            backstory="I am a test agent for concurrent execution",
-        )
-        task = Task(
-            description="Test task for concurrent execution",
-            expected_output="Test output",
-            agent=agent,
-        )
-        crew = Crew(
-            agents=[agent],
-            tasks=[task],
-            verbose=False,
-        )
-        return crew
-
-
-def test_threading_concurrency():
-    """Test concurrent execution using ThreadPoolExecutor."""
-    num_threads = 5
-    results = []
-
-    def generate_response(idx):
-        try:
-            crew = create_test_crew()
-            with patch("crewai.agent.LLM", MockLLM):
-                output = crew.kickoff(inputs={"test_input": f"input_{idx}"})
-                return output
-        except Exception as e:
-            pytest.fail(f"Exception in thread {idx}: {e}")
-            return None
-
-    with ThreadPoolExecutor(max_workers=num_threads) as executor:
-        futures = [executor.submit(generate_response, i) for i in range(num_threads)]
-        for future in as_completed(futures):
-            result = future.result()
-            assert result is not None
-            results.append(result)
-
-    assert len(results) == num_threads
-
-
-@pytest.mark.asyncio
-async def test_asyncio_concurrency():
-    """Test concurrent execution using asyncio."""
-    num_tasks = 5
-    sem = asyncio.Semaphore(num_tasks)
-
-    async def generate_response_async(idx):
-        async with sem:
-            try:
-                crew = create_test_crew()
-                with patch("crewai.agent.LLM", MockLLM):
-                    output = await crew.kickoff_async(inputs={"test_input": f"input_{idx}"})
-                    return output
-            except Exception as e:
-                pytest.fail(f"Exception in task {idx}: {e}")
-                return None
-
-    tasks = [generate_response_async(i) for i in range(num_tasks)]
-    results = await asyncio.gather(*tasks)
-
-    assert len(results) == num_tasks
-    assert all(result is not None for result in results)
-
-
-@pytest.mark.asyncio
-async def test_extended_asyncio_concurrency():
-    """Extended test for asyncio concurrency with more iterations."""
-    num_tasks = 5  # Reduced from 10 for faster testing
-    iterations = 2  # Reduced from 3 for faster testing
-    sem = asyncio.Semaphore(num_tasks)
-
-    async def generate_response_async(idx):
-        async with sem:
-            crew = create_test_crew()
-            for i in range(iterations):
-                try:
-                    with patch("crewai.agent.LLM", MockLLM):
-                        output = await crew.kickoff_async(
-                            inputs={"test_input": f"input_{idx}_{i}"}
-                        )
-                        assert output is not None
-                except Exception as e:
-                    pytest.fail(f"Exception in task {idx}, iteration {i}: {e}")
-                    return False
-            return True
-
-    tasks = [generate_response_async(i) for i in range(num_tasks)]
-    results = await asyncio.gather(*tasks)
-
-    assert all(results)