Compare commits

..

2 Commits

Author SHA1 Message Date
Devin AI
90c577fdd0 refactor: improve type safety and test patterns
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-09 21:28:13 +00:00
Devin AI
b8a15c6115 fix: enable any llm to run test functionality
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-09 21:15:54 +00:00
7 changed files with 153 additions and 274 deletions

View File

@@ -4,6 +4,7 @@ import uuid
import warnings
from concurrent.futures import Future
from hashlib import md5
from crewai.llm import LLM
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from pydantic import (
@@ -1075,19 +1076,36 @@ class Crew(BaseModel):
def test(
self,
n_iterations: int,
openai_model_name: Optional[str] = None,
llm: Union[str, LLM],
inputs: Optional[Dict[str, Any]] = None,
) -> None:
"""Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures."""
"""Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures.
Args:
n_iterations: Number of test iterations to run
llm: Language model to use for evaluation. Can be either a model name string (e.g. "gpt-4")
or an LLM instance for custom implementations
inputs: Optional dictionary of input values to use for task execution
Example:
```python
# Using model name string
crew.test(n_iterations=3, llm="gpt-4")
# Using custom LLM implementation
custom_llm = LLM(model="custom-model")
crew.test(n_iterations=3, llm=custom_llm)
```
"""
test_crew = self.copy()
self._test_execution_span = test_crew._telemetry.test_execution_span(
test_crew,
n_iterations,
inputs,
openai_model_name, # type: ignore[arg-type]
) # type: ignore[arg-type]
evaluator = CrewEvaluator(test_crew, openai_model_name) # type: ignore[arg-type]
str(llm) if isinstance(llm, LLM) else llm,
)
evaluator = CrewEvaluator(test_crew, llm)
for i in range(1, n_iterations + 1):
evaluator.set_iteration(i)

View File

