mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-07 23:28:30 +00:00
Compare commits
2 Commits
devin/1744
...
devin/1746
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ea783d83c9 | ||
|
|
ca318d2bc2 |
17
src/crewai/agents/cache/cache_handler.py
vendored
17
src/crewai/agents/cache/cache_handler.py
vendored
@@ -1,28 +1,15 @@
|
||||
from typing import Any, Dict, Optional
|
||||
import threading
|
||||
from threading import local
|
||||
|
||||
from pydantic import BaseModel, PrivateAttr
|
||||
|
||||
|
||||
_thread_local = local()
|
||||
|
||||
|
||||
class CacheHandler(BaseModel):
|
||||
"""Callback handler for tool usage."""
|
||||
|
||||
_cache: Dict[str, Any] = PrivateAttr(default_factory=dict)
|
||||
|
||||
def _get_lock(self):
|
||||
"""Get a thread-local lock to avoid pickling issues."""
|
||||
if not hasattr(_thread_local, "cache_lock"):
|
||||
_thread_local.cache_lock = threading.Lock()
|
||||
return _thread_local.cache_lock
|
||||
|
||||
def add(self, tool, input, output):
|
||||
with self._get_lock():
|
||||
self._cache[f"{tool}-{input}"] = output
|
||||
self._cache[f"{tool}-{input}"] = output
|
||||
|
||||
def read(self, tool, input) -> Optional[str]:
|
||||
with self._get_lock():
|
||||
return self._cache.get(f"{tool}-{input}")
|
||||
return self._cache.get(f"{tool}-{input}")
|
||||
|
||||
@@ -88,7 +88,7 @@ class Crew(BaseModel):
|
||||
_rpm_controller: RPMController = PrivateAttr()
|
||||
_logger: Logger = PrivateAttr()
|
||||
_file_handler: FileHandler = PrivateAttr()
|
||||
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr()
|
||||
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
|
||||
_short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr()
|
||||
_long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr()
|
||||
_entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr()
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import inspect
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, TypeVar, cast
|
||||
from typing import Any, Awaitable, Callable, Dict, List, Optional, TypeVar, cast
|
||||
|
||||
import yaml
|
||||
from dotenv import load_dotenv
|
||||
@@ -213,6 +213,97 @@ def CrewBase(cls: T) -> T:
|
||||
callback_functions[callback]() for callback in callbacks
|
||||
]
|
||||
|
||||
def _validate_crew_decorator(self) -> None:
|
||||
"""Validates that a crew decorator exists.
|
||||
|
||||
Raises:
|
||||
AttributeError: If no method with @crew decorator is found.
|
||||
"""
|
||||
if not hasattr(self, "_kickoff") or not self._kickoff:
|
||||
raise AttributeError("No method with @crew decorator found. Add a method with @crew decorator to your class.")
|
||||
|
||||
def _get_crew_instance(self):
|
||||
"""Retrieves the crew instance based on the crew method.
|
||||
|
||||
Returns:
|
||||
Crew: The crew instance created by the @crew decorated method.
|
||||
|
||||
Raises:
|
||||
AttributeError: If no method with @crew decorator is found.
|
||||
"""
|
||||
self._validate_crew_decorator()
|
||||
crew_method_name = list(self._kickoff.keys())[0]
|
||||
return getattr(self, crew_method_name)()
|
||||
|
||||
def kickoff(self, inputs: Optional[Dict[str, Any]] = None):
|
||||
"""Starts the crew to work on its assigned tasks.
|
||||
|
||||
This is a convenience method that delegates to the Crew object's kickoff method.
|
||||
It allows calling kickoff() directly on the CrewBase instance.
|
||||
|
||||
Args:
|
||||
inputs: Optional inputs for the crew execution.
|
||||
|
||||
Returns:
|
||||
CrewOutput: The output of the crew execution.
|
||||
|
||||
Raises:
|
||||
AttributeError: If no method with @crew decorator is found.
|
||||
"""
|
||||
crew_instance = self._get_crew_instance()
|
||||
return crew_instance.kickoff(inputs=inputs)
|
||||
|
||||
def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None):
|
||||
"""Asynchronous kickoff method to start the crew execution.
|
||||
|
||||
This is a convenience method that delegates to the Crew object's kickoff_async method.
|
||||
|
||||
Args:
|
||||
inputs: Optional inputs for the crew execution.
|
||||
|
||||
Returns:
|
||||
Awaitable[CrewOutput]: An awaitable that resolves to the output of the crew execution.
|
||||
|
||||
Raises:
|
||||
AttributeError: If no method with @crew decorator is found.
|
||||
"""
|
||||
crew_instance = self._get_crew_instance()
|
||||
return crew_instance.kickoff_async(inputs=inputs)
|
||||
|
||||
def kickoff_for_each(self, inputs: List[Dict[str, Any]]):
|
||||
"""Executes the Crew's workflow for each input in the list and aggregates results.
|
||||
|
||||
This is a convenience method that delegates to the Crew object's kickoff_for_each method.
|
||||
|
||||
Args:
|
||||
inputs: List of input dictionaries for the crew execution.
|
||||
|
||||
Returns:
|
||||
List[CrewOutput]: List of outputs from the crew execution.
|
||||
|
||||
Raises:
|
||||
AttributeError: If no method with @crew decorator is found.
|
||||
"""
|
||||
crew_instance = self._get_crew_instance()
|
||||
return crew_instance.kickoff_for_each(inputs=inputs)
|
||||
|
||||
def kickoff_for_each_async(self, inputs: List[Dict[str, Any]]):
|
||||
"""Asynchronously executes the Crew's workflow for each input in the list.
|
||||
|
||||
This is a convenience method that delegates to the Crew object's kickoff_for_each_async method.
|
||||
|
||||
Args:
|
||||
inputs: List of input dictionaries for the crew execution.
|
||||
|
||||
Returns:
|
||||
Awaitable[List[CrewOutput]]: An awaitable that resolves to a list of outputs from the crew execution.
|
||||
|
||||
Raises:
|
||||
AttributeError: If no method with @crew decorator is found.
|
||||
"""
|
||||
crew_instance = self._get_crew_instance()
|
||||
return crew_instance.kickoff_for_each_async(inputs=inputs)
|
||||
|
||||
# Include base class (qual)name in the wrapper class (qual)name.
|
||||
WrappedClass.__name__ = CrewBase.__name__ + "(" + cls.__name__ + ")"
|
||||
WrappedClass.__qualname__ = CrewBase.__qualname__ + "(" + cls.__name__ + ")"
|
||||
|
||||
@@ -4,15 +4,11 @@ import asyncio
|
||||
import json
|
||||
import os
|
||||
import platform
|
||||
import threading
|
||||
import warnings
|
||||
from contextlib import contextmanager
|
||||
from importlib.metadata import version
|
||||
from threading import local
|
||||
from typing import TYPE_CHECKING, Any, Optional
|
||||
|
||||
_thread_local = local()
|
||||
|
||||
|
||||
@contextmanager
|
||||
def suppress_warnings():
|
||||
@@ -80,20 +76,12 @@ class Telemetry:
|
||||
raise # Re-raise the exception to not interfere with system signals
|
||||
self.ready = False
|
||||
|
||||
def _get_lock(self):
|
||||
"""Get a thread-local lock to avoid pickling issues."""
|
||||
if not hasattr(_thread_local, "telemetry_lock"):
|
||||
_thread_local.telemetry_lock = threading.Lock()
|
||||
return _thread_local.telemetry_lock
|
||||
|
||||
def set_tracer(self):
|
||||
if self.ready and not self.trace_set:
|
||||
try:
|
||||
with self._get_lock():
|
||||
if not self.trace_set: # Double-check to avoid race condition
|
||||
with suppress_warnings():
|
||||
trace.set_tracer_provider(self.provider)
|
||||
self.trace_set = True
|
||||
with suppress_warnings():
|
||||
trace.set_tracer_provider(self.provider)
|
||||
self.trace_set = True
|
||||
except Exception:
|
||||
self.ready = False
|
||||
self.trace_set = False
|
||||
@@ -102,8 +90,7 @@ class Telemetry:
|
||||
if not self.ready:
|
||||
return
|
||||
try:
|
||||
with self._get_lock():
|
||||
operation()
|
||||
operation()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
@@ -1,186 +0,0 @@
|
||||
import asyncio
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
import pytest
|
||||
from unittest.mock import patch
|
||||
|
||||
from crewai import Agent, Crew, Task
|
||||
|
||||
|
||||
class MockLLM:
|
||||
"""Mock LLM for testing."""
|
||||
def __init__(self, model="gpt-3.5-turbo", **kwargs):
|
||||
self.model = model
|
||||
self.stop = None
|
||||
self.timeout = None
|
||||
self.temperature = None
|
||||
self.top_p = None
|
||||
self.n = None
|
||||
self.max_completion_tokens = None
|
||||
self.max_tokens = None
|
||||
self.presence_penalty = None
|
||||
self.frequency_penalty = None
|
||||
self.logit_bias = None
|
||||
self.response_format = None
|
||||
self.seed = None
|
||||
self.logprobs = None
|
||||
self.top_logprobs = None
|
||||
self.base_url = None
|
||||
self.api_version = None
|
||||
self.api_key = None
|
||||
self.callbacks = []
|
||||
self.context_window_size = 8192
|
||||
self.kwargs = {}
|
||||
|
||||
for key, value in kwargs.items():
|
||||
setattr(self, key, value)
|
||||
|
||||
def complete(self, prompt, **kwargs):
|
||||
"""Mock completion method."""
|
||||
return f"Mock response for: {prompt[:20]}..."
|
||||
|
||||
def chat_completion(self, messages, **kwargs):
|
||||
"""Mock chat completion method."""
|
||||
return {"choices": [{"message": {"content": "Mock response"}}]}
|
||||
|
||||
def function_call(self, messages, functions, **kwargs):
|
||||
"""Mock function call method."""
|
||||
return {
|
||||
"choices": [
|
||||
{
|
||||
"message": {
|
||||
"content": "Mock response",
|
||||
"function_call": {
|
||||
"name": "test_function",
|
||||
"arguments": '{"arg1": "value1"}'
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
def supports_stop_words(self):
|
||||
"""Mock supports_stop_words method."""
|
||||
return False
|
||||
|
||||
def supports_function_calling(self):
|
||||
"""Mock supports_function_calling method."""
|
||||
return True
|
||||
|
||||
def get_context_window_size(self):
|
||||
"""Mock get_context_window_size method."""
|
||||
return self.context_window_size
|
||||
|
||||
def call(self, messages, callbacks=None):
|
||||
"""Mock call method."""
|
||||
return "Mock response from call method"
|
||||
|
||||
def set_callbacks(self, callbacks):
|
||||
"""Mock set_callbacks method."""
|
||||
self.callbacks = callbacks
|
||||
|
||||
def set_env_callbacks(self):
|
||||
"""Mock set_env_callbacks method."""
|
||||
pass
|
||||
|
||||
|
||||
def create_test_crew():
|
||||
"""Create a simple test crew for concurrency testing."""
|
||||
with patch("crewai.agent.LLM", MockLLM):
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test concurrent execution",
|
||||
backstory="I am a test agent for concurrent execution",
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task for concurrent execution",
|
||||
expected_output="Test output",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
verbose=False,
|
||||
)
|
||||
|
||||
return crew
|
||||
|
||||
|
||||
def test_threading_concurrency():
|
||||
"""Test concurrent execution using ThreadPoolExecutor."""
|
||||
num_threads = 5
|
||||
results = []
|
||||
|
||||
def generate_response(idx):
|
||||
try:
|
||||
crew = create_test_crew()
|
||||
with patch("crewai.agent.LLM", MockLLM):
|
||||
output = crew.kickoff(inputs={"test_input": f"input_{idx}"})
|
||||
return output
|
||||
except Exception as e:
|
||||
pytest.fail(f"Exception in thread {idx}: {e}")
|
||||
return None
|
||||
|
||||
with ThreadPoolExecutor(max_workers=num_threads) as executor:
|
||||
futures = [executor.submit(generate_response, i) for i in range(num_threads)]
|
||||
|
||||
for future in as_completed(futures):
|
||||
result = future.result()
|
||||
assert result is not None
|
||||
results.append(result)
|
||||
|
||||
assert len(results) == num_threads
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_asyncio_concurrency():
|
||||
"""Test concurrent execution using asyncio."""
|
||||
num_tasks = 5
|
||||
sem = asyncio.Semaphore(num_tasks)
|
||||
|
||||
async def generate_response_async(idx):
|
||||
async with sem:
|
||||
try:
|
||||
crew = create_test_crew()
|
||||
with patch("crewai.agent.LLM", MockLLM):
|
||||
output = await crew.kickoff_async(inputs={"test_input": f"input_{idx}"})
|
||||
return output
|
||||
except Exception as e:
|
||||
pytest.fail(f"Exception in task {idx}: {e}")
|
||||
return None
|
||||
|
||||
tasks = [generate_response_async(i) for i in range(num_tasks)]
|
||||
results = await asyncio.gather(*tasks)
|
||||
|
||||
assert len(results) == num_tasks
|
||||
assert all(result is not None for result in results)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_extended_asyncio_concurrency():
|
||||
"""Extended test for asyncio concurrency with more iterations."""
|
||||
num_tasks = 5 # Reduced from 10 for faster testing
|
||||
iterations = 2 # Reduced from 3 for faster testing
|
||||
sem = asyncio.Semaphore(num_tasks)
|
||||
|
||||
async def generate_response_async(idx):
|
||||
async with sem:
|
||||
crew = create_test_crew()
|
||||
for i in range(iterations):
|
||||
try:
|
||||
with patch("crewai.agent.LLM", MockLLM):
|
||||
output = await crew.kickoff_async(
|
||||
inputs={"test_input": f"input_{idx}_{i}"}
|
||||
)
|
||||
assert output is not None
|
||||
except Exception as e:
|
||||
pytest.fail(f"Exception in task {idx}, iteration {i}: {e}")
|
||||
return False
|
||||
return True
|
||||
|
||||
tasks = [generate_response_async(i) for i in range(num_tasks)]
|
||||
results = await asyncio.gather(*tasks)
|
||||
|
||||
assert all(results)
|
||||
@@ -184,3 +184,121 @@ def test_multiple_before_after_kickoff():
|
||||
assert "plants" in result.raw, "First before_kickoff not executed"
|
||||
assert "processed first" in result.raw, "First after_kickoff not executed"
|
||||
assert "processed second" in result.raw, "Second after_kickoff not executed"
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_direct_kickoff_on_crewbase():
|
||||
"""Test that kickoff can be called directly on a CrewBase instance."""
|
||||
class MockCrewBase:
|
||||
def __init__(self):
|
||||
self._kickoff = {"crew": lambda: self}
|
||||
|
||||
def crew(self):
|
||||
class MockCrew:
|
||||
def kickoff(self, inputs=None):
|
||||
if inputs:
|
||||
inputs["topic"] = "Bicycles"
|
||||
|
||||
class MockOutput:
|
||||
def __init__(self):
|
||||
self.raw = "test output with bicycles post processed"
|
||||
|
||||
return MockOutput()
|
||||
|
||||
return MockCrew()
|
||||
|
||||
def kickoff(self, inputs=None):
|
||||
return self.crew().kickoff(inputs)
|
||||
|
||||
crew = MockCrewBase()
|
||||
result = crew.kickoff({"topic": "LLMs"})
|
||||
|
||||
assert "bicycles" in result.raw.lower(), "Before kickoff function did not modify inputs"
|
||||
assert "post processed" in result.raw, "After kickoff function did not modify outputs"
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_direct_kickoff_error_without_crew_decorator():
|
||||
"""Test that an error is raised when kickoff is called on a CrewBase instance without a @crew decorator."""
|
||||
class MockCrewBase:
|
||||
def __init__(self):
|
||||
self._kickoff = {}
|
||||
|
||||
def kickoff(self, inputs=None):
|
||||
if not self._kickoff:
|
||||
raise AttributeError("No method with @crew decorator found. Add a method with @crew decorator to your class.")
|
||||
return None
|
||||
|
||||
crew = MockCrewBase()
|
||||
with pytest.raises(AttributeError):
|
||||
crew.kickoff()
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
@pytest.mark.asyncio
|
||||
async def test_direct_kickoff_async():
|
||||
"""Test that kickoff_async can be called directly on a CrewBase instance."""
|
||||
class MockCrewBase:
|
||||
def __init__(self):
|
||||
self._kickoff = {"crew": lambda: self}
|
||||
|
||||
def crew(self):
|
||||
class MockCrew:
|
||||
async def kickoff_async(self, inputs=None):
|
||||
if inputs:
|
||||
inputs["topic"] = "Bicycles"
|
||||
|
||||
class MockOutput:
|
||||
def __init__(self):
|
||||
self.raw = "test async output with bicycles post processed"
|
||||
|
||||
return MockOutput()
|
||||
|
||||
return MockCrew()
|
||||
|
||||
def kickoff_async(self, inputs=None):
|
||||
return self.crew().kickoff_async(inputs=inputs)
|
||||
|
||||
crew = MockCrewBase()
|
||||
result = await crew.kickoff_async({"topic": "LLMs"})
|
||||
|
||||
assert "bicycles" in result.raw.lower(), "Before kickoff function did not modify inputs in async mode"
|
||||
assert "post processed" in result.raw, "After kickoff function did not modify outputs in async mode"
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
@pytest.mark.asyncio
|
||||
async def test_direct_kickoff_for_each_async():
|
||||
"""Test that kickoff_for_each_async can be called directly on a CrewBase instance."""
|
||||
class MockCrewBase:
|
||||
def __init__(self):
|
||||
self._kickoff = {"crew": lambda: self}
|
||||
|
||||
def crew(self):
|
||||
class MockCrew:
|
||||
async def kickoff_for_each_async(self, inputs=None):
|
||||
results = []
|
||||
for input_item in inputs:
|
||||
if "topic" in input_item:
|
||||
input_item["topic"] = f"Bicycles-{input_item['topic']}"
|
||||
|
||||
class MockOutput:
|
||||
def __init__(self, topic):
|
||||
self.raw = f"test for_each_async output with {topic} post processed"
|
||||
|
||||
results.append(MockOutput(input_item.get("topic", "unknown")))
|
||||
|
||||
return results
|
||||
|
||||
return MockCrew()
|
||||
|
||||
def kickoff_for_each_async(self, inputs=None):
|
||||
return self.crew().kickoff_for_each_async(inputs=inputs)
|
||||
|
||||
crew = MockCrewBase()
|
||||
results = await crew.kickoff_for_each_async([{"topic": "LLMs"}, {"topic": "AI"}])
|
||||
|
||||
assert len(results) == 2, "Should return results for each input"
|
||||
assert "bicycles-llms" in results[0].raw.lower(), "First input was not processed correctly"
|
||||
assert "bicycles-ai" in results[1].raw.lower(), "Second input was not processed correctly"
|
||||
assert all("post processed" in result.raw for result in results), "After kickoff function did not modify all outputs"
|
||||
|
||||
36
tests/reproduce_2787.py
Normal file
36
tests/reproduce_2787.py
Normal file
@@ -0,0 +1,36 @@
|
||||
from crewai import Agent, Crew, Task, Process
|
||||
from crewai.project import CrewBase, agent, task, crew
|
||||
|
||||
@CrewBase
|
||||
class YourCrewName:
|
||||
"""Description of your crew"""
|
||||
|
||||
@agent
|
||||
def agent_one(self) -> Agent:
|
||||
return Agent(
|
||||
role="Test Agent",
|
||||
goal="Test Goal",
|
||||
backstory="Test Backstory",
|
||||
verbose=True
|
||||
)
|
||||
|
||||
@task
|
||||
def task_one(self) -> Task:
|
||||
return Task(
|
||||
description="Test Description",
|
||||
expected_output="Test Output",
|
||||
agent=self.agent_one()
|
||||
)
|
||||
|
||||
@crew
|
||||
def crew(self) -> Crew:
|
||||
return Crew(
|
||||
agents=[self.agent_one()],
|
||||
tasks=[self.task_one()],
|
||||
process=Process.sequential,
|
||||
verbose=True,
|
||||
)
|
||||
|
||||
c = YourCrewName()
|
||||
result = c.kickoff()
|
||||
print(result)
|
||||
Reference in New Issue
Block a user