feat: introduce room management and agent selection in TUI

- Added a `CreateRoomScreen` modal for creating new rooms with agent selection and engagement options.
- Updated the main TUI layout to include a sidebar for room management, allowing users to create and switch between rooms.
- Enhanced the configuration handling to support room definitions and engagement modes.
- Refactored existing code to accommodate new room functionalities and improve overall structure.

These changes enhance the user experience by enabling better organization and interaction with multiple agents in the CrewAI framework.
This commit is contained in:
Joao Moura
2026-05-13 02:52:03 -04:00
committed by alex-clawd
parent fc85637e60
commit 75651f962d
7 changed files with 769 additions and 280 deletions

View File

@@ -21,12 +21,17 @@ from typing import Any
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical, VerticalScroll
from textual.screen import ModalScreen
from textual.widgets import (
Button,
Checkbox,
Footer,
Header,
Input,
Label,
OptionList,
RadioButton,
RadioSet,
Static,
TabbedContent,
TabPane,
@@ -66,7 +71,7 @@ _BG_PANEL = "#222222"
_BG_MSG_USER = "#2a2a2a"
_BG_MSG_AGENT = "#252525"
_DIM = "#777777"
_COMMON_ROOM = "__common__"
_ROOM_PREFIX = "__room__"
_SPINNER = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"
@@ -202,6 +207,79 @@ class ThinkingIndicator(Static):
self.update("\n".join(lines))
class CreateRoomScreen(ModalScreen[dict[str, Any] | None]):
    """Modal form for creating a new room.

    The screen is dismissed with ``None`` when the user cancels, or with a
    dict ``{"name": str, "agents": list[str], "engagement": "organic" |
    "tagged"}`` when the user confirms the form.
    """

    CSS = f"""
    CreateRoomScreen {{
        align: center middle;
    }}
    #room-form {{
        width: 56;
        max-height: 80%;
        background: {_BG_PANEL};
        border: tall {_TEAL};
        padding: 1 2;
    }}
    #room-form Label {{
        margin: 1 0 0 0;
        color: {_DIM};
    }}
    #room-form Input {{
        margin: 0 0 1 0;
    }}
    #room-form .form-section {{
        height: auto;
        margin: 0 0 1 0;
    }}
    #room-form RadioSet {{
        margin: 0;
    }}
    #room-form Button {{
        margin: 1 1 0 0;
    }}
    """

    def __init__(self, agent_names: list[str], **kwargs: Any) -> None:
        """Remember which agents can be offered as room members.

        Args:
            agent_names: Agent names shown as checkboxes, in display order.
            **kwargs: Passed through unchanged to ``ModalScreen``.
        """
        super().__init__(**kwargs)
        self._agent_names = agent_names

    @staticmethod
    def _checkbox_id(index: int) -> str:
        """Return the widget id of the checkbox for the agent at *index*.

        Ids are derived from the position rather than the agent name: names
        may contain spaces, dots or other characters that are not valid in a
        widget id / CSS selector, and two agents may share a name.
        """
        return f"cb-{index}"

    def compose(self) -> ComposeResult:
        """Build the form: name input, agent checkboxes, engagement, buttons."""
        with Vertical(id="room-form"):
            yield Label(f"[bold {_CORAL}]Create Room[/]")
            yield Label("Name")
            yield Input(placeholder="e.g. engineering", id="room-name-input")
            yield Label("Agents")
            with Vertical(classes="form-section"):
                # Every agent starts selected; the user opts agents out.
                for index, name in enumerate(self._agent_names):
                    yield Checkbox(name, value=True, id=self._checkbox_id(index))
            yield Label("Engagement")
            with RadioSet(id="engagement-radio"):
                yield RadioButton("Organic — agents auto-respond", value=True, id="radio-organic")
                yield RadioButton("Tagged — @mention required", id="radio-tagged")
            with Horizontal():
                yield Button("Create", variant="primary", id="btn-create-room")
                yield Button("Cancel", id="btn-cancel-room")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Handle the Create / Cancel buttons.

        Cancel dismisses with ``None``. Create validates the form and, when
        valid, dismisses with the room definition dict.
        """
        button_id = event.button.id
        if button_id == "btn-cancel-room":
            self.dismiss(None)
            return
        if button_id != "btn-create-room":
            return

        name_input = self.query_one("#room-name-input", Input)
        # Normalise the display name into a lowercase, hyphenated slug.
        name = name_input.value.strip().lower().replace(" ", "-")
        if not name:
            name_input.focus()
            return

        agents = [
            agent
            for index, agent in enumerate(self._agent_names)
            if self.query_one(f"#{self._checkbox_id(index)}", Checkbox).value
        ]
        if not agents:
            # A room without members has nobody to route messages to.
            self.notify("Select at least one agent for the room.", severity="warning")
            return

        radio = self.query_one("#engagement-radio", RadioSet)
        # The first radio button is "organic"; anything else means "tagged".
        engagement = "organic" if radio.pressed_index == 0 else "tagged"
        self.dismiss({"name": name, "agents": agents, "engagement": engagement})
# ── Main TUI ──────────────────────────────────────────────────
@@ -273,6 +351,38 @@ class AgentTUI(App[None]):
color: {_TEAL};
background: {_BG};
}}
.sidebar-label {{
padding: 1 1 0 1;
color: {_DIM};
text-style: bold;
height: auto;
}}
#room-list {{
height: auto;
max-height: 40%;
padding: 0 1;
background: {_BG_PANEL};
}}
#room-list > .option-list--option-highlighted {{
background: {_TEAL};
color: white;
}}
#room-list > .option-list--option {{
padding: 0 1;
}}
#btn-new-room {{
margin: 0 1 1 1;
width: 100%;
background: {_BG};
color: {_TEAL};
border: tall #333333;
min-height: 1;
height: 3;
}}
#btn-new-room:hover {{
background: {_TEAL};
color: white;
}}
#agent-list {{
height: 1fr;
padding: 0 1;
@@ -374,16 +484,18 @@ class AgentTUI(App[None]):
super().__init__(**kwargs)
self._agents_dir = agents_dir
self._config = config or {}
self._config_path = Path.cwd() / "config.json"
self._agent_defs: list[dict[str, Any]] = []
self._agent_names: list[str] = []
self._agent_instances: dict[str, Any] = {}
self._current_room: str = _COMMON_ROOM
# Rooms: {room_key: {"name": display_name, "agents": [...], "engagement": "organic"|"tagged"}}
self._rooms: dict[str, dict[str, Any]] = {}
self._current_room: str = ""
# (sender, content, metadata) tuples keyed by room
self._chat_histories: dict[str, list[tuple[str, str, str]]] = {}
self._processing = False
self._last_active_agent: str | None = None
self._last_agent_error: str = ""
self._engagement_mode: str = "organic"
self._scheduler: Any = None
def compose(self) -> ComposeResult:
@@ -391,7 +503,11 @@ class AgentTUI(App[None]):
with Horizontal(id="main-layout"):
with Vertical(id="sidebar"):
with TabbedContent(id="sidebar-tabs"):
with TabPane("Agents", id="tab-agents"):
with TabPane("Chat", id="tab-agents"):
yield Static("ROOMS", classes="sidebar-label")
yield OptionList(id="room-list")
yield Button("+ New Room", id="btn-new-room", variant="default")
yield Static("AGENTS", classes="sidebar-label")
yield OptionList(id="agent-list")
with TabPane("Memory", id="tab-memory"):
yield Static("Click below to open the memory browser.", id="memory-scope-label")
@@ -407,15 +523,49 @@ class AgentTUI(App[None]):
)
yield Footer()
def _room_key(self, name: str) -> str:
    """Return the internal room key for *name*: the room prefix followed by the name."""
    return "{}{}".format(_ROOM_PREFIX, name)
def _is_room(self, key: str) -> bool:
    """Return True when *key* carries the room prefix (i.e. is not a DM key)."""
    has_room_prefix = key.startswith(_ROOM_PREFIX)
    return has_room_prefix
def _room_engagement(self, room_key: str) -> str:
if room_key in self._rooms:
return self._rooms[room_key].get("engagement", "organic")
return "organic"
def _room_agents(self, room_key: str) -> list[str]:
if room_key in self._rooms:
return self._rooms[room_key].get("agents", self._agent_names[:])
return self._agent_names[:]
def on_mount(self) -> None:
self._agent_defs = _load_agents(self._agents_dir)
self._agent_names = [
d.get("name", d.get("role", "unnamed")) for d in self._agent_defs
]
rooms = self._config.get("rooms", {})
common_room = rooms.get("common", {})
self._engagement_mode = common_room.get("engagement", "organic")
# Load rooms from config
rooms_cfg = self._config.get("rooms", {})
for room_name, room_data in rooms_cfg.items():
key = self._room_key(room_name)
cfg_agents = room_data.get("agents", [])
self._rooms[key] = {
"name": room_name,
"agents": cfg_agents if cfg_agents else self._agent_names[:],
"engagement": room_data.get("engagement", "organic"),
}
# Ensure at least "common" room exists
common_key = self._room_key("common")
if common_key not in self._rooms:
self._rooms[common_key] = {
"name": "common",
"agents": self._agent_names[:],
"engagement": "organic",
}
self._current_room = common_key
# Subscribe to status update events from the executor
self._status_listener = None
@@ -439,22 +589,26 @@ class AgentTUI(App[None]):
except Exception:
pass
chat_input = self.query_one("#chat-input", Input)
if self._engagement_mode == "organic":
chat_input.placeholder = "Type a message — agents will respond automatically"
if AgentSuggester is not None and self._agent_names:
self.query_one("#chat-input", Input).suggester = AgentSuggester(
self._agent_names
)
agent_list = self.query_one("#agent-list", OptionList)
if not self._agent_defs:
self._mount_sys("No agents found. Run: crewai create agent <name>")
return
agent_list.add_option("◆ Common Room")
# Populate rooms list
room_list = self.query_one("#room-list", OptionList)
for key in self._rooms:
display = self._rooms[key]["name"].replace("-", " ").title()
engagement = self._rooms[key]["engagement"]
n_agents = len(self._rooms[key]["agents"])
room_list.add_option(f"{display} [{_DIM}]{engagement} · {n_agents}[/]")
room_list.highlighted = 0
# Populate agents list (DM entries)
agent_list = self.query_one("#agent-list", OptionList)
for defn in self._agent_defs:
name = defn.get("name", "unnamed")
role = defn.get("role", "")
@@ -464,8 +618,8 @@ class AgentTUI(App[None]):
label += f" · {trunc}"
agent_list.add_option(label)
agent_list.highlighted = 0
self._update_subtitle()
self._update_placeholder()
self._load_history_from_disk()
self._render_chat()
self.query_one("#chat-input", Input).focus()
@@ -478,6 +632,25 @@ class AgentTUI(App[None]):
except Exception:
pass
def _update_subtitle(self) -> None:
if self._is_room(self._current_room):
info = self._rooms.get(self._current_room, {})
display = info.get("name", "room").replace("-", " ").title()
self.sub_title = display
else:
self.sub_title = f"Chat with {self._current_room}"
def _update_placeholder(self) -> None:
    """Update the chat input placeholder to match the current room or DM.

    Organic rooms invite free typing, other rooms ask for an @mention, and
    DMs show ``Message <key>``.
    """
    chat_input = self.query_one("#chat-input", Input)
    if not self._is_room(self._current_room):
        chat_input.placeholder = f"Message {self._current_room}"
    elif self._room_engagement(self._current_room) == "organic":
        chat_input.placeholder = "Type a message — agents will respond automatically"
    else:
        chat_input.placeholder = "Use @agent_name to direct your message"
def _on_scheduled_task_due(self, task: Any) -> str:
"""Callback fired by the scheduler when a task comes due."""
agent_name = getattr(task, "agent_name", "")
@@ -511,17 +684,31 @@ class AgentTUI(App[None]):
def on_option_list_option_highlighted(
self, event: OptionList.OptionHighlighted
) -> None:
if event.option_list.id != "agent-list":
return
idx = event.option_index
if idx == 0:
self._current_room = _COMMON_ROOM
self.sub_title = "Common Room"
elif 1 <= idx <= len(self._agent_names):
name = self._agent_names[idx - 1]
self._current_room = name
self.sub_title = f"Chat with {name}"
self._render_chat()
if event.option_list.id == "room-list":
room_keys = list(self._rooms.keys())
idx = event.option_index
if 0 <= idx < len(room_keys):
self._current_room = room_keys[idx]
# Deselect agent list
try:
self.query_one("#agent-list", OptionList).highlighted = None
except Exception:
pass
self._update_subtitle()
self._update_placeholder()
self._render_chat()
elif event.option_list.id == "agent-list":
idx = event.option_index
if 0 <= idx < len(self._agent_names):
self._current_room = self._agent_names[idx]
# Deselect room list
try:
self.query_one("#room-list", OptionList).highlighted = None
except Exception:
pass
self._update_subtitle()
self._update_placeholder()
self._render_chat()
# ── Message routing ──
@@ -548,15 +735,21 @@ class AgentTUI(App[None]):
self._append_msg(room, "You", text)
self._mount_bubble("You", text)
if not targets and self._current_room == _COMMON_ROOM:
# Route to agent with pending suggestion before organic scoring
if not targets and self._is_room(self._current_room):
room_agent_names = self._room_agents(self._current_room)
room_agent_defs = [
d for d in self._agent_defs
if d.get("name", d.get("role", "unnamed")) in room_agent_names
]
engagement = self._room_engagement(self._current_room)
pending_agent = self._find_agent_with_pending_suggestion()
if pending_agent:
if pending_agent and pending_agent in room_agent_names:
targets = [pending_agent]
elif self._engagement_mode == "organic":
scored = await self._score_relevance_llm(clean_text, self._agent_defs)
elif engagement == "organic":
scored = await self._score_relevance_llm(clean_text, room_agent_defs)
if scored is None:
scored = self._score_relevance(clean_text, self._agent_defs)
scored = self._score_relevance(clean_text, room_agent_defs)
if scored:
top_score = scored[0][1]
best = [scored[0][0]]
@@ -569,13 +762,14 @@ class AgentTUI(App[None]):
d.get("name", d.get("role", "unnamed")) for d in best
]
else:
targets = [self._last_active_agent or self._agent_names[0]]
elif len(self._agent_names) == 1:
targets = [self._agent_names[0]]
targets = [self._last_active_agent or room_agent_names[0]]
elif len(room_agent_names) == 1:
targets = [room_agent_names[0]]
else:
first = room_agent_names[0] if room_agent_names else "agent"
self._mount_sys(
"Tip: use @agent_name to direct your message, "
f"e.g. @{self._agent_names[0]}"
f"e.g. @{first}"
)
return
@@ -748,7 +942,7 @@ class AgentTUI(App[None]):
def _handle_skills_command(self) -> None:
"""List active skills for the current agent."""
agent = None
if self._current_room != _COMMON_ROOM:
if not self._is_room(self._current_room):
agent = self._get_or_create_agent(self._current_room)
elif self._last_active_agent:
agent = self._get_or_create_agent(self._last_active_agent)
@@ -833,7 +1027,7 @@ class AgentTUI(App[None]):
def _get_focused_agent(self) -> Any:
"""Return the currently focused agent instance, or None."""
if self._current_room != _COMMON_ROOM:
if not self._is_room(self._current_room):
return self._get_or_create_agent(self._current_room)
if self._last_active_agent:
return self._get_or_create_agent(self._last_active_agent)
@@ -966,9 +1160,9 @@ class AgentTUI(App[None]):
"""Parse all @mentions in the message.
Returns ``([agent_names], clean_text)``.
In the Common Room, at least one @mention is required — untagged
messages return ``([], text)`` so the caller can prompt.
In a DM room, messages always route to that room's agent.
In a room, at least one @mention is required for tagged mode —
untagged messages return ``([], text)`` so the caller can handle routing.
In a DM (agent name), messages always route to that agent.
"""
found: list[str] = []
clean = text
@@ -979,7 +1173,7 @@ class AgentTUI(App[None]):
clean = pattern.sub("", clean).strip()
if found:
return found, clean
if self._current_room != _COMMON_ROOM:
if not self._is_room(self._current_room):
return [self._current_room], text
return [], text
@@ -992,7 +1186,7 @@ class AgentTUI(App[None]):
"""Process a message directed at multiple agents in parallel."""
# Build room context once (shared snapshot before any replies)
room_context: str | None = None
if room == _COMMON_ROOM:
if self._is_room(room):
ctx = self._build_room_context(room)
if ctx:
room_context = (
@@ -1109,10 +1303,8 @@ class AgentTUI(App[None]):
self._mount_sys(msg)
return
# In the Common Room, prepend conversation context so the
# tagged agent can see what was discussed before.
message_text = text
if room == _COMMON_ROOM:
if self._is_room(room):
ctx = self._build_room_context(room)
if ctx:
message_text = (
@@ -1270,17 +1462,21 @@ class AgentTUI(App[None]):
history = self._chat_histories.get(self._current_room, [])
if not history:
if self._current_room == _COMMON_ROOM:
names = ", ".join(self._agent_names[:5])
if self._engagement_mode == "organic":
if self._is_room(self._current_room):
room_info = self._rooms.get(self._current_room, {})
display = room_info.get("name", "room").replace("-", " ").title()
room_agents = self._room_agents(self._current_room)
names = ", ".join(room_agents[:5])
engagement = self._room_engagement(self._current_room)
if engagement == "organic":
self._mount_sys(
f"Welcome to the Common Room. "
f"Welcome to {display}. "
f"Just type — relevant agents will respond. "
f"Use @agent_name to direct a message. Available: {names}"
)
else:
self._mount_sys(
f"Welcome to the Common Room. "
f"Welcome to {display}. "
f"Use @agent_name to chat. Available: {names}"
)
else:
@@ -1322,6 +1518,9 @@ class AgentTUI(App[None]):
return
for path in hdir.glob("*.json"):
room = path.stem
# Migrate old __common__ history to new __room__common key
if room == "__common__":
room = self._room_key("common")
try:
data = json.loads(path.read_text(encoding="utf-8"))
self._chat_histories[room] = [
@@ -1354,7 +1553,7 @@ class AgentTUI(App[None]):
def _get_focused_agent_name(self) -> str | None:
"""Return the agent name for the current room (DM only)."""
if self._current_room == _COMMON_ROOM:
if self._is_room(self._current_room):
return self._last_active_agent
if self._current_room in self._agent_names:
return self._current_room
@@ -1363,12 +1562,18 @@ class AgentTUI(App[None]):
def on_tabbed_content_tab_activated(self, event: TabbedContent.TabActivated) -> None:
pass
# ── Sidebar: Provenance button ──
# ── Sidebar buttons ──
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn-memory":
self._launch_memory_browser()
return
if event.button.id == "btn-new-room":
self.push_screen(
CreateRoomScreen(self._agent_names),
callback=self._on_room_created,
)
return
if event.button.id != "btn-provenance":
return
agent_name = self._get_focused_agent_name()
@@ -1407,6 +1612,58 @@ class AgentTUI(App[None]):
lines.append(line)
self._mount_sys("\n".join(lines))
# ── Room creation ──
def _on_room_created(self, result: dict[str, Any] | None) -> None:
    """Register, persist and switch to a room returned by the create-room modal.

    Args:
        result: ``None`` when the modal was cancelled (nothing happens), or a
            dict with ``"name"``, ``"agents"`` and ``"engagement"`` keys.
    """
    if result is None:
        return

    name = result["name"]
    key = self._room_key(name)
    if key in self._rooms:
        self._mount_sys(f"Room '{name}' already exists.")
        return

    # An empty selection means "all agents", exactly as on_mount treats an
    # empty "agents" list loaded from config. Without this the room would have
    # no members now but every agent after a restart, and routing would index
    # into an empty member list.
    agents = list(result["agents"]) or self._agent_names[:]
    engagement = result["engagement"]
    self._rooms[key] = {
        "name": name,
        "agents": agents,
        "engagement": engagement,
    }

    # Add to sidebar
    room_list = self.query_one("#room-list", OptionList)
    display = name.replace("-", " ").title()
    n_agents = len(agents)
    room_list.add_option(f"{display} [{_DIM}]{engagement} · {n_agents}[/]")

    # Save to config.json
    self._save_room_to_config(name, agents, engagement)

    # Switch to the new room. Dicts keep insertion order, so the key that was
    # just added is the last one.
    room_list.highlighted = len(self._rooms) - 1
    self._current_room = key
    self._update_subtitle()
    self._update_placeholder()
    self._render_chat()
    self._mount_sys(f"Room '{display}' created with {n_agents} agent(s).")
def _save_room_to_config(self, name: str, agents: list[str], engagement: str) -> None:
try:
if self._config_path.exists():
raw = self._config_path.read_text(encoding="utf-8")
config = json.loads(_strip_jsonc(raw))
else:
config = {}
rooms = config.setdefault("rooms", {})
rooms[name] = {"agents": agents, "engagement": engagement}
self._config_path.write_text(
json.dumps(config, indent=2) + "\n", encoding="utf-8"
)
except Exception:
pass
# ── Actions ──
def action_quit(self) -> None:

View File

@@ -174,10 +174,133 @@ def _parse_definition(source: Any) -> dict[str, Any]:
return parse_agent_definition(source)
def _load_agent(source: Any) -> Any:
def _load_agent(source: Any, agents_dir: Path | None = None) -> Any:
"""Load a NewAgent from a definition — delegates to crewai's loader."""
from crewai.new_agent.definition_parser import load_agent_from_definition
return load_agent_from_definition(source)
return load_agent_from_definition(source, agents_dir=agents_dir)
_MAX_CASES_PARALLEL = 4
_CASE_TIMEOUT_SECONDS = 90
async def _run_model_benchmark(
    defn: dict[str, Any],
    model: str,
    cases: list[BenchmarkCase] | LoadedCases,
    judge_model: str,
    emit: Callable[[dict[str, Any]], None],
    agents_dir: Path | None = None,
) -> list[BenchmarkResult]:
    """Run all benchmark cases for a single model, parallelising up to _MAX_CASES_PARALLEL.

    Each case gets its own agent built from a stripped-down copy of *defn*
    (memory, self-improvement, planning, coworkers and tools disabled; iteration
    and execution time capped). A case that fails to build, errors, or exceeds
    ``_CASE_TIMEOUT_SECONDS`` is recorded as a failed result instead of
    aborting the run, and so is a judge call that errors or times out.

    Args:
        defn: Parsed agent definition; it is copied, never mutated.
        model: Model name to benchmark; ``"default"`` keeps the definition's own LLM.
        cases: Benchmark cases to run.
        judge_model: Model passed to the LLM judge for criteria-based cases.
        emit: Progress callback receiving ``model_start``, ``case_start``,
            ``judging``, ``case_done`` and ``model_done`` event dicts.
        agents_dir: Forwarded to the agent loader.

    Returns:
        One ``BenchmarkResult`` per case, in the same order as *cases*.
    """
    total = len(cases)
    emit({"type": "model_start", "model": model, "total_cases": total})

    sem = asyncio.Semaphore(_MAX_CASES_PARALLEL)
    # Number of finished cases. Reported as "case_index" on case_done events,
    # so it reflects completion order rather than the case's position.
    done_count = 0

    def _record_failure(
        i: int,
        case: BenchmarkCase,
        actual: str,
        error: str,
        elapsed_ms: int | None = None,
    ) -> BenchmarkResult:
        """Emit a failed ``case_done`` event and build the matching result."""
        nonlocal done_count
        done_count += 1
        emit({
            "type": "case_done", "model": model, "case_index": done_count,
            "total_cases": total, "passed": False, "score": 0.0,
            "time_ms": 0 if elapsed_ms is None else elapsed_ms, "error": error,
        })
        # Leave response_time_ms at its default when the agent never ran.
        timing = {} if elapsed_ms is None else {"response_time_ms": elapsed_ms}
        return BenchmarkResult(
            case_index=i, input=case.input, expected=case.expected,
            actual=actual, model=model, passed=False, score=0.0, **timing,
        )

    async def _run_case(i: int, case: BenchmarkCase) -> BenchmarkResult:
        """Run and score one case while holding a slot of the semaphore."""
        nonlocal done_count
        async with sem:
            emit({"type": "case_start", "model": model, "case_index": i, "total_cases": total, "input": case.input})

            # Copy the definition (and its settings) so parallel cases never
            # share mutable state; "settings": null is treated as empty.
            bench_defn = dict(defn)
            bench_defn["settings"] = dict(defn.get("settings") or {})
            if model != "default":
                bench_defn["llm"] = model
            bench_defn["settings"]["memory"] = False
            bench_defn["settings"]["self_improving"] = False
            bench_defn["settings"]["planning"] = False
            bench_defn["verbose"] = False
            bench_defn["max_iter"] = min(bench_defn.get("max_iter", 25), 5)
            bench_defn["max_execution_time"] = min(bench_defn.get("max_execution_time", 60), 60)
            bench_defn.pop("coworkers", None)
            bench_defn.pop("tools", None)

            try:
                agent = _load_agent(bench_defn, agents_dir=agents_dir)
            except Exception as e:
                return _record_failure(i, case, f"[Agent creation error: {e}]", str(e))

            start_ms = _current_time_ms()
            try:
                response = await asyncio.wait_for(
                    agent.amessage(case.input),
                    timeout=_CASE_TIMEOUT_SECONDS,
                )
            except asyncio.TimeoutError:
                return _record_failure(
                    i, case, f"[Timeout after {_CASE_TIMEOUT_SECONDS}s]", "timeout",
                    elapsed_ms=_current_time_ms() - start_ms,
                )
            except Exception as e:
                return _record_failure(
                    i, case, f"[Error: {e}]", str(e),
                    elapsed_ms=_current_time_ms() - start_ms,
                )
            elapsed_ms = _current_time_ms() - start_ms
            actual = response.content
            input_tokens = response.input_tokens or 0
            output_tokens = response.output_tokens or 0
            cost = response.cost

            passed, score = False, 0.0
            judge_error: str | None = None

            if case.expected is not None:
                passed, score = _check_expected(case.expected, actual)

            if case.criteria is not None:
                emit({"type": "judging", "model": model, "case_index": done_count + 1, "total_cases": total})
                try:
                    criteria_passed, criteria_score = await asyncio.wait_for(
                        _judge_with_llm(case.criteria, case.input, actual, judge_model),
                        timeout=30,
                    )
                except asyncio.TimeoutError:
                    criteria_passed, criteria_score = False, 0.0
                except Exception as e:
                    # A failing judge must not escape: an exception here would
                    # propagate through asyncio.gather and throw away the
                    # results of every other case.
                    criteria_passed, criteria_score = False, 0.0
                    judge_error = f"judge error: {e}"
                if case.expected is not None:
                    # Both checks must pass; the score is their average.
                    passed = passed and criteria_passed
                    score = (score + criteria_score) / 2.0
                else:
                    passed, score = criteria_passed, criteria_score

            done_count += 1
            done_event: dict[str, Any] = {
                "type": "case_done", "model": model, "case_index": done_count,
                "total_cases": total, "passed": passed, "score": score,
                "time_ms": elapsed_ms,
            }
            if judge_error is not None:
                done_event["error"] = judge_error
            emit(done_event)
            return BenchmarkResult(
                case_index=i, input=case.input, expected=case.expected, actual=actual,
                model=model, passed=passed, score=score,
                input_tokens=input_tokens, output_tokens=output_tokens,
                response_time_ms=elapsed_ms, cost=cost,
            )

    model_results = await asyncio.gather(*[_run_case(i, case) for i, case in enumerate(cases)])

    total_passed = sum(1 for r in model_results if r.passed)
    avg_score = sum(r.score for r in model_results) / len(model_results) if model_results else 0.0
    # Cases overlap in time, so the slowest case (not the sum) is reported.
    total_time = max(r.response_time_ms for r in model_results) / 1000 if model_results else 0.0
    total_in = sum(r.input_tokens for r in model_results)
    total_out = sum(r.output_tokens for r in model_results)
    total_cost = sum(r.cost for r in model_results if r.cost is not None)

    emit({
        "type": "model_done", "model": model,
        "passed": total_passed, "total": len(model_results),
        "avg_score": avg_score, "total_time": total_time,
        "input_tokens": total_in, "output_tokens": total_out,
        "total_cost": total_cost if total_cost > 0 else None,
    })
    return model_results
async def run_benchmark(
@@ -187,7 +310,7 @@ async def run_benchmark(
judge_model: str = "openai/gpt-4o-mini",
on_progress: Callable[[dict[str, Any]], None] | None = None,
) -> dict[str, list[BenchmarkResult]]:
"""Run benchmark cases against an agent definition, optionally across multiple models.
"""Run benchmark cases against an agent definition across models in parallel.
Args:
agent_def: Agent definition dict, JSON string, or file path.
@@ -199,6 +322,12 @@ async def run_benchmark(
Returns:
Dict mapping model name to list of BenchmarkResult.
"""
agents_dir: Path | None = None
if isinstance(agent_def, (str, Path)):
p = Path(agent_def)
if p.is_file():
agents_dir = p.parent
defn = _parse_definition(agent_def)
if models is None or len(models) == 0:
@@ -208,102 +337,74 @@ async def run_benchmark(
if on_progress:
on_progress(event)
results_by_model: dict[str, list[BenchmarkResult]] = {}
tasks = [
_run_model_benchmark(defn, model, cases, judge_model, _emit, agents_dir=agents_dir)
for model in models
]
all_results = await asyncio.gather(*tasks)
for mi, model in enumerate(models):
model_results: list[BenchmarkResult] = []
_emit({"type": "model_start", "model": model, "model_index": mi, "total_models": len(models), "total_cases": len(cases)})
return dict(zip(models, all_results))
for i, case in enumerate(cases):
_emit({"type": "case_start", "model": model, "case_index": i, "total_cases": len(cases), "input": case.input})
bench_defn = dict(defn)
if model != "default":
bench_defn["llm"] = model
bench_defn.setdefault("settings", {})
bench_defn["settings"]["memory_read_only"] = True
class suppress_benchmark_output:
"""Context manager that silences console formatter and noisy logging during benchmarks."""
def __enter__(self):
import logging
self._saved_formatter = None
try:
from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener
listener = TraceCollectionListener._instance
if listener:
self._saved_formatter = listener.formatter
listener.formatter = None
except Exception:
pass
self._loggers = []
for name in (None, "crewai.new_agent.event_listener", "crewai.new_agent.executor", "crewai"):
lg = logging.getLogger(name)
self._loggers.append((lg, lg.level))
lg.setLevel(logging.CRITICAL)
return self
def __exit__(self, *exc):
for lg, level in self._loggers:
lg.setLevel(level)
if self._saved_formatter is not None:
try:
agent = _load_agent(bench_defn)
except Exception as e:
result = BenchmarkResult(
case_index=i,
input=case.input,
expected=case.expected,
actual=f"[Agent creation error: {e}]",
model=model,
passed=False,
score=0.0,
)
model_results.append(result)
_emit({"type": "case_done", "model": model, "case_index": i, "total_cases": len(cases), "passed": False, "score": 0.0, "time_ms": 0, "error": str(e)})
continue
from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener
listener = TraceCollectionListener._instance
if listener:
listener.formatter = self._saved_formatter
except Exception:
pass
start_ms = _current_time_ms()
try:
response = await agent.amessage(case.input)
elapsed_ms = _current_time_ms() - start_ms
actual = response.content
input_tokens = response.input_tokens or 0
output_tokens = response.output_tokens or 0
cost = response.cost
class artifacts_sandbox:
"""Context manager that chdirs into tests/artifacts/ for the benchmark run.
except Exception as e:
elapsed_ms = _current_time_ms() - start_ms
result = BenchmarkResult(
case_index=i,
input=case.input,
expected=case.expected,
actual=f"[Error: {e}]",
model=model,
passed=False,
score=0.0,
response_time_ms=elapsed_ms,
)
model_results.append(result)
_emit({"type": "case_done", "model": model, "case_index": i, "total_cases": len(cases), "passed": False, "score": 0.0, "time_ms": elapsed_ms, "error": str(e)})
continue
Any files created by agent tools land in the artifacts directory instead of
polluting the project root. A .gitignore is written if one doesn't exist.
"""
passed = False
score = 0.0
def __init__(self, base: str | Path = "tests/artifacts"):
self._base = Path(base)
self._prev_cwd: str | None = None
if case.expected is not None:
passed, score = _check_expected(case.expected, actual)
if case.criteria is not None:
_emit({"type": "judging", "model": model, "case_index": i, "total_cases": len(cases)})
criteria_passed, criteria_score = await _judge_with_llm(
case.criteria, case.input, actual, judge_model
)
if case.expected is not None:
passed = passed and criteria_passed
score = (score + criteria_score) / 2.0
else:
passed = criteria_passed
score = criteria_score
def __enter__(self):
import os
self._base.mkdir(parents=True, exist_ok=True)
gitignore = self._base / ".gitignore"
if not gitignore.exists():
gitignore.write_text("*\n")
self._prev_cwd = os.getcwd()
os.chdir(self._base)
return self
result = BenchmarkResult(
case_index=i,
input=case.input,
expected=case.expected,
actual=actual,
model=model,
passed=passed,
score=score,
input_tokens=input_tokens,
output_tokens=output_tokens,
response_time_ms=elapsed_ms,
cost=cost,
)
model_results.append(result)
_emit({"type": "case_done", "model": model, "case_index": i, "total_cases": len(cases), "passed": passed, "score": score, "time_ms": elapsed_ms})
results_by_model[model] = model_results
total_passed = sum(1 for r in model_results if r.passed)
avg_score = sum(r.score for r in model_results) / len(model_results) if model_results else 0.0
_emit({"type": "model_done", "model": model, "passed": total_passed, "total": len(model_results), "avg_score": avg_score})
return results_by_model
def __exit__(self, *exc):
import os
if self._prev_cwd:
os.chdir(self._prev_cwd)
def _current_time_ms() -> int:
@@ -389,7 +490,7 @@ def format_comparison_table(results_by_model: dict[str, list[BenchmarkResult]])
avg_score = sum(r.score for r in results) / n if n > 0 else 0.0
total_in = sum(r.input_tokens for r in results)
total_out = sum(r.output_tokens for r in results)
total_time = sum(r.response_time_ms for r in results)
total_time = max((r.response_time_ms for r in results), default=0)
model_trunc = model[:28] if len(model) > 28 else model
line = (
@@ -477,12 +578,8 @@ def print_results_chart(
model = results[0].model
has_cost = any(r.cost is not None for r in results)
inner_w = max(console.width - 4, 60)
bar_w = 12
overhead = 2 + 2 + 2 + 2 + bar_w + 1 + 4 + 2 + 4 + 2 + 6
if has_cost:
overhead += 2 + 7
input_w = max(15, inner_w - overhead)
bar_w = 10
input_w = 35
rows: list[str] = []
for r in results:
@@ -506,21 +603,21 @@ def print_results_chart(
color = _score_color(avg)
summary_parts = [
f"[{color}]{passed}/{n} passed[/]",
f"avg [{color}]{avg:.2f}[/]",
f"[dim]{total_time:.1f}s[/]",
f"[dim]↑{_fmt_tokens(total_in)}{_fmt_tokens(total_out)}[/]",
f"[{color}]{passed}/{n} passed[/{color}]",
f"avg [{color}]{avg:.2f}[/{color}]",
f"[dim]{total_time:.1f}s[/dim]",
f"[dim]↑{_fmt_tokens(total_in)}{_fmt_tokens(total_out)}[/dim]",
]
if total_cost > 0:
summary_parts.append(f"[dim]{_fmt_cost(total_cost)}[/]")
summary_parts.append(f"[dim]{_fmt_cost(total_cost)}[/dim]")
body = "\n".join(rows) + "\n\n " + " · ".join(summary_parts)
panel = Panel(
body,
title=f"[bold cyan]{model}[/]",
title=f"[bold cyan]{model}[/bold cyan]",
title_align="left",
border_style="cyan",
padding=(1, 0),
border_style="dim",
padding=(0, 1),
expand=False,
)
console.print(panel)
@@ -550,7 +647,7 @@ def print_comparison_chart(
n = len(results)
passed = sum(1 for r in results if r.passed)
avg = sum(r.score for r in results) / n if n else 0.0
total_time = sum(r.response_time_ms for r in results) / 1000
total_time = max((r.response_time_ms for r in results), default=0) / 1000
total_tokens = sum(r.input_tokens + r.output_tokens for r in results)
models_data.append({
"model": model, "passed": passed, "n": n,

View File

@@ -502,20 +502,21 @@ def memory(
type=float,
default=None,
help="Minimum score to pass a test case (NewAgent only, 0.0-1.0). "
"Defaults to test_threshold in config.json (0.7 if not set).",
"Defaults to test.threshold in config.json (0.7 if not set).",
)
@click.option(
"--judge-model",
type=str,
default="openai/gpt-4o-mini",
help="LLM model for evaluation judging (NewAgent only).",
default=None,
help="LLM model for evaluation judging (NewAgent only). "
"Defaults to test.judge_model in config.json (openai/gpt-4o-mini if not set).",
)
def test(
n_iterations: int,
model: str | None,
trained_agents_file: str | None,
threshold: float | None,
judge_model: str,
judge_model: str | None,
) -> None:
"""Test the crew or agents and evaluate the results.
@@ -530,26 +531,33 @@ def test(
agent_files = sorted(agents_dir.glob("*.json")) + sorted(agents_dir.glob("*.jsonc")) if agents_dir.is_dir() else []
if agent_files:
effective_judge = judge_model or _read_config("test", "judge_model") or "openai/gpt-4o-mini"
if _needs_uv_relaunch():
uv_args = ["test", "-n", str(n_iterations), "--threshold", str(threshold), "--judge-model", judge_model]
uv_args = ["test", "-n", str(n_iterations), "--judge-model", effective_judge]
if threshold is not None:
uv_args.extend(["--threshold", str(threshold)])
if model:
uv_args.extend(["-m", model])
if trained_agents_file:
uv_args.extend(["-f", trained_agents_file])
_relaunch_via_uv(uv_args)
project_threshold = _read_config_threshold()
effective_threshold = threshold or project_threshold or 0.7
config_threshold = _read_config("test", "threshold") or _read_config("test_threshold")
effective_threshold = threshold or (float(config_threshold) if config_threshold is not None else None) or 0.7
_test_new_agents(agent_files, n_iterations, model, effective_threshold, judge_model)
_test_new_agents(agent_files, n_iterations, model, effective_threshold, effective_judge)
else:
crew_model = model or "gpt-4o-mini"
click.echo(f"Testing the crew for {n_iterations} iterations with model {crew_model}")
evaluate_crew(n_iterations, crew_model, trained_agents_file=trained_agents_file)
def _read_config_threshold() -> float | None:
"""Read test_threshold from config.json if it exists."""
def _read_config(*keys: str) -> Any:
"""Read a nested value from config.json (JSONC-safe).
Example: _read_config("test", "threshold") reads config["test"]["threshold"].
"""
import json
from pathlib import Path
@@ -562,74 +570,132 @@ def _read_config_threshold() -> float | None:
clean = re.sub(r"(?<!:)//.*?$", "", raw, flags=re.MULTILINE)
clean = re.sub(r"/\*.*?\*/", "", clean, flags=re.DOTALL)
data = json.loads(clean)
val = data.get("test_threshold")
return float(val) if val is not None else None
for k in keys:
if not isinstance(data, dict):
return None
data = data.get(k)
if data is None:
return None
return data
except Exception:
return None
def _make_benchmark_progress():
"""Create a progress callback with Rich spinner animation."""
import time
class _BenchmarkLiveProgress:
"""Live parallel progress display for benchmark runs."""
from rich.console import Console
from rich.spinner import Spinner
from rich.live import Live
def __init__(self, console=None):
from rich.console import Console
self._console = console or Console()
self._state: dict[str, dict] = {}
self._live = None
console = Console()
state: dict = {"live": None}
def start(self):
from rich.live import Live
self._live = Live(
self._render(),
console=self._console,
refresh_per_second=10,
transient=False,
)
self._live.start()
def _stop_live():
if state["live"]:
state["live"].stop()
state["live"] = None
def stop(self):
if self._live:
self._live.update(self._render())
self._live.stop()
self._live = None
def progress(event: dict) -> None:
def on_progress(self, event: dict) -> None:
t = event["type"]
model = event.get("model", "")
if t == "model_start":
_stop_live()
label = event["model"]
if event["total_models"] > 1:
label = f"\\[{event['model_index'] + 1}/{event['total_models']}] {label}"
console.print(f"\n[bold cyan]▶ {label}[/] [dim]({event['total_cases']} cases)[/]")
self._state[model] = {
"done": 0, "total": event["total_cases"],
"status": "starting", "passed": 0,
"avg": 0.0, "time": 0.0,
"in_tokens": 0, "out_tokens": 0, "cost": None,
}
elif t == "case_start":
_stop_live()
idx = event["case_index"] + 1
total = event["total_cases"]
snippet = event["input"][:60] + ("" if len(event["input"]) > 60 else "")
console.print(f" [dim]\\[{idx}/{total}][/] {snippet}")
state["live"] = Live(
Spinner("dots", text=" running…", style="cyan"),
console=console,
transient=True,
)
state["live"].start()
self._state[model]["status"] = "running"
elif t == "judging":
if state["live"]:
state["live"].update(
Spinner("dots", text=" judging…", style="cyan")
)
self._state[model]["status"] = "judging"
elif t == "case_done":
_stop_live()
elapsed_s = event["time_ms"] / 1000
if event.get("error"):
console.print(f" [red]✗ ERROR[/red] ({elapsed_s:.1f}s)")
elif event["passed"]:
console.print(f" [green]✓ PASS[/green] score={event['score']:.2f} ({elapsed_s:.1f}s)")
else:
console.print(f" [red]✗ FAIL[/red] score={event['score']:.2f} ({elapsed_s:.1f}s)")
s = self._state[model]
s["done"] = max(s["done"], event["case_index"])
if event.get("passed"):
s["passed"] += 1
s["status"] = "running"
elif t == "model_done":
_stop_live()
p, tot, avg = event["passed"], event["total"], event["avg_score"]
color = "green" if p == tot else ("yellow" if p > 0 else "red")
console.print(f" [{color}]── {p}/{tot} passed · avg score {avg:.2f}[/{color}]")
s = self._state[model]
s["status"] = "done"
s["passed"] = event.get("passed", s["passed"])
s["done"] = event.get("total", s["done"])
s["avg"] = event["avg_score"]
s["time"] = event.get("total_time", 0.0)
s["in_tokens"] = event.get("input_tokens", 0)
s["out_tokens"] = event.get("output_tokens", 0)
s["cost"] = event.get("total_cost")
return progress
if self._live:
self._live.update(self._render())
def _render(self):
from rich.table import Table
from rich.spinner import Spinner
from rich.text import Text
from rich import box
from crewai_cli.benchmark import _score_color, _fmt_tokens, _fmt_cost
has_cost = any(
info.get("cost") is not None
for info in self._state.values()
if info["status"] == "done"
)
n_cols = 7 if has_cost else 6
table = Table(box=box.SIMPLE, show_header=False, padding=(0, 1), expand=False)
table.add_column("", width=1) # icon
table.add_column("", no_wrap=True) # model
table.add_column("", no_wrap=True, justify="right") # passed or bar
table.add_column("", no_wrap=True, justify="right") # score
table.add_column("", no_wrap=True, justify="right") # time
table.add_column("", no_wrap=True, justify="right") # tokens
if has_cost:
table.add_column("", no_wrap=True, justify="right") # cost
for model, info in self._state.items():
if info["status"] == "done":
icon = Text("", style="green")
color = _score_color(info["avg"])
cols = [
icon,
model,
Text.from_markup(f"[{color}]{info['passed']}/{info['total']}[/{color}]"),
Text.from_markup(f"[{color}]{info['avg']:.2f}[/{color}]"),
Text(f"{info['time']:.1f}s", style="dim"),
Text(f"{_fmt_tokens(info['in_tokens'])}{_fmt_tokens(info['out_tokens'])}", style="dim"),
]
if has_cost:
if info["cost"] is not None:
cols.append(Text(_fmt_cost(info["cost"]), style="dim"))
else:
cols.append(Text(""))
else:
bar_w = 10
pct = info["done"] / info["total"] if info["total"] > 0 else 0
filled = round(pct * bar_w)
icon = Spinner("dots", style="cyan")
progress = Text.from_markup(
f"[cyan]{'' * filled}{'' * (bar_w - filled)}[/cyan] {info['done']}/{info['total']}"
)
cols = [icon, model, progress] + [Text("")] * (n_cols - 3)
table.add_row(*cols)
return table
def _test_new_agents(
@@ -639,7 +705,7 @@ def _test_new_agents(
threshold: float,
judge_model: str,
) -> None:
"""Run NewAgent test cases with pass/fail threshold."""
"""Run NewAgent test cases with pass/fail threshold (all agents in parallel)."""
import asyncio
from pathlib import Path
@@ -647,7 +713,6 @@ def _test_new_agents(
from crewai_cli.benchmark import (
load_benchmark_cases,
print_results_chart,
run_benchmark,
)
@@ -655,9 +720,9 @@ def _test_new_agents(
tests_dir = Path("tests")
if not tests_dir.is_dir() and Path("benchmarks").is_dir():
tests_dir = Path("benchmarks")
all_passed = True
agents_tested = 0
# Collect valid agents + cases
jobs: list[dict] = []
for agent_path in agent_files:
agent_name = agent_path.stem
cases_path = tests_dir / f"{agent_name}_cases.json"
@@ -670,52 +735,91 @@ def _test_new_agents(
loaded = load_benchmark_cases(cases_path)
except (FileNotFoundError, ValueError) as e:
click.secho(f" Error loading cases for {agent_name}: {e}", fg="red")
all_passed = False
continue
file_threshold = loaded.threshold if loaded.threshold is not None else threshold
jobs.append({
"agent_name": agent_name,
"agent_path": str(agent_path.resolve()),
"cases": loaded.cases,
"threshold": file_threshold,
})
model_list = [model] if model else None
if not jobs:
click.secho("No agents with matching benchmark cases found.", fg="yellow")
raise SystemExit(1)
click.echo()
click.secho(f"Testing {agent_name} ({len(loaded)} cases, threshold={file_threshold})", fg="cyan", bold=True)
model_list = [model] if model else None
try:
results_by_model = asyncio.run(
# Progress display — prefix model key with agent name
progress = _BenchmarkLiveProgress(console=_con)
def _make_progress_cb(agent_name: str):
def _cb(event: dict) -> None:
prefixed = dict(event)
if "model" in prefixed:
prefixed["model"] = f"{agent_name}/{prefixed['model']}"
progress.on_progress(prefixed)
return _cb
async def _run_all():
tasks = []
for job in jobs:
tasks.append(
run_benchmark(
agent_def=str(agent_path),
cases=loaded.cases,
agent_def=job["agent_path"],
cases=job["cases"],
models=model_list,
judge_model=judge_model,
on_progress=_make_benchmark_progress(),
on_progress=_make_progress_cb(job["agent_name"]),
)
)
except Exception as e:
click.secho(f" Error running tests for {agent_name}: {e}", fg="red")
return await asyncio.gather(*tasks, return_exceptions=True)
agent_count = sum(1 for j in jobs for _ in (model_list or [None]))
case_count = sum(len(j["cases"]) for j in jobs)
click.echo()
click.secho(
f"Testing {len(jobs)} agent(s), {case_count} cases (threshold={threshold})",
fg="cyan", bold=True,
)
from crewai_cli.benchmark import artifacts_sandbox, suppress_benchmark_output
progress.start()
try:
with artifacts_sandbox(), suppress_benchmark_output():
all_results = asyncio.run(_run_all())
finally:
progress.stop()
# Evaluate results
all_passed = True
agents_tested = 0
for job, result in zip(jobs, all_results):
if isinstance(result, Exception):
click.secho(f" Error running tests for {job['agent_name']}: {result}", fg="red")
all_passed = False
continue
agents_tested += 1
for model_name, results in results_by_model.items():
_con.print()
print_results_chart(results, console=_con)
failed = [r for r in results if r.score < file_threshold]
for model_name, results in result.items():
failed = [r for r in results if r.score < job["threshold"]]
if failed:
all_passed = False
_con.print(
f"\n [red bold]FAILED: {len(failed)}/{len(results)} "
f"cases below threshold ({file_threshold})[/red bold]"
f" [red bold]{job['agent_name']}: FAILED {len(failed)}/{len(results)} "
f"cases below threshold ({job['threshold']})[/red bold]"
)
for r in failed:
inp = r.input[:60] + ("" if len(r.input) > 60 else "")
_con.print(f" [red]#{r.case_index + 1}[/red] [dim]{inp}[/dim] [red]{r.score:.2f}[/red]")
else:
_con.print(
f"\n [green bold]PASSED: all {len(results)} cases >= {file_threshold}[/green bold]"
f" [green bold]{job['agent_name']}: PASSED all {len(results)} cases >= {job['threshold']}[/green bold]"
)
click.echo()
if agents_tested == 0:
click.secho("No agents with matching benchmark cases found.", fg="yellow")
click.secho("No agents completed successfully.", fg="yellow")
raise SystemExit(1)
elif all_passed:
click.secho(f"All tests passed ({agents_tested} agent(s)).", fg="green", bold=True)
@@ -1456,20 +1560,23 @@ def checkpoint_prune(
)
@click.option(
"--judge-model",
default="openai/gpt-4o-mini",
help="Model for LLM judge evaluation",
default=None,
help="Model for LLM judge evaluation. "
"Defaults to test.judge_model in config.json (openai/gpt-4o-mini if not set).",
)
def benchmark(
agent_path: str,
cases_path: str,
models: tuple[str, ...],
judge_model: str,
judge_model: str | None,
) -> None:
"""Run agent against test cases and report results."""
import asyncio
from crewai_cli.run_crew import _needs_uv_relaunch, _relaunch_via_uv
judge_model = judge_model or _read_config("test", "judge_model") or "openai/gpt-4o-mini"
if _needs_uv_relaunch():
uv_args = ["benchmark", agent_path, cases_path, "--judge-model", judge_model]
for m in models:
@@ -1481,12 +1588,15 @@ def benchmark(
from crewai_cli.benchmark import (
load_benchmark_cases,
print_comparison_chart,
print_results_chart,
run_benchmark,
)
_con = _RichConsole()
from pathlib import Path as _P
agent_path = str(_P(agent_path).resolve())
cases_path = str(_P(cases_path).resolve())
try:
cases = load_benchmark_cases(cases_path)
except (FileNotFoundError, ValueError) as e:
@@ -1502,23 +1612,26 @@ def benchmark(
click.echo(f"Judge model: {judge_model}")
click.echo()
from crewai_cli.benchmark import artifacts_sandbox, suppress_benchmark_output
progress = _BenchmarkLiveProgress(console=_con)
progress.start()
try:
results_by_model = asyncio.run(
run_benchmark(
agent_def=agent_path,
cases=cases,
models=model_list,
judge_model=judge_model,
on_progress=_make_benchmark_progress(),
with artifacts_sandbox(), suppress_benchmark_output():
results_by_model = asyncio.run(
run_benchmark(
agent_def=agent_path,
cases=cases,
models=model_list,
judge_model=judge_model,
on_progress=progress.on_progress,
)
)
)
except Exception as e:
click.secho(f"Error running benchmark: {e}", fg="red")
raise SystemExit(1) from e
for model, results in results_by_model.items():
_con.print()
print_results_chart(results, console=_con)
finally:
progress.stop()
if len(results_by_model) > 1:
_con.print()

View File

@@ -122,9 +122,15 @@ PROJECT_CONFIG_TEMPLATE = """\
{
// Project configuration for crewai agents
// Minimum score (0.0–1.0) for a test case to pass.
// Override per test file with: {"threshold": 0.9, "cases": [...]}
"test_threshold": 0.7,
// Test / benchmark settings
"test": {
// Minimum score (0.0–1.0) for a test case to pass.
// Override per test file with: {"threshold": 0.9, "cases": [...]}
"threshold": 0.7,
// LLM used to judge test responses (provider/model format)
"judge_model": "openai/gpt-4o-mini"
},
// Rooms define how agents collaborate in the TUI
"rooms": {
@@ -133,10 +139,10 @@ PROJECT_CONFIG_TEMPLATE = """\
"agents": [],
// Engagement mode:
// "dm" — chat with one agent at a time (default)
// "organic" — all agents see messages, respond if relevant (default)
// "dm" — chat with one agent at a time
// "tagged" — @mention to direct messages
// "organic" — all agents see messages, respond if relevant
"engagement": "dm"
"engagement": "organic"
}
}
}
@@ -182,6 +188,7 @@ _GITIGNORE_TEMPLATE = """\
__pycache__/
.DS_Store
.crewai/
tests/artifacts/
"""
@@ -807,7 +814,7 @@ def _add_agent_to_config(base: Path, agent_name: str) -> None:
config = json.loads(clean)
rooms = config.get("rooms", {})
common = rooms.get("common", {"agents": [], "engagement": "dm"})
common = rooms.get("common", {"agents": [], "engagement": "organic"})
agents = common.get("agents", [])
if agent_name not in agents:
agents.append(agent_name)

View File

@@ -98,8 +98,8 @@ def load_agent_from_definition(
agent_name = defn.get("name", "")
if agent_name and agent_name in _loading_chain:
logger.warning(
"Circular coworker reference for '%s' — skipping to prevent infinite recursion",
logger.debug(
"Skipping coworker back-reference '%s' (already in loading chain)",
agent_name,
)
return None
@@ -249,8 +249,8 @@ def _resolve_coworkers(
elif "ref" in cw:
ref_name = cw["ref"]
if _loading_chain and ref_name in _loading_chain:
logger.warning(
"Circular coworker ref '%s' — skipping to prevent infinite recursion",
logger.debug(
"Skipping coworker back-reference '%s' (already in loading chain)",
ref_name,
)
continue
@@ -258,7 +258,9 @@ def _resolve_coworkers(
for ext in (".json", ".jsonc"):
ref_path = agents_dir / f"{ref_name}{ext}"
if ref_path.exists():
result = load_agent_from_definition(ref_path, agents_dir, _loading_chain)
result = load_agent_from_definition(
ref_path, agents_dir, set(_loading_chain) if _loading_chain else None
)
if result is not None:
coworkers.append(result)
break

View File

@@ -483,12 +483,22 @@ class ConversationalAgentExecutor(BaseModel):
def _estimate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float | None:
"""Approximate cost in USD based on common model pricing per 1M tokens."""
costs = {
"gpt-4o": (2.50, 10.00),
"gpt-4.1-nano": (0.10, 0.40),
"gpt-4.1-mini": (0.40, 1.60),
"gpt-4.1": (2.00, 8.00),
"gpt-4o-mini": (0.15, 0.60),
"gpt-4o": (2.50, 10.00),
"gpt-4": (30.00, 60.00),
"gpt-5-mini": (0.25, 2.00),
"gpt-5.5": (5.00, 30.00),
"gpt-5.4": (2.50, 15.00),
"gpt-5": (1.25, 10.00),
"o4-mini": (1.10, 4.40),
"o3-mini": (1.10, 4.40),
"o3": (2.00, 8.00),
"claude-opus": (5.00, 25.00),
"claude-sonnet": (3.00, 15.00),
"claude-haiku": (0.25, 1.25),
"claude-opus": (15.00, 75.00),
"claude-haiku": (1.00, 5.00),
}
for key, (inp_cost, out_cost) in costs.items():
if key in model.lower():
@@ -719,9 +729,9 @@ class ConversationalAgentExecutor(BaseModel):
ctx_size = llm.get_context_window_size()
total_chars = sum(len(str(m.get("content", ""))) for m in llm_messages)
est_tokens = total_chars // 4
est_tokens = total_chars // 3
if est_tokens < int(ctx_size * 0.75):
if est_tokens < int(ctx_size * 0.60):
return
try:
@@ -1486,6 +1496,8 @@ class ConversationalAgentExecutor(BaseModel):
response_text = tool_result
break
self._proactive_summarize_messages(llm_messages, callbacks)
# GAP-21: Call step_callback at each iteration boundary
if self.agent.step_callback:
self.agent.step_callback(iterations, self._tools_used_this_turn, response_text)

View File

@@ -9,6 +9,7 @@ CONTEXT_LIMIT_ERRORS: Final[list[str]] = [
"context window full",
"too many tokens",
"input is too long",
"prompt is too long",
"exceeds token limit",
]