Files
crewAI/lib/cli/src/crewai_cli/benchmark.py
alex-clawd 48a861aa1a fix: resolve all CI failures — format, lint, mypy, and review comments
- Format: auto-reformat agent_tui.py, benchmark.py, coworker_tools.py via ruff
- Lint: 0 remaining errors after format pass
- Mypy: fix _NullPrinter to subclass Printer for type compatibility in
  executor.py, planning.py, and skill_builder.py; add isinstance(r, Message)
  guards in spawn_tools.py; annotate return types and fix dict type params
  and MCPToolResolver logger type in new_agent.py; add missing printer args
  to get_llm_response calls
- cli.py: fix _read_config to use sentinel so falsy values (0, false) are
  returned correctly instead of being treated as missing keys
- create_agent.py: replace regex-based JSONC comment stripper with a
  token-aware parser that preserves // inside quoted strings (e.g. URLs)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 12:28:25 -07:00

946 lines
30 KiB
Python

"""Benchmark runner for NewAgent — run agents against test cases and report results."""
from __future__ import annotations
import asyncio
from collections.abc import Callable
import json
from pathlib import Path
import re
import time
from typing import Any
from pydantic import BaseModel
from crewai_cli.create_agent import _strip_jsonc as _strip_jsonc_comments
class BenchmarkCase(BaseModel):
"""A single benchmark test case."""
input: str
expected: str | None = None
criteria: str | None = None
class BenchmarkResult(BaseModel):
"""Result of running a single benchmark case."""
case_index: int
input: str
expected: str | None = None
actual: str = ""
model: str = ""
passed: bool = False
score: float = 0.0
input_tokens: int = 0
output_tokens: int = 0
response_time_ms: int = 0
cost: float | None = None
class LoadedCases:
"""Result of loading benchmark cases — includes optional per-file threshold."""
def __init__(self, cases: list[BenchmarkCase], threshold: float | None = None):
self.cases = cases
self.threshold = threshold
def __len__(self) -> int:
return len(self.cases)
def __iter__(self):
return iter(self.cases)
def __getitem__(self, index):
return self.cases[index]
def load_benchmark_cases(path: str | Path) -> LoadedCases:
"""Load benchmark cases from a JSON or JSONC file.
Accepts either a bare JSON array or an object wrapper::
{"threshold": 0.9, "cases": [...]}
Args:
path: Path to a JSON/JSONC file.
Returns:
LoadedCases with the case list and optional per-file threshold.
Raises:
FileNotFoundError: If the file does not exist.
ValueError: If the file content is invalid.
"""
p = Path(path)
if not p.exists():
raise FileNotFoundError(f"Benchmark cases file not found: {path}")
raw = p.read_text(encoding="utf-8")
clean = _strip_jsonc_comments(raw)
try:
data = json.loads(clean)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in benchmark cases file: {e}") from e
threshold: float | None = None
if isinstance(data, dict):
threshold = data.get("threshold")
if threshold is not None:
threshold = float(threshold)
if "cases" not in data:
raise ValueError("Object-format benchmark file must have a 'cases' array")
data = data["cases"]
if not isinstance(data, list):
raise ValueError("Benchmark cases file must contain a JSON array")
cases: list[BenchmarkCase] = []
for i, item in enumerate(data):
if not isinstance(item, dict):
raise ValueError(f"Benchmark case at index {i} must be a JSON object")
if "input" not in item:
raise ValueError(
f"Benchmark case at index {i} missing required 'input' field"
)
cases.append(BenchmarkCase(**item))
return LoadedCases(cases, threshold)
def _check_expected(expected: str, actual: str) -> tuple[bool, float]:
"""Check if expected output is found in actual (case-insensitive substring match).
Returns:
Tuple of (passed, score).
"""
if expected.lower() in actual.lower():
return True, 1.0
return False, 0.0
async def _judge_with_llm(
criteria: str,
input_text: str,
actual: str,
judge_model: str,
) -> tuple[bool, float]:
"""Use an LLM judge to evaluate a response against criteria.
Returns:
Tuple of (passed, score).
"""
from crewai.utilities.llm_utils import create_llm
judge_llm = create_llm(judge_model)
prompt = (
"You are an evaluation judge. Score the following response on a scale of 0.0 to 1.0.\n\n"
f"Input: {input_text}\n\n"
f"Response: {actual}\n\n"
f"Evaluation criteria: {criteria}\n\n"
"Respond with ONLY a JSON object in this exact format:\n"
'{"score": <float between 0.0 and 1.0>, "passed": <true or false>}\n'
"A score >= 0.7 should be considered passed."
)
try:
response = judge_llm.call(messages=[{"role": "user", "content": prompt}])
text = str(response) if not isinstance(response, str) else response
# Extract JSON from response
match = re.search(r"\{[^}]+\}", text)
if match:
result = json.loads(match.group())
score = float(result.get("score", 0.0))
score = max(0.0, min(1.0, score))
passed = bool(result.get("passed", score >= 0.7))
return passed, score
except Exception:
pass
return False, 0.0
def _parse_definition(source: Any) -> dict[str, Any]:
"""Parse an agent definition — delegates to crewai's parser."""
from crewai.new_agent.definition_parser import parse_agent_definition
return parse_agent_definition(source)
def _load_agent(source: Any, agents_dir: Path | None = None) -> Any:
"""Load a NewAgent from a definition — delegates to crewai's loader."""
from crewai.new_agent.definition_parser import load_agent_from_definition
return load_agent_from_definition(source, agents_dir=agents_dir)
_MAX_CASES_PARALLEL = 4
_CASE_TIMEOUT_SECONDS = 90
async def _run_model_benchmark(
defn: dict[str, Any],
model: str,
cases: list[BenchmarkCase] | LoadedCases,
judge_model: str,
emit: Callable[[dict[str, Any]], None],
agents_dir: Path | None = None,
verbose: bool = False,
) -> list[BenchmarkResult]:
"""Run all benchmark cases for a single model, parallelising up to _MAX_CASES_PARALLEL."""
total = len(cases)
emit({"type": "model_start", "model": model, "total_cases": total})
sem = asyncio.Semaphore(_MAX_CASES_PARALLEL)
async def _run_case(i: int, case: BenchmarkCase) -> BenchmarkResult:
async with sem:
emit(
{
"type": "case_start",
"model": model,
"case_index": i,
"total_cases": total,
"input": case.input,
}
)
bench_defn = dict(defn)
bench_defn["settings"] = dict(defn.get("settings", {}))
if model != "default":
bench_defn["llm"] = model
bench_defn["settings"]["memory"] = False
bench_defn["settings"]["self_improving"] = False
bench_defn["verbose"] = verbose
bench_defn.pop("coworkers", None)
try:
agent = _load_agent(bench_defn, agents_dir=agents_dir)
except Exception as e:
emit(
{
"type": "case_done",
"model": model,
"case_index": i,
"total_cases": total,
"passed": False,
"score": 0.0,
"time_ms": 0,
"error": str(e),
}
)
return BenchmarkResult(
case_index=i,
input=case.input,
expected=case.expected,
actual=f"[Agent creation error: {e}]",
model=model,
passed=False,
score=0.0,
)
start_ms = _current_time_ms()
try:
response = await asyncio.wait_for(
agent.amessage(case.input),
timeout=_CASE_TIMEOUT_SECONDS,
)
elapsed_ms = _current_time_ms() - start_ms
actual = response.content
input_tokens = response.input_tokens or 0
output_tokens = response.output_tokens or 0
cost = response.cost
except asyncio.TimeoutError:
elapsed_ms = _current_time_ms() - start_ms
emit(
{
"type": "case_done",
"model": model,
"case_index": i,
"total_cases": total,
"passed": False,
"score": 0.0,
"time_ms": elapsed_ms,
"error": "timeout",
}
)
return BenchmarkResult(
case_index=i,
input=case.input,
expected=case.expected,
actual=f"[Timeout after {_CASE_TIMEOUT_SECONDS}s]",
model=model,
passed=False,
score=0.0,
response_time_ms=elapsed_ms,
)
except Exception as e:
elapsed_ms = _current_time_ms() - start_ms
emit(
{
"type": "case_done",
"model": model,
"case_index": i,
"total_cases": total,
"passed": False,
"score": 0.0,
"time_ms": elapsed_ms,
"error": str(e),
}
)
return BenchmarkResult(
case_index=i,
input=case.input,
expected=case.expected,
actual=f"[Error: {e}]",
model=model,
passed=False,
score=0.0,
response_time_ms=elapsed_ms,
)
passed, score = False, 0.0
if case.expected is not None:
passed, score = _check_expected(case.expected, actual)
if case.criteria is not None:
emit(
{
"type": "judging",
"model": model,
"case_index": i,
"total_cases": total,
}
)
try:
criteria_passed, criteria_score = await asyncio.wait_for(
_judge_with_llm(case.criteria, case.input, actual, judge_model),
timeout=30,
)
except asyncio.TimeoutError:
criteria_passed, criteria_score = False, 0.0
if case.expected is not None:
passed = passed and criteria_passed
score = (score + criteria_score) / 2.0
else:
passed, score = criteria_passed, criteria_score
emit(
{
"type": "case_done",
"model": model,
"case_index": i,
"total_cases": total,
"passed": passed,
"score": score,
"time_ms": elapsed_ms,
}
)
return BenchmarkResult(
case_index=i,
input=case.input,
expected=case.expected,
actual=actual,
model=model,
passed=passed,
score=score,
input_tokens=input_tokens,
output_tokens=output_tokens,
response_time_ms=elapsed_ms,
cost=cost,
)
model_results = await asyncio.gather(
*[_run_case(i, case) for i, case in enumerate(cases)]
)
total_passed = sum(1 for r in model_results if r.passed)
avg_score = (
sum(r.score for r in model_results) / len(model_results)
if model_results
else 0.0
)
total_time = (
max(r.response_time_ms for r in model_results) / 1000 if model_results else 0.0
)
total_in = sum(r.input_tokens for r in model_results)
total_out = sum(r.output_tokens for r in model_results)
total_cost = sum(r.cost for r in model_results if r.cost is not None)
emit(
{
"type": "model_done",
"model": model,
"passed": total_passed,
"total": len(model_results),
"avg_score": avg_score,
"total_time": total_time,
"input_tokens": total_in,
"output_tokens": total_out,
"total_cost": total_cost if total_cost > 0 else None,
}
)
return model_results
async def run_benchmark(
agent_def: dict[str, Any] | str | Path,
cases: list[BenchmarkCase] | LoadedCases,
models: list[str] | None = None,
judge_model: str = "openai/gpt-4o-mini",
on_progress: Callable[[dict[str, Any]], None] | None = None,
verbose: bool = False,
) -> dict[str, list[BenchmarkResult]]:
"""Run benchmark cases against an agent definition across models in parallel.
Args:
agent_def: Agent definition dict, JSON string, or file path.
cases: List of benchmark cases to run.
models: Optional list of model identifiers to compare. If None, uses agent's default.
judge_model: Model to use for LLM judge evaluation.
on_progress: Optional callback receiving progress dicts with a "type" key.
verbose: When True, enable agent verbose output for debugging.
Returns:
Dict mapping model name to list of BenchmarkResult.
"""
agents_dir: Path | None = None
if isinstance(agent_def, (str, Path)):
p = Path(agent_def)
if p.is_file():
agents_dir = p.parent
defn = _parse_definition(agent_def)
if models is None or len(models) == 0:
models = [defn.get("llm", "default")]
models = list(dict.fromkeys(models))
def _emit(event: dict[str, Any]) -> None:
if on_progress:
on_progress(event)
tasks = [
_run_model_benchmark(
defn,
model,
cases,
judge_model,
_emit,
agents_dir=agents_dir,
verbose=verbose,
)
for model in models
]
all_results = await asyncio.gather(*tasks)
return dict(zip(models, all_results))
class SuppressBenchmarkOutput:
"""Context manager that silences console formatter and noisy logging during benchmarks."""
def __enter__(self):
import logging
self._saved_formatter = None
try:
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
listener = TraceCollectionListener._instance
if listener:
self._saved_formatter = listener.formatter
listener.formatter = None
except Exception:
pass
self._loggers = []
for name in (
None,
"crewai.new_agent.event_listener",
"crewai.new_agent.executor",
"crewai",
):
lg = logging.getLogger(name)
self._loggers.append((lg, lg.level))
lg.setLevel(logging.CRITICAL)
return self
def __exit__(self, *exc):
for lg, level in self._loggers:
lg.setLevel(level)
if self._saved_formatter is not None:
try:
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
listener = TraceCollectionListener._instance
if listener:
listener.formatter = self._saved_formatter
except Exception:
pass
class VerboseBenchmarkOutput:
"""Context manager that subscribes to NewAgent events and prints them for debugging."""
def __enter__(self):
import logging
import sys
from crewai.events.event_bus import crewai_event_bus
from crewai.new_agent.events import (
NewAgentContextSummarizedEvent,
NewAgentLLMCallCompletedEvent,
NewAgentLLMCallFailedEvent,
NewAgentLLMCallStartedEvent,
NewAgentStatusUpdateEvent,
NewAgentToolUsageCompletedEvent,
NewAgentToolUsageFailedEvent,
NewAgentToolUsageStartedEvent,
)
# Suppress Rich formatter panels — we print our own structured output
self._saved_formatter = None
try:
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
listener = TraceCollectionListener._instance
if listener:
self._saved_formatter = listener.formatter
listener.formatter = None
except Exception:
pass
# Quiet loggers to WARNING — keep warnings visible, suppress debug/info spam
self._loggers = []
for name in (
None,
"crewai.new_agent.event_listener",
"crewai.new_agent.executor",
"crewai",
):
lg = logging.getLogger(name)
self._loggers.append((lg, lg.level))
lg.setLevel(logging.WARNING)
self._bus = crewai_event_bus
self._handlers = []
w = sys.stderr.write
fl = sys.stderr.flush
def _on_llm_start(_src, ev: NewAgentLLMCallStartedEvent):
w(f"\033[36m[llm] calling {ev.model}\033[0m\n")
fl()
def _on_llm_done(_src, ev: NewAgentLLMCallCompletedEvent):
w(
f"\033[36m[llm] {ev.model} {ev.input_tokens}{ev.output_tokens} tokens {ev.response_time_ms}ms\033[0m\n"
)
fl()
def _on_llm_fail(_src, ev: NewAgentLLMCallFailedEvent):
w(f"\033[31m[llm] FAILED: {ev.error[:200]}\033[0m\n")
fl()
def _on_tool_start(_src, ev: NewAgentToolUsageStartedEvent):
w(f"\033[33m[tool] using {ev.tool_name}\033[0m\n")
fl()
def _on_tool_done(_src, ev: NewAgentToolUsageCompletedEvent):
w(f"\033[33m[tool] {ev.tool_name} done\033[0m\n")
fl()
def _on_tool_fail(_src, ev: NewAgentToolUsageFailedEvent):
w(f"\033[31m[tool] {ev.tool_name} FAILED: {ev.error[:200]}\033[0m\n")
fl()
def _on_status(_src, ev: NewAgentStatusUpdateEvent):
if ev.detail:
w(f"\033[2m[status] {ev.state}: {ev.detail}\033[0m\n")
fl()
def _on_summarized(_src, ev: NewAgentContextSummarizedEvent):
w("\033[35m[context] summarized — context was too large\033[0m\n")
fl()
pairs = [
(NewAgentLLMCallStartedEvent, _on_llm_start),
(NewAgentLLMCallCompletedEvent, _on_llm_done),
(NewAgentLLMCallFailedEvent, _on_llm_fail),
(NewAgentToolUsageStartedEvent, _on_tool_start),
(NewAgentToolUsageCompletedEvent, _on_tool_done),
(NewAgentToolUsageFailedEvent, _on_tool_fail),
(NewAgentStatusUpdateEvent, _on_status),
(NewAgentContextSummarizedEvent, _on_summarized),
]
for event_type, handler in pairs:
self._bus.on(event_type)(handler)
self._handlers.append((event_type, handler))
return self
def __exit__(self, *exc):
for event_type, handler in self._handlers:
try:
self._bus.off(event_type, handler)
except Exception:
pass
for lg, level in self._loggers:
lg.setLevel(level)
if self._saved_formatter is not None:
try:
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
listener = TraceCollectionListener._instance
if listener:
listener.formatter = self._saved_formatter
except Exception:
pass
class ArtifactsSandbox:
"""Context manager that chdirs into tests/artifacts/ for the benchmark run.
Any files created by agent tools land in the artifacts directory instead of
polluting the project root. A .gitignore is written if one doesn't exist.
"""
def __init__(self, base: str | Path = "tests/artifacts"):
self._base = Path(base)
self._prev_cwd: str | None = None
def __enter__(self):
import os
self._base.mkdir(parents=True, exist_ok=True)
gitignore = self._base / ".gitignore"
if not gitignore.exists():
gitignore.write_text("*\n")
self._prev_cwd = os.getcwd()
os.chdir(self._base)
return self
def __exit__(self, *exc):
import os
if self._prev_cwd:
os.chdir(self._prev_cwd)
def _current_time_ms() -> int:
"""Return current time in milliseconds."""
return int(time.monotonic() * 1000)
def format_results_table(results: list[BenchmarkResult]) -> str:
"""Format benchmark results as a readable table.
Args:
results: List of BenchmarkResult for a single model.
Returns:
Formatted string table.
"""
if not results:
return "No results to display."
model = results[0].model
lines: list[str] = []
lines.append(f"Benchmark Results — Model: {model}")
lines.append("=" * 80)
header = f"{'#':<4} {'Pass':<6} {'Score':<7} {'Tokens':<12} {'Time (ms)':<10} {'Input (truncated)'}"
lines.append(header)
lines.append("-" * 80)
total_passed = 0
total_score = 0.0
total_input_tokens = 0
total_output_tokens = 0
total_time_ms = 0
for r in results:
status = "PASS" if r.passed else "FAIL"
tokens = f"{r.input_tokens}/{r.output_tokens}"
input_trunc = r.input[:40] + "..." if len(r.input) > 40 else r.input
line = f"{r.case_index:<4} {status:<6} {r.score:<7.2f} {tokens:<12} {r.response_time_ms:<10} {input_trunc}"
lines.append(line)
if r.passed:
total_passed += 1
total_score += r.score
total_input_tokens += r.input_tokens
total_output_tokens += r.output_tokens
total_time_ms += r.response_time_ms
lines.append("-" * 80)
n = len(results)
avg_score = total_score / n if n > 0 else 0.0
lines.append(
f"Total: {total_passed}/{n} passed | Avg score: {avg_score:.2f} | "
f"Tokens: {total_input_tokens}/{total_output_tokens} | "
f"Total time: {total_time_ms}ms"
)
return "\n".join(lines)
def format_comparison_table(results_by_model: dict[str, list[BenchmarkResult]]) -> str:
"""Format a comparison table across multiple models.
Args:
results_by_model: Dict mapping model name to list of BenchmarkResult.
Returns:
Formatted comparison string.
"""
if not results_by_model:
return "No results to compare."
lines: list[str] = []
lines.append("Model Comparison")
lines.append("=" * 90)
header = f"{'Model':<30} {'Passed':<10} {'Avg Score':<12} {'In Tokens':<12} {'Out Tokens':<12} {'Time (ms)'}"
lines.append(header)
lines.append("-" * 90)
for model, results in results_by_model.items():
n = len(results)
passed = sum(1 for r in results if r.passed)
avg_score = sum(r.score for r in results) / n if n > 0 else 0.0
total_in = sum(r.input_tokens for r in results)
total_out = sum(r.output_tokens for r in results)
total_time = max((r.response_time_ms for r in results), default=0)
model_trunc = model[:28] if len(model) > 28 else model
line = (
f"{model_trunc:<30} {passed}/{n:<8} {avg_score:<12.2f} "
f"{total_in:<12} {total_out:<12} {total_time}"
)
lines.append(line)
lines.append("-" * 90)
# Determine best model by average score
if results_by_model:
best_model = max(
results_by_model.keys(),
key=lambda m: (
sum(r.score for r in results_by_model[m]) / len(results_by_model[m])
if results_by_model[m]
else 0.0
),
)
best_score = (
sum(r.score for r in results_by_model[best_model])
/ len(results_by_model[best_model])
if results_by_model[best_model]
else 0.0
)
lines.append(f"Best model: {best_model} (avg score: {best_score:.2f})")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Rich-based terminal charts
# ---------------------------------------------------------------------------
def _score_color(score: float) -> str:
if score >= 0.7:
return "green"
if score >= 0.4:
return "yellow"
return "red"
def _score_bar(score: float, width: int = 20) -> str:
clamped = max(0.0, min(1.0, score))
filled = round(clamped * width)
empty = width - filled
color = _score_color(score)
bar = f"[{color}]{'' * filled}[/{color}]"
if empty:
bar += f"[dim]{'' * empty}[/dim]"
return bar
def _fmt_tokens(n: int) -> str:
if n >= 1_000_000:
return f"{n / 1_000_000:.1f}M"
if n >= 1_000:
return f"{n / 1_000:.1f}k"
return str(n)
def _fmt_cost(cost: float | None) -> str:
if cost is None:
return ""
if cost < 0.01:
return f"${cost:.4f}"
return f"${cost:.2f}"
def print_results_chart(
results: list[BenchmarkResult],
console: Any | None = None,
) -> None:
from rich.console import Console
from rich.panel import Panel
if not console:
console = Console()
if not results:
console.print("[dim]No results to display.[/]")
return
model = results[0].model
has_cost = any(r.cost is not None for r in results)
bar_w = 10
input_w = 35
rows: list[str] = []
for r in results:
inp = r.input[: input_w - 1] + "" if len(r.input) >= input_w else r.input
inp_pad = inp + " " * max(0, input_w - len(inp))
bar = _score_bar(r.score, bar_w)
badge = "[green]PASS[/green]" if r.passed else "[red]FAIL[/red]"
time_s = f"{r.response_time_ms / 1000:>5.1f}s"
cost_part = f" [dim]{_fmt_cost(r.cost):>7}[/dim]" if has_cost else ""
rows.append(
f" [dim]{r.case_index:>2}[/dim] {inp_pad} {bar} {r.score:.2f} {badge} [dim]{time_s}[/dim]{cost_part}"
)
n = len(results)
passed = sum(1 for r in results if r.passed)
avg = sum(r.score for r in results) / n
total_time = sum(r.response_time_ms for r in results) / 1000
total_in = sum(r.input_tokens for r in results)
total_out = sum(r.output_tokens for r in results)
total_cost = sum(r.cost for r in results if r.cost is not None)
color = _score_color(avg)
summary_parts = [
f"[{color}]{passed}/{n} passed[/{color}]",
f"avg [{color}]{avg:.2f}[/{color}]",
f"[dim]{total_time:.1f}s[/dim]",
f"[dim]↑{_fmt_tokens(total_in)}{_fmt_tokens(total_out)}[/dim]",
]
if total_cost > 0:
summary_parts.append(f"[dim]{_fmt_cost(total_cost)}[/dim]")
body = "\n".join(rows) + "\n\n " + " · ".join(summary_parts)
panel = Panel(
body,
title=f"[bold cyan]{model}[/bold cyan]",
title_align="left",
border_style="dim",
padding=(0, 1),
expand=False,
)
console.print(panel)
def print_comparison_chart(
results_by_model: dict[str, list[BenchmarkResult]],
console: Any | None = None,
) -> None:
from rich.console import Console
from rich.panel import Panel
if not console:
console = Console()
if not results_by_model:
console.print("[dim]No results to compare.[/dim]")
return
inner_w = max(console.width - 4, 60)
models_data: list[dict[str, Any]] = []
max_time = 0.0
max_tokens = 0
for model, results in results_by_model.items():
n = len(results)
passed = sum(1 for r in results if r.passed)
avg = sum(r.score for r in results) / n if n else 0.0
total_time = max((r.response_time_ms for r in results), default=0) / 1000
total_tokens = sum(r.input_tokens + r.output_tokens for r in results)
models_data.append(
{
"model": model,
"passed": passed,
"n": n,
"avg": avg,
"time": total_time,
"tokens": total_tokens,
}
)
max_time = max(max_time, total_time)
max_tokens = max(max_tokens, total_tokens)
for md in models_data:
time_score = 1.0 - (md["time"] / max_time) if max_time > 0 else 0.0
token_score = 1.0 - (md["tokens"] / max_tokens) if max_tokens > 0 else 0.0
md["rank"] = md["avg"] * 0.6 + time_score * 0.25 + token_score * 0.15
best = max(models_data, key=lambda m: m["rank"]) if len(models_data) > 1 else None
max_name_len = min(max(len(m["model"]) for m in models_data), 28)
fixed_right = 1 + 4 + 2 + 5 + 2 + 6 + 2 + 8 + 4
bar_width = max(12, inner_w - max_name_len - fixed_right - 4)
bar_width = min(bar_width, 30)
lines: list[str] = []
for md in models_data:
name_raw = md["model"]
name = (
name_raw[: max_name_len - 1] + ""
if len(name_raw) > max_name_len
else name_raw
).ljust(max_name_len)
bar = _score_bar(md["avg"], bar_width)
pass_color = _score_color(md["avg"])
star = (
" [bold green]★[/bold green]"
if best and md["model"] == best["model"]
else ""
)
tokens_str = _fmt_tokens(md["tokens"])
lines.append(
f" {name} {bar} {md['avg']:.2f} "
f"[{pass_color}]{md['passed']}/{md['n']}[/{pass_color}] "
f"[dim]{md['time']:>5.1f}s[/dim] "
f"[dim]{tokens_str:>6}[/dim]"
f"{star}"
)
body = "\n".join(lines)
panel = Panel(
body,
title="[bold]Model Comparison[/bold]",
subtitle="[dim]★ = best (60% score · 25% speed · 15% tokens)[/dim]",
subtitle_align="left",
title_align="left",
border_style="dim",
padding=(1, 1),
expand=False,
)
console.print(panel)