style: fix mypy issues

Lucas Gomide
2025-07-11 00:42:03 -03:00
parent 43f339fa84
commit 6f0ed6642b
10 changed files with 80 additions and 65 deletions

View File

@@ -3,9 +3,10 @@ from crewai.agent import Agent
 from crewai.task import Task
 from crewai.evaluation.evaluation_display import EvaluationDisplayFormatter
-from typing import List, Optional, Dict, Any
+from typing import Any, Dict
 from collections import defaultdict
 from crewai.evaluation import BaseEvaluator, create_evaluation_callbacks
+from collections.abc import Sequence
 from crewai.crew import Crew
 from crewai.utilities.events.crewai_event_bus import crewai_event_bus
 from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
@@ -13,28 +14,29 @@ from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
 class AgentEvaluator:
     def __init__(
         self,
-        evaluators: Optional[List[BaseEvaluator]] = None,
-        crew: Optional[Any] = None,
+        evaluators: Sequence[BaseEvaluator] | None = None,
+        crew: Crew | None = None,
     ):
-        self.crew: Crew = crew
-        self.evaluators = evaluators
-        self.agent_evaluators = {}
+        self.crew: Crew | None = crew
+        self.evaluators: Sequence[BaseEvaluator] | None = evaluators
+        self.agent_evaluators: dict[str, Sequence[BaseEvaluator] | None] = {}
         if crew is not None:
+            assert crew and crew.agents is not None
             for agent in crew.agents:
-                self.agent_evaluators[agent.id] = self.evaluators.copy()
+                self.agent_evaluators[str(agent.id)] = self.evaluators
         self.callback = create_evaluation_callbacks()
         self.console_formatter = ConsoleFormatter()
         self.display_formatter = EvaluationDisplayFormatter()
         self.iteration = 1
-        self.iterations_results = {}
+        self.iterations_results: dict[int, dict[str, list[AgentEvaluationResult]]] = {}

     def set_iteration(self, iteration: int) -> None:
         self.iteration = iteration

-    def evaluate_current_iteration(self):
+    def evaluate_current_iteration(self) -> dict[str, list[AgentEvaluationResult]]:
         if not self.crew:
             raise ValueError("Cannot evaluate: no crew was provided to the evaluator.")
@@ -42,12 +44,12 @@ class AgentEvaluator:
             raise ValueError("Cannot evaluate: no callback was set. Use set_callback() method first.")
         from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn
-        evaluation_results = defaultdict(list)
+        evaluation_results: defaultdict[str, list[AgentEvaluationResult]] = defaultdict(list)
         total_evals = 0
         for agent in self.crew.agents:
             for task in self.crew.tasks:
-                if task.agent.id == agent.id and self.agent_evaluators.get(agent.id):
+                if task.agent and task.agent.id == agent.id and self.agent_evaluators.get(str(agent.id)):
                     total_evals += 1
         with Progress(
@@ -60,15 +62,16 @@ class AgentEvaluator:
             eval_task = progress.add_task(f"Evaluating agents (iteration {self.iteration})...", total=total_evals)
             for agent in self.crew.agents:
-                evaluator = self.agent_evaluators.get(agent.id)
+                evaluator = self.agent_evaluators.get(str(agent.id))
                 if not evaluator:
                     continue
                 for task in self.crew.tasks:
-                    if task.agent.id != agent.id:
+                    if task.agent and str(task.agent.id) != str(agent.id):
                         continue
-                    trace = self.callback.get_trace(agent.id, task.id)
+                    trace = self.callback.get_trace(str(agent.id), str(task.id))
                     if not trace:
                         self.console_formatter.print(f"[yellow]Warning: No trace found for agent {agent.role} on task {task.description[:30]}...[/yellow]")
                         progress.update(eval_task, advance=1)
@@ -138,7 +141,7 @@ class AgentEvaluator:
             agent_id=str(agent.id),
             task_id=str(task.id)
         )
+        assert self.evaluators is not None
         for evaluator in self.evaluators:
             try:
                 score = evaluator.evaluate(
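
The asserts added throughout this commit lean on mypy's type narrowing: after `assert x is not None`, a value annotated as `X | None` is treated as `X` for the rest of the block. A minimal, hypothetical sketch of the pattern (names are illustrative, not taken from the repository; assumes Python 3.10+ union syntax, as used across the diff):

    from collections.abc import Sequence

    def average(scores: Sequence[float] | None) -> float:
        # Without this assert, mypy rejects passing an Optional value to sum()/len();
        # the assert narrows Sequence[float] | None down to Sequence[float].
        assert scores is not None
        return sum(scores) / len(scores)

    print(average([8.0, 6.0]))  # 7.0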

View File

@@ -23,7 +23,7 @@ class MetricCategory(enum.Enum):
 class EvaluationScore(BaseModel):
-    score: Optional[float] = Field(
+    score: float | None = Field(
         default=5.0,
         description="Numeric score from 0-10 where 0 is worst and 10 is best, None if not applicable",
         ge=0.0,
@@ -33,7 +33,7 @@ class EvaluationScore(BaseModel):
         default="",
         description="Detailed feedback explaining the evaluation score"
     )
-    raw_response: Optional[str] = Field(
+    raw_response: str | None = Field(
         default=None,
         description="Raw response from the evaluator (e.g., LLM)"
     )
@@ -45,8 +45,8 @@ class EvaluationScore(BaseModel):
 class BaseEvaluator(abc.ABC):
-    def __init__(self, llm: Optional[BaseLLM] = None):
-        self.llm = create_llm(llm)
+    def __init__(self, llm: BaseLLM | None = None):
+        self.llm: BaseLLM | None = create_llm(llm)

     @property
     @abc.abstractmethod
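
The `Optional[float]` to `float | None` rewrites in this file are purely syntactic: both spellings describe the same type. A small standalone sketch (assuming Python 3.10+, where PEP 604 union syntax is allowed in annotations; the model below is illustrative, not the project's):

    from typing import Optional
    from pydantic import BaseModel, Field

    class Score(BaseModel):
        old_style: Optional[float] = Field(default=None)  # pre-PEP 604 spelling
        new_style: float | None = Field(default=None)      # equivalent modern spelling

    print(Score())  # old_style=None new_style=None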

View File

@@ -1,7 +1,9 @@
+from collections import defaultdict
 from typing import Dict, Any, List
 from rich.table import Table
 from rich.box import HEAVY_EDGE, ROUNDED
-from crewai.evaluation.base_evaluator import AgentAggregatedEvaluationResult, AggregationStrategy
+from collections.abc import Sequence
+from crewai.evaluation.base_evaluator import AgentAggregatedEvaluationResult, AggregationStrategy, AgentEvaluationResult, MetricCategory
 from crewai.evaluation import EvaluationScore
 from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
 from crewai.utilities.llm_utils import create_llm
@@ -16,7 +18,7 @@ class EvaluationDisplayFormatter:
             return
         # Get all agent roles across all iterations
-        all_agent_roles = set()
+        all_agent_roles: set[str] = set()
         for iter_results in iterations_results.values():
             all_agent_roles.update(iter_results.keys())
@@ -50,9 +52,9 @@ class EvaluationDisplayFormatter:
             # Add metrics to table
             if aggregated_result.metrics:
                 for metric, evaluation_score in aggregated_result.metrics.items():
-                    score = evaluation_score.score if evaluation_score.score is not None else "N/A"
-                    if isinstance(score, (int, float)) and score is not None:
+                    score = evaluation_score.score
+                    if isinstance(score, (int, float)):
                         if score >= 8.0:
                             score_text = f"[green]{score:.1f}[/green]"
                         elif score >= 6.0:
@@ -109,7 +111,7 @@ class EvaluationDisplayFormatter:
         table.add_column("Avg. Total", justify="center")
-        all_agent_roles = set()
+        all_agent_roles: set[str] = set()
         for results in iterations_results.values():
             all_agent_roles.update(results.keys())
@@ -173,7 +175,7 @@ class EvaluationDisplayFormatter:
             table.add_row(*row)
-        all_metrics = set()
+        all_metrics: set[Any] = set()
         for metrics in agent_metrics_by_iteration.values():
             all_metrics.update(metrics.keys())
@@ -185,18 +187,18 @@ class EvaluationDisplayFormatter:
             for iter_num in sorted(iterations_results.keys()):
                 if (iter_num in agent_metrics_by_iteration and
                     metric in agent_metrics_by_iteration[iter_num]):
-                    score = agent_metrics_by_iteration[iter_num][metric].score
-                    if score is not None:
-                        metric_scores.append(score)
-                        if score >= 8.0:
+                    metric_score = agent_metrics_by_iteration[iter_num][metric].score
+                    if metric_score is not None:
+                        metric_scores.append(metric_score)
+                        if metric_score >= 8.0:
                             color = "green"
-                        elif score >= 6.0:
+                        elif metric_score >= 6.0:
                             color = "cyan"
-                        elif score >= 4.0:
+                        elif metric_score >= 4.0:
                             color = "yellow"
                         else:
                             color = "red"
-                        row.append(f"[{color}]{score:.1f}[/]")
+                        row.append(f"[{color}]{metric_score:.1f}[/]")
                     else:
                         row.append("[dim]N/A[/dim]")
                 else:
@@ -227,34 +229,29 @@ class EvaluationDisplayFormatter:
         self,
         agent_id: str,
         agent_role: str,
-        results: List[Any],
+        results: Sequence[AgentEvaluationResult],
         strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE,
     ) -> AgentAggregatedEvaluationResult:
-        metrics_by_category = {}
+        metrics_by_category: dict[MetricCategory, list[EvaluationScore]] = defaultdict(list)
         for result in results:
             for metric_name, evaluation_score in result.metrics.items():
-                if metric_name not in metrics_by_category:
-                    metrics_by_category[metric_name] = []
                 metrics_by_category[metric_name].append(evaluation_score)
-        aggregated_metrics = {}
+        aggregated_metrics: dict[MetricCategory, EvaluationScore] = {}
         for category, scores in metrics_by_category.items():
-            valid_scores = [s for s in scores if s.score is not None]
-            avg_score = sum(s.score for s in valid_scores) / len(valid_scores) if valid_scores else None
-            # Extract all feedback text from scores
+            valid_scores = [s.score for s in scores if s.score is not None]
+            avg_score = sum(valid_scores) / len(valid_scores) if valid_scores else None
             feedbacks = [s.feedback for s in scores if s.feedback]
-            # Process feedback based on number of entries
             feedback_summary = None
             if feedbacks:
                 if len(feedbacks) > 1:
                     # Use the summarization method for multiple feedbacks
                     feedback_summary = self._summarize_feedbacks(
                         agent_role=agent_role,
-                        metric=category,
+                        metric=category.title(),
                         feedbacks=feedbacks,
                         scores=[s.score for s in scores],
                         strategy=strategy
@@ -269,9 +266,9 @@ class EvaluationDisplayFormatter:
         overall_score = None
         if aggregated_metrics:
-            scores = [m.score for m in aggregated_metrics.values() if m.score is not None]
-            if scores:
-                overall_score = sum(scores) / len(scores)
+            valid_scores = [m.score for m in aggregated_metrics.values() if m.score is not None]
+            if valid_scores:
+                overall_score = sum(valid_scores) / len(valid_scores)
         return AgentAggregatedEvaluationResult(
             agent_id=agent_id,
@@ -287,7 +284,7 @@ class EvaluationDisplayFormatter:
         agent_role: str,
         metric: str,
         feedbacks: List[str],
-        scores: List[float],
+        scores: List[float | None],
         strategy: AggregationStrategy
     ) -> str:
         if len(feedbacks) <= 2 and all(len(fb) < 200 for fb in feedbacks):
@@ -335,7 +332,7 @@ class EvaluationDisplayFormatter:
 {all_feedbacks}
 """}
         ]
+        assert llm is not None
         response = llm.call(prompt)
         return response
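
The aggregation hunk above swaps the manual "create the list if the key is missing" check for `defaultdict(list)`; because `defaultdict` is a `dict` subclass, it still satisfies the `dict[...]` annotation. A short sketch with made-up metric names:

    from collections import defaultdict

    scores_by_metric: dict[str, list[float]] = defaultdict(list)

    # Missing keys are created as empty lists on first access, so no membership check is needed.
    for metric, score in [("clarity", 8.0), ("clarity", 6.5), ("coverage", 7.0)]:
        scores_by_metric[metric].append(score)

    print(dict(scores_by_metric))  # {'clarity': [8.0, 6.5], 'coverage': [7.0]}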

View File

@@ -1,6 +1,8 @@
 from datetime import datetime
 from typing import Any, Dict, List, Optional, Union
+from collections.abc import Sequence
 from crewai.agent import Agent
 from crewai.task import Task
 from crewai.utilities.events.base_event_listener import BaseEventListener
@@ -40,7 +42,7 @@ class EvaluationTraceCallback(BaseEventListener):
     def __init__(self):
         if not hasattr(self, "_initialized") or not self._initialized:
             super().__init__()
-            self.traces: Dict[str, Dict[str, Any]] = {}
+            self.traces = {}
             self.current_agent_id = None
             self.current_task_id = None
             self._initialized = True
@@ -109,8 +111,8 @@ class EvaluationTraceCallback(BaseEventListener):
         self.current_agent_id = None
         self.current_task_id = None

-    def on_tool_use(self, tool_name: str, tool_args: Dict[str, Any], result: Any,
-                    success: bool = True, error_type: Optional[str] = None):
+    def on_tool_use(self, tool_name: str, tool_args: dict[str, Any] | str, result: Any,
+                    success: bool = True, error_type: str | None = None):
         if not self.current_agent_id or not self.current_task_id:
             return
@@ -131,7 +133,7 @@ class EvaluationTraceCallback(BaseEventListener):
         self.traces[trace_key]["tool_uses"].append(tool_use)

-    def on_llm_call_start(self, messages: Union[str, List[Dict[str, Any]]], tools: Optional[List[Dict]] = None):
+    def on_llm_call_start(self, messages: str | Sequence[dict[str, Any]] | None, tools: Sequence[dict[str, Any]] | None = None):
         if not self.current_agent_id or not self.current_task_id:
             return
@@ -177,7 +179,7 @@ class EvaluationTraceCallback(BaseEventListener):
         self.traces[trace_key]["llm_calls"].append(llm_call)
         if hasattr(self, "current_llm_call"):
-            self.current_llm_call = None
+            self.current_llm_call = {}

     def get_trace(self, agent_id: str, task_id: str) -> Optional[Dict[str, Any]]:
         trace_key = f"{agent_id}_{task_id}"
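
Widening the callback parameters to `Sequence[...]` (and to `dict[str, Any] | str` for tool_args) lets the hooks accept more caller-side shapes without upsetting mypy, since `Sequence` covers lists and tuples alike for read-only use. A hedged sketch of the idea using a hypothetical hook, not the project's actual signature:

    from collections.abc import Sequence
    from typing import Any

    def on_llm_call_start(messages: str | Sequence[dict[str, Any]] | None) -> int:
        # Accepts a raw prompt string, a list/tuple of message dicts, or nothing.
        if messages is None:
            return 0
        if isinstance(messages, str):
            return 1
        return len(messages)

    print(on_llm_call_start("hello"))                               # 1
    print(on_llm_call_start(({"role": "user", "content": "hi"},)))  # 1 (tuple accepted)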

View File

@@ -2,10 +2,10 @@
 import json
 import re
-from typing import Dict, Any
+from typing import Any

-def extract_json_from_llm_response(text: str) -> Dict[str, Any]:
+def extract_json_from_llm_response(text: str) -> dict[str, Any]:
     try:
         return json.loads(text)
     except json.JSONDecodeError:
@@ -27,4 +27,4 @@ def extract_json_from_llm_response(text: str) -> Dict[str, Any]:
             return json.loads(match.strip())
         except json.JSONDecodeError:
             continue
-    return text
+    raise ValueError("No valid JSON found in the response")
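
The last hunk changes the fallback path of `extract_json_from_llm_response` from returning the raw text (which contradicted the declared `dict[str, Any]` return type) to raising `ValueError`, so any caller that relied on the old behaviour presumably needs an explicit except clause now. A simplified stand-in showing the caller-side adjustment (not the project's actual calling code):

    import json
    from typing import Any

    def extract_json_from_llm_response(text: str) -> dict[str, Any]:
        # Reduced version of the helper: parse or raise, never return a str.
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            raise ValueError("No valid JSON found in the response")

    try:
        data = extract_json_from_llm_response("not json at all")
    except ValueError:
        data = {}  # explicit fallback instead of silently receiving the raw string
    print(data)  # {}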

View File

@@ -46,13 +46,15 @@ Agent's final output:
 Evaluate how well the agent's output aligns with the assigned task goal.
 """}
         ]
+        assert self.llm is not None
         response = self.llm.call(prompt)
         try:
-            evaluation_data = extract_json_from_llm_response(response)
+            evaluation_data: dict[str, Any] = extract_json_from_llm_response(response)
+            assert evaluation_data is not None
             return EvaluationScore(
-                score=float(evaluation_data.get("score", None)),
+                score=evaluation_data.get("score", 0),
                 feedback=evaluation_data.get("feedback", response),
                 raw_response=response
             )

View File

@@ -11,6 +11,7 @@ import re
 from enum import Enum
 from typing import Any, Dict, List, Tuple
 import numpy as np
+from collections.abc import Sequence
 from crewai.agent import Agent
 from crewai.task import Task
@@ -136,6 +137,7 @@ Identify any inefficient reasoning patterns and provide specific suggestions for
 """}
         ]
+        assert self.llm is not None
         response = self.llm.call(prompt)
         try:
@@ -275,7 +277,7 @@ Identify any inefficient reasoning patterns and provide specific suggestions for
             }
         }

-    def _calculate_trend(self, values: List[float]) -> float:
+    def _calculate_trend(self, values: Sequence[float | int]) -> float:
         if not values or len(values) < 2:
             return 0.0
@@ -295,7 +297,7 @@ Identify any inefficient reasoning patterns and provide specific suggestions for
         except Exception:
             return 0.0

-    def _calculate_loop_likelihood(self, call_lengths: List[float], response_times: List[float]) -> float:
+    def _calculate_loop_likelihood(self, call_lengths: Sequence[float], response_times: Sequence[float]) -> float:
         if not call_lengths or len(call_lengths) < 3:
             return 0.0

View File

@@ -46,12 +46,14 @@ Evaluate the semantic quality and reasoning of this output.
 """}
         ]
+        assert self.llm is not None
         response = self.llm.call(prompt)
         try:
-            evaluation_data = extract_json_from_llm_response(response)
+            evaluation_data: dict[str, Any] = extract_json_from_llm_response(response)
+            assert evaluation_data is not None
             return EvaluationScore(
-                score=float(evaluation_data.get("score", None)),
+                score=float(evaluation_data["score"]) if evaluation_data.get("score") is not None else None,
                 feedback=evaluation_data.get("feedback", response),
                 raw_response=response
             )
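
The score handling above avoids calling `float()` on a missing value: `float(None)` raises `TypeError`, while the guarded expression keeps `None` to mean "not applicable", matching the `score: float | None` field. A minimal sketch of the pattern:

    from typing import Any

    evaluation_data: dict[str, Any] = {"feedback": "looks good"}  # no "score" key returned

    # Guard before converting; None is preserved instead of raising TypeError.
    score = float(evaluation_data["score"]) if evaluation_data.get("score") is not None else None
    print(score)  # None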

View File

@@ -87,11 +87,13 @@ IMPORTANT:
 - DO NOT evaluate tool usage or results
 """}
         ]
+        assert self.llm is not None
         response = self.llm.call(prompt)
         try:
             evaluation_data = extract_json_from_llm_response(response)
+            assert evaluation_data is not None
             scores = evaluation_data.get("scores", {})
             relevance = scores.get("relevance", 5.0)
             coverage = scores.get("coverage", 5.0)
@@ -220,10 +222,13 @@ Evaluate the quality of the agent's parameter extraction for this task.
 """}
         ]
+        assert self.llm is not None
         response = self.llm.call(prompt)
         try:
             evaluation_data = extract_json_from_llm_response(response)
+            assert evaluation_data is not None
             scores = evaluation_data.get("scores", {})
             accuracy = scores.get("accuracy", 5.0)
             formatting = scores.get("formatting", 5.0)
@@ -359,10 +364,12 @@ Evaluate the quality of the agent's tool invocation structure during this task.
 """}
         ]
+        assert self.llm is not None
         response = self.llm.call(prompt)
         try:
             evaluation_data = extract_json_from_llm_response(response)
+            assert evaluation_data is not None
             scores = evaluation_data.get("scores", {})
             structure = scores.get("structure", 5.0)
             error_handling = scores.get("error_handling", 5.0)

View File

@@ -49,7 +49,7 @@ class LLMCallStartedEvent(LLMEventBase):
     type: str = "llm_call_started"
     messages: Optional[Union[str, List[Dict[str, Any]]]] = None
-    tools: Optional[List[dict]] = None
+    tools: Optional[List[dict[str, Any]]] = None
     callbacks: Optional[List[Any]] = None
     available_functions: Optional[Dict[str, Any]] = None
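
Spelling out the `dict[str, Any]` parameters here matters because a bare `dict` inside a generic is implicitly `dict[Any, Any]`, which stricter mypy settings (for example disallow_any_generics) report as a missing type parameter. A small illustrative sketch, not the project's configuration:

    from typing import Any, Dict, List, Optional

    tools_loose: Optional[List[dict]] = None             # flagged under strict generic checks
    tools_explicit: Optional[List[Dict[str, Any]]] = None  # accepted: key/value types are explicit
    print(tools_loose, tools_explicit)  # None None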