feat: enhance agent TUI and CLI with streaming responses and model selection improvements

- Added a `_safe_render` function to escape Rich markup and convert markdown to Rich format.
- Implemented token-by-token streaming for agent responses in the TUI, improving user experience during interactions.
- Updated the CLI to allow selection of LLM providers and models, enhancing flexibility in agent creation.
- Refactored benchmark case paths to use a `tests` directory instead of `benchmarks`.
- Introduced a `last_stream_result` property in the `NewAgent` class to retrieve the latest streaming response.

These changes aim to provide a more interactive and user-friendly experience in managing agents within the CrewAI framework.
This commit is contained in:
Joao Moura
2026-05-12 16:03:50 -04:00
committed by alex-clawd
parent fe7f730546
commit 6cb29dce65
7 changed files with 235 additions and 87 deletions

View File

@@ -12,6 +12,9 @@ import json
import os
import re
import sys
import time
from rich.markup import escape as _rich_escape
from pathlib import Path
from typing import Any
@@ -67,6 +70,11 @@ _COMMON_ROOM = "__common__"
_SPINNER = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"
def _safe_render(text: str) -> str:
"""Escape Rich markup in text so square brackets are displayed literally."""
return _rich_escape(text)
def _strip_jsonc(text: str) -> str:
text = re.sub(r"(?<!:)//.*?$", "", text, flags=re.MULTILINE)
text = re.sub(r"/\*.*?\*/", "", text, flags=re.DOTALL)
@@ -135,21 +143,32 @@ class ThinkingIndicator(Static):
self._tokens = ""
self._prev_input: int = 0
self._prev_output: int = 0
self._step_start: float = time.monotonic()
def update_status(self, state: str, detail: str | None, input_tokens: int, output_tokens: int) -> None:
label = detail or state or "working…"
# Mark the previous step as done (skip the initial placeholder,
# but keep its creation timestamp so the first real step inherits it)
if self._current_status and self._current_status != "starting…":
step_in = input_tokens - self._prev_input
step_out = output_tokens - self._prev_output
tok = f" [{_DIM}]↑{step_in:,}{step_out:,}[/]" if (step_in or step_out) else ""
done_line = f" [{_DIM}]✓ {self._current_status}{tok}[/]"
if done_line not in self._steps:
step_elapsed = time.monotonic() - self._step_start
meta_parts: list[str] = []
if step_in or step_out:
meta_parts.append(f"{step_in:,}{step_out:,}")
if step_elapsed >= 0.1:
meta_parts.append(f"{step_elapsed:.1f}s")
meta = " · ".join(meta_parts)
suffix = f" ({meta})" if meta else ""
done_line = f" [{_DIM}]✓ {self._current_status}{suffix}[/]"
if not any(self._current_status in s for s in self._steps):
self._steps.append(done_line)
if len(self._steps) > 6:
self._steps = self._steps[-6:]
self._current_status = label
self._prev_input = input_tokens
self._prev_output = output_tokens
self._step_start = time.monotonic()
if input_tokens or output_tokens:
self._tokens = f"[{_DIM}]↑{input_tokens:,}{output_tokens:,}[/]"
self._render_frame()
@@ -1104,22 +1123,66 @@ class AgentTUI(App[None]):
)
self._last_active_agent = target
response = await asyncio.to_thread(agent.message, message_text)
# Stream response token-by-token
scroll = self.query_one("#chat-scroll", VerticalScroll)
follow_tail = self._is_near_bottom(scroll)
bubble: ChatBubble | None = None
accumulated = ""
stream_start = time.monotonic()
stream_chars = 0
def _stream_markup(text: str, final: bool = False, metadata: str = "") -> str:
rendered = _safe_render(text)
mk = f"[bold {_CORAL}]{target}[/]\n{rendered}"
if final:
if metadata:
mk += f"\n\n[{_DIM}]{metadata}[/]"
else:
cursor = f"[{_CORAL}]▎[/]"
elapsed = time.monotonic() - stream_start
est_tokens = stream_chars // 4
progress = f"[{_DIM}]~{est_tokens:,} tokens · {elapsed:.1f}s[/]"
mk += f"{cursor}\n\n{progress}"
return mk
async for chunk in agent.stream(message_text):
accumulated += chunk
stream_chars += len(chunk)
if bubble is None and self._current_room == room:
bubble = ChatBubble(
_stream_markup(accumulated), classes="agent-bubble"
)
# Insert bubble before thinking so indicator stays at bottom
scroll.mount(bubble, before=thinking)
if follow_tail:
scroll.scroll_end(animate=False)
elif bubble is not None:
bubble.update(_stream_markup(accumulated))
if follow_tail:
scroll.scroll_end(animate=False)
# Remove cursor, add final metadata
await self._safe_remove(thinking)
response = getattr(agent, "last_stream_result", None)
meta_parts: list[str] = []
if response.input_tokens or response.output_tokens:
meta_parts.append(
f"{response.input_tokens or 0:,} "
f" {response.output_tokens or 0:,} tokens"
)
if response.response_time_ms:
meta_parts.append(f"{response.response_time_ms / 1000:.1f}s")
if response:
if getattr(response, "input_tokens", 0) or getattr(response, "output_tokens", 0):
meta_parts.append(
f" {response.input_tokens or 0:,} "
f"{response.output_tokens or 0:,} tokens"
)
if getattr(response, "response_time_ms", 0):
meta_parts.append(f"{response.response_time_ms / 1000:.1f}s")
metadata = " · ".join(meta_parts)
await self._safe_remove(thinking)
self._append_msg(room, target, response.content, metadata)
if self._current_room == room:
self._mount_bubble(target, response.content, metadata)
if bubble is not None:
bubble.update(_stream_markup(accumulated, final=True, metadata=metadata))
content = accumulated or (response.content if response else "")
self._append_msg(room, target, content, metadata)
except Exception as e:
await self._safe_remove(thinking)
@@ -1190,14 +1253,12 @@ class AgentTUI(App[None]):
self, sender: str, content: str, metadata: str = ""
) -> ChatBubble:
if sender == "You":
rendered = re.sub(r'\*\*(.+?)\*\*', r'[bold]\1[/bold]', content)
markup = f"[bold #e8e8e8]You[/]\n{rendered}"
markup = f"[bold #e8e8e8]You[/]\n{_safe_render(content)}"
return ChatBubble(markup, classes="user-bubble")
if sender == "system":
markup = f"[dim italic]{content}[/]"
markup = f"[dim italic]{_rich_escape(content)}[/]"
return ChatBubble(markup, classes="system-bubble")
rendered = re.sub(r'\*\*(.+?)\*\*', r'[bold]\1[/bold]', content)
markup = f"[bold {_CORAL}]{sender}[/]\n{rendered}"
markup = f"[bold {_CORAL}]{sender}[/]\n{_safe_render(content)}"
if metadata:
markup += f"\n\n[{_DIM}]{metadata}[/]"
return ChatBubble(markup, classes="agent-bubble")

