mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-21 22:08:21 +00:00
Compare commits
7 Commits
devin/1768
...
devin/1739
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
db34516b18 | ||
|
|
6c3f4a6446 | ||
|
|
3cf2f982dd | ||
|
|
48604f567b | ||
|
|
f724ed93b9 | ||
|
|
abc71be5f6 | ||
|
|
420f826e56 |
3
pytest.ini
Normal file
3
pytest.ini
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
[pytest]
|
||||||
|
markers =
|
||||||
|
agentops: Tests for AgentOps integration
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
|
import os
|
||||||
import re
|
import re
|
||||||
import uuid
|
import uuid
|
||||||
import warnings
|
import warnings
|
||||||
@@ -54,8 +55,11 @@ from crewai.utilities.training_handler import CrewTrainingHandler
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
import agentops # type: ignore
|
import agentops # type: ignore
|
||||||
|
from agentops.exceptions import AgentOpsError, AuthenticationError # type: ignore
|
||||||
except ImportError:
|
except ImportError:
|
||||||
agentops = None
|
agentops = None
|
||||||
|
AgentOpsError = None
|
||||||
|
AuthenticationError = None
|
||||||
|
|
||||||
|
|
||||||
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
|
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
|
||||||
@@ -90,6 +94,8 @@ class Crew(BaseModel):
|
|||||||
__hash__ = object.__hash__ # type: ignore
|
__hash__ = object.__hash__ # type: ignore
|
||||||
_execution_span: Any = PrivateAttr()
|
_execution_span: Any = PrivateAttr()
|
||||||
_rpm_controller: RPMController = PrivateAttr()
|
_rpm_controller: RPMController = PrivateAttr()
|
||||||
|
_agentops: Optional['agentops.AgentOps'] = PrivateAttr(default=None)
|
||||||
|
_telemetry: Optional[Telemetry] = PrivateAttr(default=None)
|
||||||
_logger: Logger = PrivateAttr()
|
_logger: Logger = PrivateAttr()
|
||||||
_file_handler: FileHandler = PrivateAttr()
|
_file_handler: FileHandler = PrivateAttr()
|
||||||
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
|
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
|
||||||
@@ -240,19 +246,72 @@ class Crew(BaseModel):
|
|||||||
# TODO: Improve typing
|
# TODO: Improve typing
|
||||||
return json.loads(v) if isinstance(v, Json) else v # type: ignore
|
return json.loads(v) if isinstance(v, Json) else v # type: ignore
|
||||||
|
|
||||||
@model_validator(mode="after")
|
def _validate_api_key(self, api_key: Optional[str]) -> bool:
|
||||||
|
"""Validate the AgentOps API key.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
api_key: The API key to validate
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if the API key is valid, False otherwise
|
||||||
|
"""
|
||||||
|
if not api_key:
|
||||||
|
return False
|
||||||
|
stripped_key = api_key.strip()
|
||||||
|
return bool(stripped_key and len(stripped_key) > 10)
|
||||||
|
|
||||||
def set_private_attrs(self) -> "Crew":
|
def set_private_attrs(self) -> "Crew":
|
||||||
"""Set private attributes."""
|
"""Initialize private attributes including AgentOps integration.
|
||||||
|
|
||||||
|
This method sets up:
|
||||||
|
- Logger and file handler for output logging
|
||||||
|
- RPM controller for rate limiting
|
||||||
|
- AgentOps integration for monitoring (if available and configured)
|
||||||
|
"""
|
||||||
self._cache_handler = CacheHandler()
|
self._cache_handler = CacheHandler()
|
||||||
self._logger = Logger(verbose=self.verbose)
|
self._logger = Logger(verbose=self.verbose)
|
||||||
if self.output_log_file:
|
if self.output_log_file:
|
||||||
self._file_handler = FileHandler(self.output_log_file)
|
self._file_handler = FileHandler(self.output_log_file)
|
||||||
self._rpm_controller = RPMController(max_rpm=self.max_rpm, logger=self._logger)
|
self._rpm_controller = RPMController(max_rpm=self.max_rpm, logger=self._logger)
|
||||||
if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
|
|
||||||
self.function_calling_llm = create_llm(self.function_calling_llm)
|
|
||||||
|
|
||||||
self._telemetry = Telemetry()
|
self._telemetry = Telemetry()
|
||||||
self._telemetry.set_tracer()
|
self._telemetry.set_tracer()
|
||||||
|
|
||||||
|
# Initialize agentops if available and API key is present
|
||||||
|
if agentops:
|
||||||
|
api_key = os.getenv("AGENTOPS_API_KEY")
|
||||||
|
if self._validate_api_key(api_key):
|
||||||
|
try:
|
||||||
|
agentops.init(api_key)
|
||||||
|
self._agentops = agentops
|
||||||
|
self._logger.log(
|
||||||
|
"info",
|
||||||
|
"Successfully initialized agentops",
|
||||||
|
color="green"
|
||||||
|
)
|
||||||
|
except (ConnectionError, AuthenticationError) as e:
|
||||||
|
self._logger.log(
|
||||||
|
"warning",
|
||||||
|
f"Failed to connect to agentops: {e}",
|
||||||
|
color="yellow"
|
||||||
|
)
|
||||||
|
self._agentops = None
|
||||||
|
except (ValueError, AgentOpsError) as e:
|
||||||
|
self._logger.log(
|
||||||
|
"warning",
|
||||||
|
f"Invalid agentops configuration: {e}",
|
||||||
|
color="yellow"
|
||||||
|
)
|
||||||
|
self._agentops = None
|
||||||
|
else:
|
||||||
|
self._logger.log(
|
||||||
|
"warning",
|
||||||
|
"Invalid AGENTOPS_API_KEY provided",
|
||||||
|
color="yellow"
|
||||||
|
)
|
||||||
|
self._agentops = None
|
||||||
|
|
||||||
|
if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
|
||||||
|
self.function_calling_llm = create_llm(self.function_calling_llm)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
@model_validator(mode="after")
|
@model_validator(mode="after")
|
||||||
@@ -543,7 +602,8 @@ class Crew(BaseModel):
|
|||||||
inputs = before_callback(inputs)
|
inputs = before_callback(inputs)
|
||||||
|
|
||||||
"""Starts the crew to work on its assigned tasks."""
|
"""Starts the crew to work on its assigned tasks."""
|
||||||
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
|
if self._telemetry:
|
||||||
|
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
|
||||||
self._task_output_handler.reset()
|
self._task_output_handler.reset()
|
||||||
self._logging_color = "bold_purple"
|
self._logging_color = "bold_purple"
|
||||||
|
|
||||||
@@ -1121,16 +1181,22 @@ class Crew(BaseModel):
|
|||||||
for agent in self.agents:
|
for agent in self.agents:
|
||||||
agent.interpolate_inputs(inputs)
|
agent.interpolate_inputs(inputs)
|
||||||
|
|
||||||
def _finish_execution(self, final_string_output: str) -> None:
|
def _finish_execution(self, final_output: Union[str, CrewOutput]) -> None:
|
||||||
|
"""Finish execution and cleanup.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
final_output: The final output from crew execution, either as string or CrewOutput
|
||||||
|
"""
|
||||||
if self.max_rpm:
|
if self.max_rpm:
|
||||||
self._rpm_controller.stop_rpm_counter()
|
self._rpm_controller.stop_rpm_counter()
|
||||||
if agentops:
|
if self._telemetry:
|
||||||
agentops.end_session(
|
self._telemetry.end_crew(self, final_output)
|
||||||
|
if self._agentops:
|
||||||
|
self._agentops.end_session(
|
||||||
end_state="Success",
|
end_state="Success",
|
||||||
end_state_reason="Finished Execution",
|
end_state_reason="Finished Execution",
|
||||||
is_auto_end=True,
|
is_auto_end=True,
|
||||||
)
|
)
|
||||||
self._telemetry.end_crew(self, final_string_output)
|
|
||||||
|
|
||||||
def calculate_usage_metrics(self) -> UsageMetrics:
|
def calculate_usage_metrics(self) -> UsageMetrics:
|
||||||
"""Calculates and returns the usage metrics."""
|
"""Calculates and returns the usage metrics."""
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ from concurrent.futures import Future
|
|||||||
from unittest import mock
|
from unittest import mock
|
||||||
from unittest.mock import MagicMock, patch
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import agentops
|
||||||
import instructor
|
import instructor
|
||||||
import pydantic_core
|
import pydantic_core
|
||||||
import pytest
|
import pytest
|
||||||
@@ -15,6 +16,7 @@ from crewai.agents.cache import CacheHandler
|
|||||||
from crewai.crew import Crew
|
from crewai.crew import Crew
|
||||||
from crewai.crews.crew_output import CrewOutput
|
from crewai.crews.crew_output import CrewOutput
|
||||||
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
|
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
|
||||||
|
from crewai.llm import LLM
|
||||||
from crewai.memory.contextual.contextual_memory import ContextualMemory
|
from crewai.memory.contextual.contextual_memory import ContextualMemory
|
||||||
from crewai.process import Process
|
from crewai.process import Process
|
||||||
from crewai.project import crew
|
from crewai.project import crew
|
||||||
@@ -27,6 +29,100 @@ from crewai.utilities import Logger
|
|||||||
from crewai.utilities.rpm_controller import RPMController
|
from crewai.utilities.rpm_controller import RPMController
|
||||||
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
|
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def researcher():
|
||||||
|
"""Fixture to create a researcher agent."""
|
||||||
|
return Agent(
|
||||||
|
role="Researcher",
|
||||||
|
goal="Make the best research and analysis on content about AI and AI agents",
|
||||||
|
backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups.",
|
||||||
|
allow_delegation=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_agentops():
|
||||||
|
"""Fixture to mock agentops for testing."""
|
||||||
|
mock_agentops = MagicMock()
|
||||||
|
mock_agentops.init = MagicMock()
|
||||||
|
return mock_agentops
|
||||||
|
|
||||||
|
@pytest.mark.agentops
|
||||||
|
class TestAgentOpsIntegration:
|
||||||
|
"""Tests for AgentOps integration."""
|
||||||
|
|
||||||
|
def test_initialization_with_api_key(self, mock_agentops, monkeypatch):
|
||||||
|
"""Test that agentops is properly initialized when API key is present."""
|
||||||
|
monkeypatch.setattr("crewai.crew.agentops", mock_agentops)
|
||||||
|
monkeypatch.setenv("AGENTOPS_API_KEY", "test-key-12345")
|
||||||
|
crew = Crew(agents=[researcher], tasks=[Task(
|
||||||
|
description="Test task",
|
||||||
|
expected_output="Test output",
|
||||||
|
agent=researcher,
|
||||||
|
)])
|
||||||
|
crew.set_private_attrs()
|
||||||
|
mock_agentops.init.assert_called_once_with("test-key-12345")
|
||||||
|
|
||||||
|
def test_initialization_without_api_key(self, mock_agentops):
|
||||||
|
"""Test that agentops is not initialized when API key is not present."""
|
||||||
|
crew = Crew(agents=[researcher], tasks=[Task(
|
||||||
|
description="Test task",
|
||||||
|
expected_output="Test output",
|
||||||
|
agent=researcher,
|
||||||
|
)])
|
||||||
|
mock_agentops.assert_not_called()
|
||||||
|
|
||||||
|
def test_initialization_with_invalid_api_key(self, mock_agentops, monkeypatch):
|
||||||
|
"""Test that agentops is not initialized when API key is invalid."""
|
||||||
|
monkeypatch.setenv("AGENTOPS_API_KEY", " ")
|
||||||
|
crew = Crew(agents=[researcher], tasks=[Task(
|
||||||
|
description="Test task",
|
||||||
|
expected_output="Test output",
|
||||||
|
agent=researcher,
|
||||||
|
)])
|
||||||
|
mock_agentops.assert_not_called()
|
||||||
|
|
||||||
|
def test_gemini_llm_integration(self, mock_agentops, monkeypatch):
|
||||||
|
"""Test that Gemini LLM works correctly with agentops."""
|
||||||
|
# Mock agentops
|
||||||
|
monkeypatch.setattr("crewai.crew.agentops", mock_agentops)
|
||||||
|
|
||||||
|
# Set API keys
|
||||||
|
monkeypatch.setenv("AGENTOPS_API_KEY", "test-key-12345")
|
||||||
|
monkeypatch.setenv("GOOGLE_API_KEY", "test-key")
|
||||||
|
|
||||||
|
# Create crew with Gemini LLM
|
||||||
|
llm = LLM(model="gemini-pro")
|
||||||
|
agent = Agent(
|
||||||
|
role="test",
|
||||||
|
goal="test",
|
||||||
|
backstory="test",
|
||||||
|
llm=llm
|
||||||
|
)
|
||||||
|
task = Task(
|
||||||
|
description="test task",
|
||||||
|
expected_output="test output",
|
||||||
|
agent=agent
|
||||||
|
)
|
||||||
|
crew = Crew(agents=[agent], tasks=[task])
|
||||||
|
crew.set_private_attrs()
|
||||||
|
|
||||||
|
# Mock the agent execution to avoid actual API calls
|
||||||
|
with patch.object(Task, 'execute_sync', return_value=TaskOutput(
|
||||||
|
description="test",
|
||||||
|
raw="test output",
|
||||||
|
agent=agent.role
|
||||||
|
)):
|
||||||
|
# Run crew
|
||||||
|
crew.kickoff()
|
||||||
|
|
||||||
|
# Verify agentops.end_session was called correctly
|
||||||
|
mock_agentops.end_session.assert_called_once_with(
|
||||||
|
end_state="Success",
|
||||||
|
end_state_reason="Finished Execution",
|
||||||
|
is_auto_end=True
|
||||||
|
)
|
||||||
|
|
||||||
ceo = Agent(
|
ceo = Agent(
|
||||||
role="CEO",
|
role="CEO",
|
||||||
goal="Make sure the writers in your company produce amazing content.",
|
goal="Make sure the writers in your company produce amazing content.",
|
||||||
|
|||||||
Reference in New Issue
Block a user