@@ -1,10 +1,16 @@
from collections import defaultdict
from typing import Any, Dict, List, Optional, TypeVar, Union
from typing import DefaultDict # Separate import to avoid circular imports
from pydantic import BaseModel, Field
from rich.box import HEAVY_EDGE
from rich.console import Console
from rich.table import Table
from crewai.llm import LLM
T = TypeVar('T', bound=LLM)
from crewai.agent import Agent
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
@@ -28,14 +34,47 @@ class CrewEvaluator:
iteration (int): The current iteration of the evaluation.
"""
tasks_scores: defaultdict = defaultdict(list)
run_execution_times: defaultdict = defaultdict(list)
_tasks_scores: DefaultDict[int, List[float]] = Field(
default_factory=lambda: defaultdict(list))
_run_execution_times: DefaultDict[int, List[float]] = Field(
default_factory=lambda: defaultdict(list))
iteration: int = 0
def __init__(self, crew, openai_model_name: str):
@property
def tasks_scores(self) -> DefaultDict[int, List[float]]:
return self._tasks_scores
@tasks_scores.setter
def tasks_scores(self, value: Dict[int, List[float]]) -> None:
self._tasks_scores = defaultdict(list, value)
@property
def run_execution_times(self) -> DefaultDict[int, List[float]]:
return self._run_execution_times
@run_execution_times.setter
def run_execution_times(self, value: Dict[int, List[float]]) -> None:
self._run_execution_times = defaultdict(list, value)
def __init__(self, crew, llm: Union[str, T]):
"""Initialize the CrewEvaluator.
Args:
crew: The Crew instance to evaluate
llm: Language model to use for evaluation. Can be either a model name string
or an LLM instance for custom implementations
Raises:
ValueError: If llm is None or invalid
"""
if not llm:
raise ValueError("Invalid LLM configuration")
self.crew = crew
self.openai_model_name = openai_model_name
self.llm = LLM(model=llm) if isinstance(llm, str) else llm
self._telemetry = Telemetry()
self._tasks_scores = defaultdict(list)
self._run_execution_times = defaultdict(list)
self._setup_for_evaluating()
def _setup_for_evaluating(self) -> None:
@@ -51,7 +90,7 @@ class CrewEvaluator:
),
backstory="Evaluator agent for crew evaluation with precise capabilities to evaluate the performance of the agents in the crew based on the tasks they have performed",
verbose=False,
llm=self.openai_model_name,
llm=self.llm,
)
def _evaluation_task(
@@ -181,11 +220,19 @@ class CrewEvaluator:
self.crew,
evaluation_result.pydantic.quality,
current_task._execution_time,
self.openai_model_name,
self._get_llm_identifier(),
)
self.tasks_scores[self.iteration].append(evaluation_result.pydantic.quality)
self.run_execution_times[self.iteration].append(
self._tasks_scores[self.iteration].append(evaluation_result.pydantic.quality)
self._run_execution_times[self.iteration].append(
current_task._execution_time
)
else:
raise ValueError("Evaluation result is not in the expected format")
def _get_llm_identifier(self) -> str:
"""Get a string identifier for the LLM instance.
Returns:
String representation of the LLM for telemetry
"""
return str(self.llm) if isinstance(self.llm, LLM) else self.llm

View File

@@ -1,95 +1,42 @@
from typing import Optional
import sys
from enum import Enum
class Color(Enum):
"""Enum for text colors in terminal output."""
PURPLE = "\033[95m"
RED = "\033[91m"
GREEN = "\033[92m"
BLUE = "\033[94m"
YELLOW = "\033[93m"
BOLD = "\033[1m"
RESET = "\033[00m"
class Printer:
"""
Utility class for printing formatted text to stdout.
Uses direct stdout writing for compatibility with asynchronous environments.
"""
def print(self, content: str, color: Optional[str] = None) -> None:
"""
Print content with optional color formatting.
Args:
content: The text to print
color: Optional color name (e.g., "purple", "bold_green")
"""
output = content
def print(self, content: str, color: Optional[str] = None):
if color == "purple":
output = self._format_purple(content)
self._print_purple(content)
elif color == "red":
output = self._format_red(content)
self._print_red(content)
elif color == "bold_green":
output = self._format_bold_green(content)
self._print_bold_green(content)
elif color == "bold_purple":
output = self._format_bold_purple(content)
self._print_bold_purple(content)
elif color == "bold_blue":
output = self._format_bold_blue(content)
self._print_bold_blue(content)
elif color == "yellow":
output = self._format_yellow(content)
self._print_yellow(content)
elif color == "bold_yellow":
output = self._format_bold_yellow(content)
try:
sys.stdout.write(f"{output}\n")
sys.stdout.flush()
except IOError:
pass
self._print_bold_yellow(content)
else:
print(content)
def _format_text(self, content: str, color: Color, bold: bool = False) -> str:
"""
Format text with color and optional bold styling.
Args:
content: The text to format
color: The color to apply
bold: Whether to apply bold formatting
Returns:
Formatted text string
"""
if bold:
return f"{Color.BOLD.value}{color.value} {content}{Color.RESET.value}"
return f"{color.value} {content}{Color.RESET.value}"
def _print_bold_purple(self, content):
print("\033[1m\033[95m {}\033[00m".format(content))
def _format_bold_purple(self, content: str) -> str:
"""Format text as bold purple."""
return self._format_text(content, Color.PURPLE, bold=True)
def _print_bold_green(self, content):
print("\033[1m\033[92m {}\033[00m".format(content))
def _format_bold_green(self, content: str) -> str:
"""Format text as bold green."""
return self._format_text(content, Color.GREEN, bold=True)
def _print_purple(self, content):
print("\033[95m {}\033[00m".format(content))
def _format_purple(self, content: str) -> str:
"""Format text as purple."""
return self._format_text(content, Color.PURPLE)
def _print_red(self, content):
print("\033[91m {}\033[00m".format(content))
def _format_red(self, content: str) -> str:
"""Format text as red."""
return self._format_text(content, Color.RED)
def _print_bold_blue(self, content):
print("\033[1m\033[94m {}\033[00m".format(content))
def _format_bold_blue(self, content: str) -> str:
"""Format text as bold blue."""
return self._format_text(content, Color.BLUE, bold=True)
def _print_yellow(self, content):
print("\033[93m {}\033[00m".format(content))
def _format_yellow(self, content: str) -> str:
"""Format text as yellow."""
return self._format_text(content, Color.YELLOW)
def _format_bold_yellow(self, content: str) -> str:
"""Format text as bold yellow."""
return self._format_text(content, Color.YELLOW, bold=True)
def _print_bold_yellow(self, content):
print("\033[1m\033[93m {}\033[00m".format(content))

View File

@@ -10,6 +10,7 @@ import instructor
import pydantic_core
import pytest
from crewai.llm import LLM
from crewai.agent import Agent
from crewai.agents.cache import CacheHandler
from crewai.crew import Crew
@@ -1123,7 +1124,7 @@ def test_kickoff_for_each_empty_input():
assert results == []
@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.mark.vcr(filter_headeruvs=["authorization"])
def test_kickoff_for_each_invalid_input():
"""Tests if kickoff_for_each raises TypeError for invalid input types."""
@@ -2828,7 +2829,7 @@ def test_crew_testing_function(kickoff_mock, copy_mock, crew_evaluator):
copy_mock.return_value = crew
n_iterations = 2
crew.test(n_iterations, openai_model_name="gpt-4o-mini", inputs={"topic": "AI"})
crew.test(n_iterations, llm="gpt-4o-mini", inputs={"topic": "AI"})
# Ensure kickoff is called on the copied crew
kickoff_mock.assert_has_calls(
@@ -2844,6 +2845,32 @@ def test_crew_testing_function(kickoff_mock, copy_mock, crew_evaluator):
]
)
@mock.patch("crewai.crew.CrewEvaluator")
@mock.patch("crewai.crew.Crew.copy")
@mock.patch("crewai.crew.Crew.kickoff")
def test_crew_testing_with_custom_llm(kickoff_mock, copy_mock, crew_evaluator):
task = Task(
description="Test task",
expected_output="Test output",
agent=researcher,
)
crew = Crew(agents=[researcher], tasks=[task])
copy_mock.return_value = crew
custom_llm = LLM(model="gpt-4")
crew.test(2, llm=custom_llm, inputs={"topic": "AI"})
kickoff_mock.assert_has_calls([
mock.call(inputs={"topic": "AI"}),
mock.call(inputs={"topic": "AI"})
])
crew_evaluator.assert_has_calls([
mock.call(crew, custom_llm),
mock.call().set_iteration(1),
mock.call().set_iteration(2),
mock.call().print_crew_evaluation_result(),
])
@pytest.mark.vcr(filter_headers=["authorization"])
def test_hierarchical_verbose_manager_agent():
@@ -3125,4 +3152,4 @@ def test_multimodal_agent_live_image_analysis():
# Verify we got a meaningful response
assert isinstance(result.raw, str)
assert len(result.raw) > 100 # Expecting a detailed analysis
assert "error" not in result.raw.lower() # No error messages in response
assert "error" not in result.raw.lower() # No error messages in response

View File

@@ -2,6 +2,7 @@ from unittest import mock
import pytest
from crewai.llm import LLM
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
@@ -23,7 +24,7 @@ class TestCrewEvaluator:
)
crew = Crew(agents=[agent], tasks=[task])
return CrewEvaluator(crew, openai_model_name="gpt-4o-mini")
return CrewEvaluator(crew, llm="gpt-4o-mini")
def test_setup_for_evaluating(self, crew_planner):
crew_planner._setup_for_evaluating()
@@ -47,6 +48,25 @@ class TestCrewEvaluator:
assert agent.verbose is False
assert agent.llm.model == "gpt-4o-mini"
@pytest.mark.parametrize("llm_input,expected_model", [
(LLM(model="gpt-4"), "gpt-4"),
("gpt-4", "gpt-4"),
])
def test_evaluator_with_llm_types(self, crew_planner, llm_input, expected_model):
evaluator = CrewEvaluator(crew_planner.crew, llm_input)
agent = evaluator._evaluator_agent()
assert agent.llm.model == expected_model
def test_evaluator_with_invalid_llm(self, crew_planner):
with pytest.raises(ValueError, match="Invalid LLM configuration"):
CrewEvaluator(crew_planner.crew, None)
def test_evaluator_with_string_llm(self, crew_planner):
evaluator = CrewEvaluator(crew_planner.crew, "gpt-4")
agent = evaluator._evaluator_agent()
assert isinstance(agent.llm, LLM)
assert agent.llm.model == "gpt-4"
def test_evaluation_task(self, crew_planner):
evaluator_agent = Agent(
role="Evaluator Agent",

View File

@@ -1,92 +0,0 @@
import sys
import unittest
from unittest.mock import patch
import asyncio
import pytest
from io import StringIO
try:
import fastapi
from fastapi import FastAPI
from fastapi.testclient import TestClient
try:
from httpx import AsyncClient
ASYNC_CLIENT_AVAILABLE = True
except ImportError:
ASYNC_CLIENT_AVAILABLE = False
FASTAPI_AVAILABLE = True
except ImportError:
FASTAPI_AVAILABLE = False
ASYNC_CLIENT_AVAILABLE = False
from crewai.utilities.logger import Logger
@unittest.skipIf(not FASTAPI_AVAILABLE, "FastAPI not installed")
class TestFastAPILogger(unittest.TestCase):
"""Test suite for Logger class in FastAPI context."""
def setUp(self):
"""Set up test environment before each test."""
if not FASTAPI_AVAILABLE:
self.skipTest("FastAPI not installed")
self.app = FastAPI()
self.logger = Logger(verbose=True)
@self.app.get("/")
async def root():
self.logger.log("info", "This is a test log message from FastAPI")
return {"message": "Hello World"}
@self.app.get("/error")
async def error_route():
self.logger.log("error", "This is an error log message from FastAPI")
return {"error": "Test error"}
self.client = TestClient(self.app)
self.output = StringIO()
self.old_stdout = sys.stdout
sys.stdout = self.output
def tearDown(self):
"""Clean up test environment after each test."""
sys.stdout = self.old_stdout
def test_logger_in_fastapi_context(self):
"""Test that logger works in FastAPI context."""
response = self.client.get("/")
output = self.output.getvalue()
self.assertIn("[INFO]: This is a test log message from FastAPI", output)
self.assertIn("\n", output)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json(), {"message": "Hello World"})
@pytest.mark.parametrize("route,log_level,expected_message", [
("/", "info", "This is a test log message from FastAPI"),
("/error", "error", "This is an error log message from FastAPI")
])
def test_multiple_routes(self, route, log_level, expected_message):
"""Test logging from different routes with different log levels."""
response = self.client.get(route)
output = self.output.getvalue()
self.assertIn(f"[{log_level.upper()}]: {expected_message}", output)
self.assertEqual(response.status_code, 200)
@unittest.skipIf(not ASYNC_CLIENT_AVAILABLE, "AsyncClient not available")
@pytest.mark.asyncio
async def test_async_logger_in_fastapi(self):
"""Test logger in async context using AsyncClient."""
self.output = StringIO()
sys.stdout = self.output
async with AsyncClient(app=self.app, base_url="http://test") as ac:
response = await ac.get("/")
self.assertEqual(response.status_code, 200)
output = self.output.getvalue()
self.assertIn("[INFO]: This is a test log message from FastAPI", output)

View File

@@ -1,88 +0,0 @@
import sys
import unittest
import threading
from unittest.mock import patch
from io import StringIO
import pytest
from crewai.utilities.logger import Logger
class TestLogger(unittest.TestCase):
"""Test suite for the Logger class."""
def setUp(self):
"""Set up test environment before each test."""
self.logger = Logger(verbose=True)
self.output = StringIO()
self.old_stdout = sys.stdout
sys.stdout = self.output
def tearDown(self):
"""Clean up test environment after each test."""
sys.stdout = self.old_stdout
def test_log_in_sync_context(self):
"""Test logging in a regular synchronous context."""
self.logger.log("info", "Test message")
output = self.output.getvalue()
self.assertIn("[INFO]: Test message", output)
self.assertIn("\n", output)
@patch('sys.stdout.flush')
def test_stdout_is_flushed(self, mock_flush):
"""Test that stdout is properly flushed after writing."""
self.logger.log("info", "Test message")
mock_flush.assert_called_once()
@pytest.mark.parametrize("log_level,message", [
("info", "Info message"),
("error", "Error message"),
("warning", "Warning message"),
("debug", "Debug message")
])
def test_multiple_log_levels(self, log_level, message):
"""Test logging with different log levels."""
self.logger.log(log_level, message)
output = self.output.getvalue()
self.assertIn(f"[{log_level.upper()}]: {message}", output)
def test_thread_safety(self):
"""Test that logger is thread-safe."""
messages = []
for i in range(10):
messages.append(f"Message {i}")
threads = []
for message in messages:
thread = threading.Thread(
target=lambda msg: self.logger.log("info", msg),
args=(message,)
)
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
output = self.output.getvalue()
for message in messages:
self.assertIn(message, output)
class TestFastAPICompatibility(unittest.TestCase):
"""Test compatibility with FastAPI."""
def test_import_in_fastapi(self):
"""Test that logger can be imported in a FastAPI context."""
try:
import fastapi
from crewai.utilities.logger import Logger
logger = Logger(verbose=True)
self.assertTrue(True)
except ImportError:
self.skipTest("FastAPI not installed")
except Exception as e:
self.fail(f"Unexpected error: {e}")