View File

@@ -192,12 +192,12 @@ def _train_new_agents(agent_files: list, n_iterations: int) -> None:
from crewai_cli.benchmark import load_benchmark_cases
benchmarks_dir = Path("benchmarks")
tests_dir = Path("tests")
agents_trained = 0
for agent_path in agent_files:
agent_name = agent_path.stem
cases_path = benchmarks_dir / f"{agent_name}_cases.json"
cases_path = tests_dir / f"{agent_name}_cases.json"
if not cases_path.exists():
click.secho(f" Skipping {agent_name} — no {cases_path}", fg="yellow")

View File

@@ -225,8 +225,8 @@ def _bootstrap_project(base: Path, llm_model: str = "") -> None:
tools_dir = base / "tools"
tools_dir.mkdir(parents=True, exist_ok=True)
benchmarks_dir = base / "benchmarks"
benchmarks_dir.mkdir(parents=True, exist_ok=True)
tests_dir = base / "tests"
tests_dir.mkdir(parents=True, exist_ok=True)
config_path = base / "config.json"
if not config_path.exists():
@@ -319,24 +319,59 @@ def _run_uv_sync(base: Path) -> None:
def _create_benchmark_cases(base: Path, agent_name: str) -> None:
"""Create a starter benchmark cases file for the agent."""
cases_path = base / "benchmarks" / f"{agent_name}_cases.json"
cases_path = base / "tests" / f"{agent_name}_cases.json"
if cases_path.exists():
return
cases_path.parent.mkdir(parents=True, exist_ok=True)
cases_path.write_text(_STARTER_CASES, encoding="utf-8")
_POPULAR_MODELS: list[tuple[str, str]] = [
("openai/gpt-4o", "OpenAI GPT-4o"),
("openai/gpt-4o-mini", "OpenAI GPT-4o Mini (cheaper)"),
("openai/o3", "OpenAI o3 (reasoning)"),
("anthropic/claude-sonnet-4-6", "Anthropic Claude Sonnet 4.6"),
("anthropic/claude-haiku-4-5-20251001", "Anthropic Claude Haiku 4.5 (fast)"),
("gemini/gemini-2.5-pro-exp-03-25", "Google Gemini 2.5 Pro"),
("groq/llama-3.1-70b-versatile", "Groq Llama 3.1 70B (fast)"),
("ollama/llama3.1", "Ollama Llama 3.1 (local)"),
_PROVIDERS: list[tuple[str, str]] = [
("openai", "OpenAI"),
("anthropic", "Anthropic"),
("gemini", "Google Gemini"),
("groq", "Groq (fast inference)"),
("ollama", "Ollama (local)"),
]
_PROVIDER_MODELS: dict[str, list[tuple[str, str]]] = {
"openai": [
("gpt-5.5", "GPT-5.5"),
("gpt-5.5-pro", "GPT-5.5 Pro"),
("o4-mini", "o4-mini (reasoning, fast)"),
("o3", "o3 (reasoning)"),
("gpt-4.1-mini", "GPT-4.1 Mini (budget)"),
],
"anthropic": [
("claude-opus-4-6", "Claude Opus 4.6"),
("claude-sonnet-4-6", "Claude Sonnet 4.6"),
("claude-haiku-4-5-20251001", "Claude Haiku 4.5 (fast)"),
("claude-3-7-sonnet-20250219", "Claude 3.7 Sonnet"),
("claude-3-5-sonnet-20241022", "Claude 3.5 Sonnet"),
],
"gemini": [
("gemini-3-pro-preview", "Gemini 3 Pro (preview)"),
("gemini-2.5-pro-exp-03-25", "Gemini 2.5 Pro"),
("gemini-2.5-flash-preview-04-17", "Gemini 2.5 Flash"),
("gemini-2.0-flash-001", "Gemini 2.0 Flash"),
("gemini-1.5-pro", "Gemini 1.5 Pro"),
],
"groq": [
("llama-3.3-70b-versatile", "Llama 3.3 70B"),
("llama-3.1-70b-versatile", "Llama 3.1 70B"),
("llama-3.1-8b-instant", "Llama 3.1 8B (fast)"),
("deepseek-r1-distill-llama-70b", "DeepSeek R1 70B"),
("mixtral-8x7b-32768", "Mixtral 8x7B"),
],
"ollama": [
("llama3.3", "Llama 3.3"),
("llama3.1", "Llama 3.1"),
("deepseek-r1", "DeepSeek R1"),
("qwen2.5", "Qwen 2.5"),
("mistral", "Mistral"),
],
}
_POPULAR_TOOLS: list[tuple[str, str]] = [
("SerperDevTool", "Web search via Serper API"),
@@ -470,7 +505,7 @@ def create_agent(name: str | None = None) -> None:
base = Path.cwd()
# Directories are bootstrapped now, pyproject written after model selection
for d in ("agents", "tools", "benchmarks"):
for d in ("agents", "tools", "tests"):
(base / d).mkdir(parents=True, exist_ok=True)
dest = base / "agents" / f"{name}.jsonc"
@@ -517,51 +552,77 @@ def create_agent(name: str | None = None) -> None:
def _select_model() -> str:
"""Let the user pick an LLM model from popular options or type a custom one."""
labels = [f"{label} ({model_id})" for model_id, label in _POPULAR_MODELS]
labels.append("Other (enter manually)")
"""Two-step selection: provider first, then model."""
# Step 1: Pick provider
provider_labels = [label for _, label in _PROVIDERS]
provider_labels.append("Other (enter manually)")
click.echo()
click.secho(" LLM Model:", fg="cyan")
click.secho(" LLM Provider:", fg="cyan")
p_idx = _arrow_or_fallback(provider_labels)
if _is_interactive():
try:
_draw_single(labels, 0)
cursor = 0
total = len(labels)
while True:
key = _read_key()
if key == "up" and cursor > 0:
cursor -= 1
_draw_single(labels, cursor, clear=True)
elif key == "down" and cursor < total - 1:
cursor += 1
_draw_single(labels, cursor, clear=True)
elif key == "enter":
_clear_lines(total)
idx = cursor
break
except Exception:
idx = _select_model_fallback(labels)
else:
idx = _select_model_fallback(labels)
if idx == len(_POPULAR_MODELS):
if p_idx == len(_PROVIDERS):
custom = click.prompt(" Enter model (provider/model)", type=str)
return custom.strip()
selected = _POPULAR_MODELS[idx][0]
click.secho(f"{selected}", fg="green")
return selected
provider_key, provider_name = _PROVIDERS[p_idx]
click.secho(f"{provider_name}", fg="green")
# Step 2: Pick model from that provider
models = _PROVIDER_MODELS.get(provider_key, [])
model_labels = [f"{label} ({model_id})" for model_id, label in models]
model_labels.append("Other (enter model name)")
click.echo()
click.secho(f" {provider_name} Model:", fg="cyan")
m_idx = _arrow_or_fallback(model_labels)
if m_idx == len(models):
custom = click.prompt(f" Enter model name for {provider_key}/", type=str)
result = f"{provider_key}/{custom.strip()}"
else:
model_id = models[m_idx][0]
result = f"{provider_key}/{model_id}"
click.secho(f"{result}", fg="green")
return result
def _select_model_fallback(labels: list[str]) -> int:
def _arrow_or_fallback(labels: list[str]) -> int:
"""Arrow-key select if interactive, numbered fallback otherwise."""
if _is_interactive():
try:
return _arrow_select_one(labels)
except Exception:
pass
return _numbered_select(labels)
def _arrow_select_one(labels: list[str]) -> int:
"""Arrow-key single-select. Returns selected index."""
cursor = 0
total = len(labels)
_draw_single(labels, cursor)
while True:
key = _read_key()
if key == "up" and cursor > 0:
cursor -= 1
_draw_single(labels, cursor, clear=True)
elif key == "down" and cursor < total - 1:
cursor += 1
_draw_single(labels, cursor, clear=True)
elif key == "enter":
_clear_lines(total)
return cursor
def _numbered_select(labels: list[str]) -> int:
"""Numbered fallback for non-TTY environments."""
for idx, label in enumerate(labels, 1):
click.echo(f" {idx}. {label}")
click.echo()
while True:
choice = click.prompt(" Select a model", type=str, default="1")
choice = click.prompt(" Select", type=str, default="1")
try:
num = int(choice)
if 1 <= num <= len(labels):
@@ -577,7 +638,7 @@ def _select_tools() -> list[str]:
labels.append("Add custom tool class names")
click.echo()
click.secho(" Tools (press Enter to skip):", fg="cyan")
click.secho(" Tools (space to select, enter to confirm):", fg="cyan")
if _is_interactive():
try:

View File

@@ -125,7 +125,7 @@ def _contains_file_id_reference(messages: list[dict[str, Any]]) -> bool:
class AnthropicThinkingConfig(BaseModel):
type: Literal["enabled", "disabled"]
type: Literal["enabled", "disabled", "adaptive"]
budget_tokens: int | None = None
@@ -485,7 +485,7 @@ class AnthropicCompletion(BaseLLM):
if self.thinking:
if isinstance(self.thinking, AnthropicThinkingConfig):
params["thinking"] = self.thinking.model_dump()
params["thinking"] = self.thinking.model_dump(exclude_none=True)
else:
params["thinking"] = self.thinking

View File

@@ -104,6 +104,7 @@ class ConversationalAgentExecutor(BaseModel):
_last_checkpoint: dict[str, Any] = PrivateAttr(default_factory=dict)
# GAP-67: Artifacts collected during tool execution
_turn_artifacts: list[Artifact] = PrivateAttr(default_factory=list)
_last_stream_result: Any = PrivateAttr(default=None)
def model_post_init(self, __context: Any) -> None:
"""Load persisted conversation history and provenance from provider on startup."""
@@ -1369,9 +1370,28 @@ class ConversationalAgentExecutor(BaseModel):
_thinking_text = "" # GAP-53: thinking output from LLM
llm_model = getattr(llm, "model", "") or ""
# GAP-27: Enable reasoning/thinking on the LLM if supported
if self.agent.settings.reasoning_enabled and hasattr(llm, 'thinking'):
llm.thinking = True
# GAP-27: Enable reasoning/thinking on the LLM if supported (once per agent)
if self.agent.settings.reasoning_enabled and hasattr(llm, 'thinking') and not llm.thinking:
try:
from crewai.llms.providers.anthropic.completion import (
AnthropicCompletion,
AnthropicThinkingConfig,
)
if isinstance(llm, AnthropicCompletion):
llm.thinking = AnthropicThinkingConfig(type="adaptive")
try:
model_info = await asyncio.to_thread(
llm._get_sync_client().models.retrieve,
getattr(llm, "model", ""),
)
if model_info.max_tokens:
llm.max_tokens = model_info.max_tokens
except Exception:
pass
else:
llm.thinking = True
except ImportError:
llm.thinking = True
while True:
if has_reached_max_iterations(iterations, self.max_iter):
@@ -2074,12 +2094,7 @@ class ConversationalAgentExecutor(BaseModel):
now = time.monotonic()
if now - _last_status_time >= 0.5:
_last_status_time = now
est_output = self._turn_output_tokens or (_streamed_chars // 4)
await self._emit_status(
"streaming",
input_tokens=self._turn_input_tokens,
output_tokens=est_output,
)
await self._emit_status("streaming")
except asyncio.TimeoutError:
continue
@@ -2089,6 +2104,7 @@ class ConversationalAgentExecutor(BaseModel):
yield chunk
result = invoke_task.result()
self._last_stream_result = result
if _streamed_chars == 0 and result.content:
yield result.content

View File

@@ -642,6 +642,8 @@ class NewAgent(BaseModel):
"""Stream a response token by token.
GAP-31: Accepts optional conversation_id for concurrent conversations.
After the generator is exhausted, call ``last_stream_result`` to get
the full ``Message`` with token metadata.
"""
cid = conversation_id or self._default_conversation_id
executor = self._get_or_create_executor(cid)
@@ -653,6 +655,14 @@ class NewAgent(BaseModel):
async for chunk in executor.astream(user_msg):
yield chunk
@property
def last_stream_result(self) -> Message | None:
"""Return the Message from the most recent ``stream()`` call."""
executor = self._executors.get(self._default_conversation_id)
if executor:
return getattr(executor, "_last_stream_result", None)
return None
def reset_conversation(self, conversation_id: str | None = None) -> None:
"""Clear conversation history and start fresh.

View File

@@ -31,8 +31,8 @@ def strip_jsonc_comments(text: str) -> str:
# ── Helpers ─────────────────────────────────────────────────────
# Standard interactive input for agent creation:
# role, goal, backstory, llm (1=default), tools (none), api key (skip)
_DEFAULT_PROMPTS_INPUT = "Test Role\nTest Goal\n\n1\n\n\n"
# role, goal, backstory, provider (1=OpenAI), model (1=first), tools (none), api key (skip)
_DEFAULT_PROMPTS_INPUT = "Test Role\nTest Goal\n\n1\n1\n\n\n"
# ── crewai create agent <name> ──────────────────────────────────
@@ -68,10 +68,10 @@ class TestCreateAgentCommand:
"""Interactive prompts should populate role, goal, backstory."""
runner = CliRunner()
with runner.isolated_filesystem(temp_dir=tmp_path):
# role, goal, backstory, model (1=gpt-4o), tools (none), api key (skip)
# role, goal, backstory, provider (1=OpenAI), model (1=first), tools (none), api key (skip)
result = runner.invoke(
crewai, ["create", "agent", "analyst"],
input="Data Analyst\nAnalyze data\nExpert analyst\n1\n\n\n",
input="Data Analyst\nAnalyze data\nExpert analyst\n1\n1\n\n\n",
)
assert result.exit_code == 0, result.output
raw = Path("agents/analyst.jsonc").read_text()
@@ -81,16 +81,16 @@ class TestCreateAgentCommand:
assert data["role"] == "Data Analyst"
assert data["goal"] == "Analyze data"
assert data["backstory"] == "Expert analyst"
assert data["llm"] == "openai/gpt-4o"
assert data["llm"] == "openai/gpt-5.5"
def test_tools_selection(self, tmp_path: Path) -> None:
"""Selecting tools should populate the tools array."""
runner = CliRunner()
with runner.isolated_filesystem(temp_dir=tmp_path):
# role, goal, backstory, model (1), tools (1 2 = SerperDevTool + ScrapeWebsiteTool), api key (skip)
# role, goal, backstory, provider (1), model (1), tools (1 2 = SerperDevTool + ScrapeWebsiteTool), api key (skip)
result = runner.invoke(
crewai, ["create", "agent", "searcher"],
input="Web Searcher\nSearch things\n\n1\n1 2\n\n",
input="Web Searcher\nSearch things\n\n1\n1\n1 2\n\n",
)
assert result.exit_code == 0, result.output
raw = Path("agents/searcher.jsonc").read_text()
@@ -211,7 +211,7 @@ class TestAgentTemplate:
"""Unit tests for the AGENT_TEMPLATE constant."""
def _render(self, **kwargs) -> str:
defaults = {"name": "test", "role": "", "goal": "", "backstory": "", "llm": "openai/gpt-4o"}
defaults = {"name": "test", "role": "", "goal": "", "backstory": "", "llm": "openai/gpt-5.5"}
defaults.update(kwargs)
return AGENT_TEMPLATE.format(**defaults)