Compare commits

...

7 Commits

Author SHA1 Message Date
Devin AI
db34516b18 fix: Fix type-checker issues with _finish_execution method
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 06:46:27 +00:00
Devin AI
6c3f4a6446 refactor: Improve agentops integration with better types and tests
- Add specific error handling for agentops initialization
- Add type hints for better code readability
- Improve API key validation
- Reorganize tests using pytest fixtures and classes
- Add documentation for set_private_attrs method

Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 06:43:26 +00:00
Devin AI
3cf2f982dd fix: Add pytest.ini and improve test initialization
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 06:41:36 +00:00
Devin AI
48604f567b fix: Improve agentops initialization with better validation and logging
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 06:27:56 +00:00
Devin AI
f724ed93b9 fix: Sort imports to fix linting issues
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 06:27:21 +00:00
Devin AI
abc71be5f6 fix: Sort imports to fix linting issues
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 06:25:41 +00:00
Devin AI
420f826e56 fix: Initialize agentops in Crew setup (#2102)
- Add proper agentops initialization in Crew's set_private_attrs
- Add test coverage for agentops initialization
- Add test coverage for Gemini LLM with agentops

Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 06:24:05 +00:00
3 changed files with 175 additions and 10 deletions

3
pytest.ini Normal file
View File

@@ -0,0 +1,3 @@
[pytest]
markers =
agentops: Tests for AgentOps integration

View File

@@ -1,5 +1,6 @@
import asyncio
import json
import os
import re
import uuid
import warnings
@@ -54,8 +55,11 @@ from crewai.utilities.training_handler import CrewTrainingHandler
try:
import agentops # type: ignore
from agentops.exceptions import AgentOpsError, AuthenticationError # type: ignore
except ImportError:
agentops = None
AgentOpsError = None
AuthenticationError = None
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
@@ -90,6 +94,8 @@ class Crew(BaseModel):
__hash__ = object.__hash__ # type: ignore
_execution_span: Any = PrivateAttr()
_rpm_controller: RPMController = PrivateAttr()
_agentops: Optional['agentops.AgentOps'] = PrivateAttr(default=None)
_telemetry: Optional[Telemetry] = PrivateAttr(default=None)
_logger: Logger = PrivateAttr()
_file_handler: FileHandler = PrivateAttr()
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
@@ -240,19 +246,72 @@ class Crew(BaseModel):
# TODO: Improve typing
return json.loads(v) if isinstance(v, Json) else v # type: ignore
@model_validator(mode="after")
def _validate_api_key(self, api_key: Optional[str]) -> bool:
"""Validate the AgentOps API key.
Args:
api_key: The API key to validate
Returns:
bool: True if the API key is valid, False otherwise
"""
if not api_key:
return False
stripped_key = api_key.strip()
return bool(stripped_key and len(stripped_key) > 10)
def set_private_attrs(self) -> "Crew":
"""Set private attributes."""
"""Initialize private attributes including AgentOps integration.
This method sets up:
- Logger and file handler for output logging
- RPM controller for rate limiting
- AgentOps integration for monitoring (if available and configured)
"""
self._cache_handler = CacheHandler()
self._logger = Logger(verbose=self.verbose)
if self.output_log_file:
self._file_handler = FileHandler(self.output_log_file)
self._rpm_controller = RPMController(max_rpm=self.max_rpm, logger=self._logger)
if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
self.function_calling_llm = create_llm(self.function_calling_llm)
self._telemetry = Telemetry()
self._telemetry.set_tracer()
# Initialize agentops if available and API key is present
if agentops:
api_key = os.getenv("AGENTOPS_API_KEY")
if self._validate_api_key(api_key):
try:
agentops.init(api_key)
self._agentops = agentops
self._logger.log(
"info",
"Successfully initialized agentops",
color="green"
)
except (ConnectionError, AuthenticationError) as e:
self._logger.log(
"warning",
f"Failed to connect to agentops: {e}",
color="yellow"
)
self._agentops = None
except (ValueError, AgentOpsError) as e:
self._logger.log(
"warning",
f"Invalid agentops configuration: {e}",
color="yellow"
)
self._agentops = None
else:
self._logger.log(
"warning",
"Invalid AGENTOPS_API_KEY provided",
color="yellow"
)
self._agentops = None
if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
self.function_calling_llm = create_llm(self.function_calling_llm)
return self
@model_validator(mode="after")
@@ -543,7 +602,8 @@ class Crew(BaseModel):
inputs = before_callback(inputs)
"""Starts the crew to work on its assigned tasks."""
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
if self._telemetry:
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
self._task_output_handler.reset()
self._logging_color = "bold_purple"
@@ -1121,16 +1181,22 @@ class Crew(BaseModel):
for agent in self.agents:
agent.interpolate_inputs(inputs)
def _finish_execution(self, final_string_output: str) -> None:
def _finish_execution(self, final_output: Union[str, CrewOutput]) -> None:
"""Finish execution and cleanup.
Args:
final_output: The final output from crew execution, either as string or CrewOutput
"""
if self.max_rpm:
self._rpm_controller.stop_rpm_counter()
if agentops:
agentops.end_session(
if self._telemetry:
self._telemetry.end_crew(self, final_output)
if self._agentops:
self._agentops.end_session(
end_state="Success",
end_state_reason="Finished Execution",
is_auto_end=True,
)
self._telemetry.end_crew(self, final_string_output)
def calculate_usage_metrics(self) -> UsageMetrics:
"""Calculates and returns the usage metrics."""

View File

@@ -6,6 +6,7 @@ from concurrent.futures import Future
from unittest import mock
from unittest.mock import MagicMock, patch
import agentops
import instructor
import pydantic_core
import pytest
@@ -15,6 +16,7 @@ from crewai.agents.cache import CacheHandler
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.llm import LLM
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.process import Process
from crewai.project import crew
@@ -27,6 +29,100 @@ from crewai.utilities import Logger
from crewai.utilities.rpm_controller import RPMController
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
@pytest.fixture
def researcher():
"""Fixture to create a researcher agent."""
return Agent(
role="Researcher",
goal="Make the best research and analysis on content about AI and AI agents",
backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups.",
allow_delegation=False,
)
@pytest.fixture
def mock_agentops():
"""Fixture to mock agentops for testing."""
mock_agentops = MagicMock()
mock_agentops.init = MagicMock()
return mock_agentops
@pytest.mark.agentops
class TestAgentOpsIntegration:
"""Tests for AgentOps integration."""
def test_initialization_with_api_key(self, mock_agentops, monkeypatch):
"""Test that agentops is properly initialized when API key is present."""
monkeypatch.setattr("crewai.crew.agentops", mock_agentops)
monkeypatch.setenv("AGENTOPS_API_KEY", "test-key-12345")
crew = Crew(agents=[researcher], tasks=[Task(
description="Test task",
expected_output="Test output",
agent=researcher,
)])
crew.set_private_attrs()
mock_agentops.init.assert_called_once_with("test-key-12345")
def test_initialization_without_api_key(self, mock_agentops):
"""Test that agentops is not initialized when API key is not present."""
crew = Crew(agents=[researcher], tasks=[Task(
description="Test task",
expected_output="Test output",
agent=researcher,
)])
mock_agentops.assert_not_called()
def test_initialization_with_invalid_api_key(self, mock_agentops, monkeypatch):
"""Test that agentops is not initialized when API key is invalid."""
monkeypatch.setenv("AGENTOPS_API_KEY", " ")
crew = Crew(agents=[researcher], tasks=[Task(
description="Test task",
expected_output="Test output",
agent=researcher,
)])
mock_agentops.assert_not_called()
def test_gemini_llm_integration(self, mock_agentops, monkeypatch):
"""Test that Gemini LLM works correctly with agentops."""
# Mock agentops
monkeypatch.setattr("crewai.crew.agentops", mock_agentops)
# Set API keys
monkeypatch.setenv("AGENTOPS_API_KEY", "test-key-12345")
monkeypatch.setenv("GOOGLE_API_KEY", "test-key")
# Create crew with Gemini LLM
llm = LLM(model="gemini-pro")
agent = Agent(
role="test",
goal="test",
backstory="test",
llm=llm
)
task = Task(
description="test task",
expected_output="test output",
agent=agent
)
crew = Crew(agents=[agent], tasks=[task])
crew.set_private_attrs()
# Mock the agent execution to avoid actual API calls
with patch.object(Task, 'execute_sync', return_value=TaskOutput(
description="test",
raw="test output",
agent=agent.role
)):
# Run crew
crew.kickoff()
# Verify agentops.end_session was called correctly
mock_agentops.end_session.assert_called_once_with(
end_state="Success",
end_state_reason="Finished Execution",
is_auto_end=True
)
ceo = Agent(
role="CEO",
goal="Make sure the writers in your company produce amazing content.",