feat: add verbose output option for benchmarking and testing

- Introduced a `verbose` flag in the CLI for the `test` and `benchmark` commands to enable detailed logging of agent execution, including tool calls and LLM responses.
- Updated the `_run_model_benchmark` and `_test_new_agents` functions to accept the `verbose` parameter, allowing for enhanced debugging during benchmark runs.
- Implemented a `VerboseBenchmarkOutput` context manager that, when verbose mode is enabled, subscribes to agent events and prints them to stderr, improving the visibility of agent interactions.

These changes enhance the debugging capabilities of the CrewAI framework, providing users with more insights during testing and benchmarking processes.
This commit is contained in:
Joao Moura
2026-05-13 03:51:30 -04:00
committed by alex-clawd
parent c33fd82286
commit 0ddedbc48a
2 changed files with 167 additions and 28 deletions

View File

@@ -192,6 +192,7 @@ async def _run_model_benchmark(
judge_model: str,
emit: Callable[[dict[str, Any]], None],
agents_dir: Path | None = None,
verbose: bool = False,
) -> list[BenchmarkResult]:
"""Run all benchmark cases for a single model, parallelising up to _MAX_CASES_PARALLEL."""
total = len(cases)
@@ -209,12 +210,8 @@ async def _run_model_benchmark(
bench_defn["llm"] = model
bench_defn["settings"]["memory"] = False
bench_defn["settings"]["self_improving"] = False
bench_defn["settings"]["planning"] = False
bench_defn["verbose"] = False
bench_defn["max_iter"] = min(bench_defn.get("max_iter", 25), 5)
bench_defn["max_execution_time"] = min(bench_defn.get("max_execution_time", 60), 60)
bench_defn["verbose"] = verbose
bench_defn.pop("coworkers", None)
bench_defn.pop("tools", None)
try:
agent = _load_agent(bench_defn, agents_dir=agents_dir)
@@ -304,6 +301,7 @@ async def run_benchmark(
models: list[str] | None = None,
judge_model: str = "openai/gpt-4o-mini",
on_progress: Callable[[dict[str, Any]], None] | None = None,
verbose: bool = False,
) -> dict[str, list[BenchmarkResult]]:
"""Run benchmark cases against an agent definition across models in parallel.
@@ -313,6 +311,7 @@ async def run_benchmark(
models: Optional list of model identifiers to compare. If None, uses agent's default.
judge_model: Model to use for LLM judge evaluation.
on_progress: Optional callback receiving progress dicts with a "type" key.
verbose: When True, enable agent verbose output for debugging.
Returns:
Dict mapping model name to list of BenchmarkResult.
@@ -333,7 +332,7 @@ async def run_benchmark(
on_progress(event)
tasks = [
_run_model_benchmark(defn, model, cases, judge_model, _emit, agents_dir=agents_dir)
_run_model_benchmark(defn, model, cases, judge_model, _emit, agents_dir=agents_dir, verbose=verbose)
for model in models
]
all_results = await asyncio.gather(*tasks)
@@ -379,6 +378,105 @@ class SuppressBenchmarkOutput:
pass
class VerboseBenchmarkOutput:
    """Context manager that prints NewAgent events to stderr for debugging.

    On entry it:

    * detaches the formatter of the current ``TraceCollectionListener``
      instance (when one exists) so its panels do not interleave with the
      lines printed here,
    * sets the root, ``crewai`` and NewAgent loggers to ``WARNING``, and
    * subscribes handlers on ``crewai_event_bus`` that write one
      ANSI-coloured line per LLM / tool / status / context event.

    On exit the handlers are unsubscribed and the saved logger levels and
    formatter are put back. Exceptions raised inside the ``with`` body are
    never suppressed.
    """

    # Loggers quietened while verbose output is active (None = root logger).
    _QUIET_LOGGERS = (
        None,
        "crewai.new_agent.event_listener",
        "crewai.new_agent.executor",
        "crewai",
    )

    @staticmethod
    def _clip(value, limit=200):
        """Return ``str(value)`` truncated to at most ``limit`` characters.

        Going through ``str()`` keeps the failure handlers from raising when
        an event carries an exception object (or ``None``) instead of text.
        """
        return str(value)[:limit]

    @staticmethod
    def _trace_listener():
        """Return the current ``TraceCollectionListener`` instance, or None.

        Best-effort: any import or attribute problem yields ``None`` so that
        verbose output still works without the tracing listener.
        """
        try:
            from crewai.events.listeners.tracing.trace_listener import (
                TraceCollectionListener,
            )

            return TraceCollectionListener._instance
        except Exception:
            return None

    def __enter__(self):
        """Silence competing output and subscribe the printing handlers.

        Returns:
            This context manager instance.
        """
        import logging
        import sys

        from crewai.events.event_bus import crewai_event_bus
        from crewai.new_agent.events import (
            NewAgentContextSummarizedEvent,
            NewAgentLLMCallCompletedEvent,
            NewAgentLLMCallFailedEvent,
            NewAgentLLMCallStartedEvent,
            NewAgentStatusUpdateEvent,
            NewAgentToolUsageCompletedEvent,
            NewAgentToolUsageFailedEvent,
            NewAgentToolUsageStartedEvent,
        )

        # Suppress Rich formatter panels — we print our own structured output.
        self._saved_formatter = None
        listener = self._trace_listener()
        if listener:
            try:
                self._saved_formatter = listener.formatter
                listener.formatter = None
            except Exception:
                # Best-effort: a listener without a usable formatter is fine.
                pass

        # Quiet loggers to WARNING — keep warnings visible, suppress
        # debug/info spam. The previous level is remembered for __exit__.
        self._loggers = []
        for name in self._QUIET_LOGGERS:
            lg = logging.getLogger(name)
            self._loggers.append((lg, lg.level))
            lg.setLevel(logging.WARNING)

        self._bus = crewai_event_bus
        self._handlers = []

        # Bound once so every handler writes to the stderr seen at entry.
        write = sys.stderr.write
        flush = sys.stderr.flush

        def _say(sgr, text):
            """Write one line wrapped in the given ANSI SGR code and flush."""
            write(f"\033[{sgr}m{text}\033[0m\n")
            flush()

        def _on_llm_start(_src, ev: NewAgentLLMCallStartedEvent):
            _say("36", f"[llm] calling {ev.model}")

        def _on_llm_done(_src, ev: NewAgentLLMCallCompletedEvent):
            # Input and output counts are separated so they cannot be read
            # as a single number.
            _say(
                "36",
                f"[llm] {ev.model} {ev.input_tokens} in / "
                f"{ev.output_tokens} out tokens {ev.response_time_ms}ms",
            )

        def _on_llm_fail(_src, ev: NewAgentLLMCallFailedEvent):
            _say("31", f"[llm] FAILED: {self._clip(ev.error)}")

        def _on_tool_start(_src, ev: NewAgentToolUsageStartedEvent):
            _say("33", f"[tool] using {ev.tool_name}")

        def _on_tool_done(_src, ev: NewAgentToolUsageCompletedEvent):
            _say("33", f"[tool] {ev.tool_name} done")

        def _on_tool_fail(_src, ev: NewAgentToolUsageFailedEvent):
            _say("31", f"[tool] {ev.tool_name} FAILED: {self._clip(ev.error)}")

        def _on_status(_src, ev: NewAgentStatusUpdateEvent):
            # Status updates without detail text are not printed.
            if ev.detail:
                _say("2", f"[status] {ev.state}: {ev.detail}")

        def _on_summarized(_src, ev: NewAgentContextSummarizedEvent):
            _say("35", "[context] summarized — context was too large")

        pairs = [
            (NewAgentLLMCallStartedEvent, _on_llm_start),
            (NewAgentLLMCallCompletedEvent, _on_llm_done),
            (NewAgentLLMCallFailedEvent, _on_llm_fail),
            (NewAgentToolUsageStartedEvent, _on_tool_start),
            (NewAgentToolUsageCompletedEvent, _on_tool_done),
            (NewAgentToolUsageFailedEvent, _on_tool_fail),
            (NewAgentStatusUpdateEvent, _on_status),
            (NewAgentContextSummarizedEvent, _on_summarized),
        ]
        try:
            for event_type, handler in pairs:
                self._bus.on(event_type)(handler)
                self._handlers.append((event_type, handler))
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so undo the
            # logger/formatter changes and partial subscriptions ourselves.
            self.__exit__(None, None, None)
            raise
        return self

    def __exit__(self, *exc):
        """Unsubscribe the handlers and restore logger levels and formatter."""
        for event_type, handler in self._handlers:
            try:
                self._bus.off(event_type, handler)
            except Exception:
                # Best-effort: one failed unsubscribe must not block the rest
                # of the cleanup.
                pass
        self._handlers = []

        for lg, level in self._loggers:
            lg.setLevel(level)
        self._loggers = []

        if self._saved_formatter is not None:
            listener = self._trace_listener()
            if listener:
                try:
                    listener.formatter = self._saved_formatter
                except Exception:
                    pass
            self._saved_formatter = None
class ArtifactsSandbox:
"""Context manager that chdirs into tests/artifacts/ for the benchmark run.

View File

@@ -511,12 +511,18 @@ def memory(
help="LLM model for evaluation judging (NewAgent only). "
"Defaults to test.judge_model in config.json (openai/gpt-4o-mini if not set).",
)
@click.option(
"-v", "--verbose",
is_flag=True,
help="Show agent execution details (tool calls, LLM responses, errors).",
)
def test(
n_iterations: int,
model: str | None,
trained_agents_file: str | None,
threshold: float | None,
judge_model: str | None,
verbose: bool,
) -> None:
"""Test the crew or agents and evaluate the results.
@@ -541,6 +547,8 @@ def test(
uv_args.extend(["-m", model])
if trained_agents_file:
uv_args.extend(["-f", trained_agents_file])
if verbose:
uv_args.append("-v")
_relaunch_via_uv(uv_args)
config_threshold = _read_config("test", "threshold")
@@ -548,7 +556,7 @@ def test(
config_threshold = _read_config("test_threshold")
effective_threshold = threshold if threshold is not None else (float(config_threshold) if config_threshold is not None else 0.7)
_test_new_agents(agent_files, n_iterations, model, effective_threshold, effective_judge)
_test_new_agents(agent_files, n_iterations, model, effective_threshold, effective_judge, verbose=verbose)
else:
crew_model = model or "gpt-4o-mini"
click.echo(f"Testing the crew for {n_iterations} iterations with model {crew_model}")
@@ -706,6 +714,7 @@ def _test_new_agents(
model: str | None,
threshold: float,
judge_model: str,
verbose: bool = False,
) -> None:
"""Run NewAgent test cases with pass/fail threshold (all agents in parallel)."""
import asyncio
@@ -754,7 +763,7 @@ def _test_new_agents(
model_list = [model] if model else None
# Progress display — prefix model key with agent name
progress = _BenchmarkLiveProgress(console=_con)
progress = None if verbose else _BenchmarkLiveProgress(console=_con)
def _make_progress_cb(agent_name: str):
def _cb(event: dict) -> None:
@@ -773,7 +782,8 @@ def _test_new_agents(
cases=job["cases"],
models=model_list,
judge_model=judge_model,
on_progress=_make_progress_cb(job["agent_name"]),
on_progress=None if verbose else _make_progress_cb(job["agent_name"]),
verbose=verbose,
)
)
return await asyncio.gather(*tasks, return_exceptions=True)
@@ -785,14 +795,21 @@ def _test_new_agents(
fg="cyan", bold=True,
)
from crewai_cli.benchmark import ArtifactsSandbox, SuppressBenchmarkOutput
from crewai_cli.benchmark import ArtifactsSandbox, SuppressBenchmarkOutput, VerboseBenchmarkOutput
progress.start()
if not verbose:
progress.start()
try:
with ArtifactsSandbox(), SuppressBenchmarkOutput():
all_results = asyncio.run(_run_all())
with ArtifactsSandbox():
if verbose:
with VerboseBenchmarkOutput():
all_results = asyncio.run(_run_all())
else:
with SuppressBenchmarkOutput():
all_results = asyncio.run(_run_all())
finally:
progress.stop()
if not verbose:
progress.stop()
# Evaluate results
all_passed = True
@@ -1565,11 +1582,17 @@ def checkpoint_prune(
help="Model for LLM judge evaluation. "
"Defaults to test.judge_model in config.json (openai/gpt-4o-mini if not set).",
)
@click.option(
"-v", "--verbose",
is_flag=True,
help="Show agent execution details (tool calls, LLM responses, errors).",
)
def benchmark(
agent_path: str,
cases_path: str,
models: tuple[str, ...],
judge_model: str | None,
verbose: bool,
) -> None:
"""Run agent against test cases and report results."""
import asyncio
@@ -1582,6 +1605,8 @@ def benchmark(
uv_args = ["benchmark", agent_path, cases_path, "--judge-model", judge_model]
for m in models:
uv_args.extend(["-m", m])
if verbose:
uv_args.append("-v")
_relaunch_via_uv(uv_args)
from rich.console import Console as _RichConsole
@@ -1613,26 +1638,42 @@ def benchmark(
click.echo(f"Judge model: {judge_model}")
click.echo()
from crewai_cli.benchmark import ArtifactsSandbox, SuppressBenchmarkOutput
from crewai_cli.benchmark import ArtifactsSandbox, SuppressBenchmarkOutput, VerboseBenchmarkOutput
progress = _BenchmarkLiveProgress(console=_con)
progress.start()
progress = None if verbose else _BenchmarkLiveProgress(console=_con)
if progress:
progress.start()
try:
with ArtifactsSandbox(), SuppressBenchmarkOutput():
results_by_model = asyncio.run(
run_benchmark(
agent_def=agent_path,
cases=cases,
models=model_list,
judge_model=judge_model,
on_progress=progress.on_progress,
)
)
with ArtifactsSandbox():
if verbose:
with VerboseBenchmarkOutput():
results_by_model = asyncio.run(
run_benchmark(
agent_def=agent_path,
cases=cases,
models=model_list,
judge_model=judge_model,
verbose=verbose,
)
)
else:
with SuppressBenchmarkOutput():
results_by_model = asyncio.run(
run_benchmark(
agent_def=agent_path,
cases=cases,
models=model_list,
judge_model=judge_model,
on_progress=progress.on_progress if progress else None,
verbose=verbose,
)
)
except Exception as e:
click.secho(f"Error running benchmark: {e}", fg="red")
raise SystemExit(1) from e
finally:
progress.stop()
if progress:
progress.stop()
if len(results_by_model) > 1:
_con.print()