mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-16 12:28:30 +00:00
Compare commits
7 Commits
lg-python-
...
devin/1739
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
db34516b18 | ||
|
|
6c3f4a6446 | ||
|
|
3cf2f982dd | ||
|
|
48604f567b | ||
|
|
f724ed93b9 | ||
|
|
abc71be5f6 | ||
|
|
420f826e56 |
3
pytest.ini
Normal file
3
pytest.ini
Normal file
@@ -0,0 +1,3 @@
|
||||
[pytest]
|
||||
markers =
|
||||
agentops: Tests for AgentOps integration
|
||||
@@ -1,5 +1,6 @@
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import uuid
|
||||
import warnings
|
||||
@@ -54,8 +55,11 @@ from crewai.utilities.training_handler import CrewTrainingHandler
|
||||
|
||||
try:
|
||||
import agentops # type: ignore
|
||||
from agentops.exceptions import AgentOpsError, AuthenticationError # type: ignore
|
||||
except ImportError:
|
||||
agentops = None
|
||||
AgentOpsError = None
|
||||
AuthenticationError = None
|
||||
|
||||
|
||||
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
|
||||
@@ -90,6 +94,8 @@ class Crew(BaseModel):
|
||||
__hash__ = object.__hash__ # type: ignore
|
||||
_execution_span: Any = PrivateAttr()
|
||||
_rpm_controller: RPMController = PrivateAttr()
|
||||
_agentops: Optional['agentops.AgentOps'] = PrivateAttr(default=None)
|
||||
_telemetry: Optional[Telemetry] = PrivateAttr(default=None)
|
||||
_logger: Logger = PrivateAttr()
|
||||
_file_handler: FileHandler = PrivateAttr()
|
||||
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
|
||||
@@ -240,19 +246,72 @@ class Crew(BaseModel):
|
||||
# TODO: Improve typing
|
||||
return json.loads(v) if isinstance(v, Json) else v # type: ignore
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _validate_api_key(self, api_key: Optional[str]) -> bool:
|
||||
"""Validate the AgentOps API key.
|
||||
|
||||
Args:
|
||||
api_key: The API key to validate
|
||||
|
||||
Returns:
|
||||
bool: True if the API key is valid, False otherwise
|
||||
"""
|
||||
if not api_key:
|
||||
return False
|
||||
stripped_key = api_key.strip()
|
||||
return bool(stripped_key and len(stripped_key) > 10)
|
||||
|
||||
def set_private_attrs(self) -> "Crew":
|
||||
"""Set private attributes."""
|
||||
"""Initialize private attributes including AgentOps integration.
|
||||
|
||||
This method sets up:
|
||||
- Logger and file handler for output logging
|
||||
- RPM controller for rate limiting
|
||||
- AgentOps integration for monitoring (if available and configured)
|
||||
"""
|
||||
self._cache_handler = CacheHandler()
|
||||
self._logger = Logger(verbose=self.verbose)
|
||||
if self.output_log_file:
|
||||
self._file_handler = FileHandler(self.output_log_file)
|
||||
self._rpm_controller = RPMController(max_rpm=self.max_rpm, logger=self._logger)
|
||||
if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
|
||||
self.function_calling_llm = create_llm(self.function_calling_llm)
|
||||
|
||||
self._telemetry = Telemetry()
|
||||
self._telemetry.set_tracer()
|
||||
|
||||
# Initialize agentops if available and API key is present
|
||||
if agentops:
|
||||
api_key = os.getenv("AGENTOPS_API_KEY")
|
||||
if self._validate_api_key(api_key):
|
||||
try:
|
||||
agentops.init(api_key)
|
||||
self._agentops = agentops
|
||||
self._logger.log(
|
||||
"info",
|
||||
"Successfully initialized agentops",
|
||||
color="green"
|
||||
)
|
||||
except (ConnectionError, AuthenticationError) as e:
|
||||
self._logger.log(
|
||||
"warning",
|
||||
f"Failed to connect to agentops: {e}",
|
||||
color="yellow"
|
||||
)
|
||||
self._agentops = None
|
||||
except (ValueError, AgentOpsError) as e:
|
||||
self._logger.log(
|
||||
"warning",
|
||||
f"Invalid agentops configuration: {e}",
|
||||
color="yellow"
|
||||
)
|
||||
self._agentops = None
|
||||
else:
|
||||
self._logger.log(
|
||||
"warning",
|
||||
"Invalid AGENTOPS_API_KEY provided",
|
||||
color="yellow"
|
||||
)
|
||||
self._agentops = None
|
||||
|
||||
if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
|
||||
self.function_calling_llm = create_llm(self.function_calling_llm)
|
||||
return self
|
||||
|
||||
@model_validator(mode="after")
|
||||
@@ -543,7 +602,8 @@ class Crew(BaseModel):
|
||||
inputs = before_callback(inputs)
|
||||
|
||||
"""Starts the crew to work on its assigned tasks."""
|
||||
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
|
||||
if self._telemetry:
|
||||
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
|
||||
self._task_output_handler.reset()
|
||||
self._logging_color = "bold_purple"
|
||||
|
||||
@@ -1121,16 +1181,22 @@ class Crew(BaseModel):
|
||||
for agent in self.agents:
|
||||
agent.interpolate_inputs(inputs)
|
||||
|
||||
def _finish_execution(self, final_string_output: str) -> None:
|
||||
def _finish_execution(self, final_output: Union[str, CrewOutput]) -> None:
|
||||
"""Finish execution and cleanup.
|
||||
|
||||
Args:
|
||||
final_output: The final output from crew execution, either as string or CrewOutput
|
||||
"""
|
||||
if self.max_rpm:
|
||||
self._rpm_controller.stop_rpm_counter()
|
||||
if agentops:
|
||||
agentops.end_session(
|
||||
if self._telemetry:
|
||||
self._telemetry.end_crew(self, final_output)
|
||||
if self._agentops:
|
||||
self._agentops.end_session(
|
||||
end_state="Success",
|
||||
end_state_reason="Finished Execution",
|
||||
is_auto_end=True,
|
||||
)
|
||||
self._telemetry.end_crew(self, final_string_output)
|
||||
|
||||
def calculate_usage_metrics(self) -> UsageMetrics:
|
||||
"""Calculates and returns the usage metrics."""
|
||||
|
||||
@@ -6,6 +6,7 @@ from concurrent.futures import Future
|
||||
from unittest import mock
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import agentops
|
||||
import instructor
|
||||
import pydantic_core
|
||||
import pytest
|
||||
@@ -15,6 +16,7 @@ from crewai.agents.cache import CacheHandler
|
||||
from crewai.crew import Crew
|
||||
from crewai.crews.crew_output import CrewOutput
|
||||
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
|
||||
from crewai.llm import LLM
|
||||
from crewai.memory.contextual.contextual_memory import ContextualMemory
|
||||
from crewai.process import Process
|
||||
from crewai.project import crew
|
||||
@@ -27,6 +29,100 @@ from crewai.utilities import Logger
|
||||
from crewai.utilities.rpm_controller import RPMController
|
||||
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def researcher():
|
||||
"""Fixture to create a researcher agent."""
|
||||
return Agent(
|
||||
role="Researcher",
|
||||
goal="Make the best research and analysis on content about AI and AI agents",
|
||||
backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups.",
|
||||
allow_delegation=False,
|
||||
)
|
||||
|
||||
@pytest.fixture
|
||||
def mock_agentops():
|
||||
"""Fixture to mock agentops for testing."""
|
||||
mock_agentops = MagicMock()
|
||||
mock_agentops.init = MagicMock()
|
||||
return mock_agentops
|
||||
|
||||
@pytest.mark.agentops
|
||||
class TestAgentOpsIntegration:
|
||||
"""Tests for AgentOps integration."""
|
||||
|
||||
def test_initialization_with_api_key(self, mock_agentops, monkeypatch):
|
||||
"""Test that agentops is properly initialized when API key is present."""
|
||||
monkeypatch.setattr("crewai.crew.agentops", mock_agentops)
|
||||
monkeypatch.setenv("AGENTOPS_API_KEY", "test-key-12345")
|
||||
crew = Crew(agents=[researcher], tasks=[Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=researcher,
|
||||
)])
|
||||
crew.set_private_attrs()
|
||||
mock_agentops.init.assert_called_once_with("test-key-12345")
|
||||
|
||||
def test_initialization_without_api_key(self, mock_agentops):
|
||||
"""Test that agentops is not initialized when API key is not present."""
|
||||
crew = Crew(agents=[researcher], tasks=[Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=researcher,
|
||||
)])
|
||||
mock_agentops.assert_not_called()
|
||||
|
||||
def test_initialization_with_invalid_api_key(self, mock_agentops, monkeypatch):
|
||||
"""Test that agentops is not initialized when API key is invalid."""
|
||||
monkeypatch.setenv("AGENTOPS_API_KEY", " ")
|
||||
crew = Crew(agents=[researcher], tasks=[Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=researcher,
|
||||
)])
|
||||
mock_agentops.assert_not_called()
|
||||
|
||||
def test_gemini_llm_integration(self, mock_agentops, monkeypatch):
|
||||
"""Test that Gemini LLM works correctly with agentops."""
|
||||
# Mock agentops
|
||||
monkeypatch.setattr("crewai.crew.agentops", mock_agentops)
|
||||
|
||||
# Set API keys
|
||||
monkeypatch.setenv("AGENTOPS_API_KEY", "test-key-12345")
|
||||
monkeypatch.setenv("GOOGLE_API_KEY", "test-key")
|
||||
|
||||
# Create crew with Gemini LLM
|
||||
llm = LLM(model="gemini-pro")
|
||||
agent = Agent(
|
||||
role="test",
|
||||
goal="test",
|
||||
backstory="test",
|
||||
llm=llm
|
||||
)
|
||||
task = Task(
|
||||
description="test task",
|
||||
expected_output="test output",
|
||||
agent=agent
|
||||
)
|
||||
crew = Crew(agents=[agent], tasks=[task])
|
||||
crew.set_private_attrs()
|
||||
|
||||
# Mock the agent execution to avoid actual API calls
|
||||
with patch.object(Task, 'execute_sync', return_value=TaskOutput(
|
||||
description="test",
|
||||
raw="test output",
|
||||
agent=agent.role
|
||||
)):
|
||||
# Run crew
|
||||
crew.kickoff()
|
||||
|
||||
# Verify agentops.end_session was called correctly
|
||||
mock_agentops.end_session.assert_called_once_with(
|
||||
end_state="Success",
|
||||
end_state_reason="Finished Execution",
|
||||
is_auto_end=True
|
||||
)
|
||||
|
||||
ceo = Agent(
|
||||
role="CEO",
|
||||
goal="Make sure the writers in your company produce amazing content.",
|
||||
|
||||
Reference in New Issue
Block a user