Support conversational flows in the CLI TUI (#6293)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled

* Add conversational flow TUI support

* properly support tui
This commit is contained in:
Lorenze Jay
2026-06-23 18:04:09 -07:00
committed by GitHub
parent f2a074e35b
commit 1862ff8f6c
6 changed files with 501 additions and 2 deletions

View File

@@ -17,7 +17,7 @@ from textual.binding import Binding, BindingType
from textual.containers import Horizontal, Vertical, VerticalScroll
from textual.css.query import NoMatches
from textual.screen import ModalScreen
from textual.widgets import Button, Footer, Header, Static
from textual.widgets import Button, Footer, Header, Input, Static
_SPINNER = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"
@@ -382,6 +382,18 @@ Screen {
height: auto;
}
#conversation-input {
display: none;
height: 3;
border-top: hkey #333333;
background: #1c1c1c;
color: #e0e0e0;
}
#conversation-input:focus {
border-top: hkey #1F7982;
}
Header {
background: #1c1c1c;
color: #FF5A50;
@@ -483,6 +495,7 @@ FooterKey .footer-key--key {
total_tasks: int = 0,
agent_names: list[str] | None = None,
task_names: list[str] | None = None,
conversational: bool = False,
):
super().__init__()
self.title = f"CrewAI — {crew_name}"
@@ -544,6 +557,13 @@ FooterKey .footer-key--key {
self._event_handlers: list[tuple[type, Any]] = []
self._crew: Any = None
self._flow: Any = None
self._is_conversational = conversational
self._conversation_messages: list[tuple[str, str]] = []
self._conversation_turns = 0
self._conversation_turn_in_progress = False
self._conversation_previous_defer_trace_finalization: bool | None = None
self._conversation_exit_commands = {"exit", "quit"}
self._default_inputs: dict[str, Any] | None = None
self._crew_result: Any = None
self._crew_json_path: Any = None
@@ -566,6 +586,10 @@ FooterKey .footer-key--key {
yield Static(id="task-header")
with VerticalScroll(id="scroll-area"):
yield Static(id="main-content")
yield Input(
placeholder="Message the flow...",
id="conversation-input",
)
with VerticalScroll(id="log-panel"):
yield Static(id="log-content")
yield Footer()
@@ -574,7 +598,9 @@ FooterKey .footer-key--key {
self._start_time = time.time()
self._subscribe()
self._tick_timer = self.set_interval(1 / 8, self._tick)
if self._crew:
if self._is_conversational and self._flow:
self._start_conversational_session()
elif self._crew:
self._run_crew_worker()
elif self._crew_json_path:
self._load_and_run_worker()
@@ -725,6 +751,140 @@ FooterKey .footer-key--key {
self._tick_timer = self.set_interval(1 / 2, self._tick)
self._unsubscribe_if_no_running_memory_save(wait_for_queued=True)
# ── Conversational flow execution ───────────────────────
def _start_conversational_session(self) -> None:
from crewai.events.listeners.tracing.utils import (
set_suppress_tracing_messages,
set_tui_mode,
)
set_tui_mode(True)
set_suppress_tracing_messages(True)
with self._lock:
self._status = "chatting"
self._current_step = None
self._elapsed_frozen = None
self._conversation_previous_defer_trace_finalization = getattr(
self._flow, "defer_trace_finalization", False
)
self._flow.defer_trace_finalization = True
try:
input_widget = self.query_one("#conversation-input", Input)
input_widget.display = True
input_widget.focus()
except Exception: # noqa: S110
pass
def _finalize_conversational_session(self) -> None:
if not (self._is_conversational and self._flow):
return
try:
self._flow.finalize_session_traces()
except Exception: # noqa: S110
pass
previous = self._conversation_previous_defer_trace_finalization
if previous is not None:
try:
self._flow.defer_trace_finalization = previous
except Exception: # noqa: S110
pass
def on_input_submitted(self, event: Input.Submitted) -> None:
if event.input.id != "conversation-input":
return
if not self._is_conversational:
return
message = event.value.strip()
event.input.value = ""
if not message:
return
if message.lower() in self._conversation_exit_commands:
self._finalize_conversational_session()
self._unsubscribe()
self.exit(self._crew_result)
return
if self._conversation_turn_in_progress:
return
with self._lock:
self._conversation_messages.append(("user", message))
self._conversation_turn_in_progress = True
self._conversation_turns += 1
self._status = "working"
self._current_step = ("yellow", "Thinking…", "")
self._is_streaming = False
self._streaming_text = ""
self._task_full_output = ""
self._current_llm_text = ""
event.input.disabled = True
self._run_conversation_turn_worker(message)
@work(thread=True, exclusive=True, group="conversation")
def _run_conversation_turn_worker(self, message: str) -> None:
from crewai.events.listeners.tracing.utils import (
set_suppress_tracing_messages,
set_tui_mode,
)
set_tui_mode(True)
set_suppress_tracing_messages(True)
try:
result = self._flow.handle_turn(message)
if hasattr(result, "get_full_text") and hasattr(result, "result"):
for _chunk in result:
pass
result = result.result
self.call_from_thread(self._on_conversation_turn_done, result)
except Exception as e:
self.call_from_thread(self._on_conversation_turn_failed, str(e))
def _on_conversation_turn_done(self, result: Any) -> None:
with self._lock:
output = self._stringify_output(result)
self._conversation_messages.append(("assistant", output))
self._crew_result = result
self._conversation_turn_in_progress = False
self._status = "chatting"
self._is_streaming = False
self._streaming_text = ""
self._current_step = None
self._enable_conversation_input()
self._tick()
self._scroll_to_result()
def _on_conversation_turn_failed(self, error: str) -> None:
with self._lock:
self._status = "failed"
self._error = error
self._conversation_turn_in_progress = False
self._is_streaming = False
self._current_step = None
self._enable_conversation_input()
self._tick()
def _enable_conversation_input(self) -> None:
try:
input_widget = self.query_one("#conversation-input", Input)
input_widget.disabled = False
input_widget.focus()
except Exception: # noqa: S110
pass
def _stringify_output(self, result: Any) -> str:
raw_result = getattr(result, "raw", result)
if raw_result is None:
return ""
if isinstance(raw_result, str):
return raw_result
try:
return _json.dumps(raw_result, default=str, ensure_ascii=False)
except TypeError:
return str(raw_result)
# ── Actions ─────────────────────────────────────────────
def action_toggle_sidebar(self) -> None:
@@ -783,6 +943,7 @@ FooterKey .footer-key--key {
self._refresh_log_panel()
async def action_quit(self) -> None:
self._finalize_conversational_session()
self._unsubscribe()
self.exit(self._crew_result)
@@ -958,6 +1119,30 @@ FooterKey .footer-key--key {
t = Text()
sidebar_width = 30
if self._is_conversational:
t.append(" CONVERSATION\n", style=f"bold {_C_PRIMARY}")
t.append("\n")
if self._conversation_turn_in_progress:
t.append(f" {self._spinner()} ", style=_C_PRIMARY)
t.append("Working\n", style=f"bold {_C_TEXT}")
elif self._status == "failed":
t.append(" ✘ Failed\n", style=_C_RED)
else:
t.append(" ● Ready\n", style=_C_GREEN)
t.append(f" Turns {self._conversation_turns}\n", style=_C_DIM)
t.append("\n")
t.append(" TOKENS\n", style=f"bold {_C_PRIMARY}")
t.append("\n")
out = self._output_tokens + self._live_out_tokens
t.append(f"{self._input_tokens:,}\n", style=_C_DIM)
t.append(f"{out:,}\n", style=_C_DIM)
t.append("\n")
t.append(" COMMANDS\n", style=f"bold {_C_PRIMARY}")
t.append("\n")
t.append(" quit / exit\n", style=_C_DIM)
widget.update(t)
return
t.append(" TASKS\n", style=f"bold {_C_PRIMARY}")
t.append("\n")
@@ -1011,6 +1196,22 @@ FooterKey .footer-key--key {
widget = self.query_one("#task-header", Static)
t = Text()
if self._is_conversational:
if self._status == "failed":
t.append("", style=f"bold {_C_RED}")
t.append("Failed", style=f"bold {_C_RED}")
if self._error:
t.append(f"\n{self._error[:120]}", style=_C_RED)
elif self._conversation_turn_in_progress:
t.append(f"{self._spinner()} ", style=_C_PRIMARY)
t.append("Flow is responding", style=f"bold {_C_PRIMARY}")
else:
t.append("", style=f"bold {_C_GREEN}")
t.append("Conversational flow ready", style=f"bold {_C_GREEN}")
t.append(" Type a message below", style=_C_DIM)
widget.update(t)
return
if self._status == "completed":
elapsed = self._elapsed_frozen or (time.time() - self._start_time)
t.append("", style=f"bold {_C_GREEN}")
@@ -1062,6 +1263,41 @@ FooterKey .footer-key--key {
t = Text()
should_scroll = False
if self._is_conversational:
if not self._conversation_messages and not self._is_streaming:
t.append(" Start the conversation below.\n", style=_C_MUTED)
for role, content in self._conversation_messages:
if role == "user":
t.append("\n You\n", style=f"bold {_C_TEAL}")
else:
t.append("\n Assistant\n", style=f"bold {_C_PRIMARY}")
rendered = _format_json_in_text(_unescape_text(content))
for line in rendered.split("\n"):
style = _C_TEXT if role == "assistant" else _C_DIM
t.append(f" {line}\n", style=style)
if self._is_streaming and self._streaming_text:
text = _unescape_text(self._filtered_streaming_text())
if text.strip():
t.append("\n Assistant\n", style=f"bold {_C_PRIMARY}")
for line in text.rstrip().split("\n")[-40:]:
t.append(f" {line}\n", style=_C_TEXT)
should_scroll = True
if self._status == "failed" and self._error:
t.append("\n Error\n", style=f"bold {_C_RED}")
t.append(f" {self._error}\n", style=_C_RED)
widget.update(t)
if should_scroll:
try:
self.query_one("#scroll-area", VerticalScroll).scroll_end(
animate=False
)
except Exception: # noqa: S110
pass
return
# Plan section
if self._plan and self._plan.get("steps"):
plan_title = self._plan.get("plan", "Plan")

View File

@@ -0,0 +1,105 @@
from __future__ import annotations
import importlib
import inspect
from pathlib import Path
import subprocess
import sys
from typing import Any
import click
def _project_script_target(script_name: str) -> str | None:
try:
from crewai_cli.utils import read_toml
pyproject = read_toml()
except Exception:
return None
target = pyproject.get("project", {}).get("scripts", {}).get(script_name)
return target if isinstance(target, str) else None
def _prepare_project_import_path() -> None:
cwd = Path.cwd()
for path in (cwd / "src", cwd):
path_str = str(path)
if path.exists() and path_str not in sys.path:
sys.path.insert(0, path_str)
def _load_conversational_flow_from_kickoff_script() -> Any | None:
target = _project_script_target("kickoff")
if not target or ":" not in target:
return None
module_name, _callable_name = target.split(":", 1)
_prepare_project_import_path()
try:
module = importlib.import_module(module_name)
from crewai.flow.flow import Flow
except Exception:
return None
for value in vars(module).values():
if (
inspect.isclass(value)
and value is not Flow
and issubclass(value, Flow)
and getattr(value, "conversational", False)
):
return value()
for value in vars(module).values():
if (
isinstance(value, Flow)
and getattr(value, "conversational", False)
and callable(getattr(value, "handle_turn", None))
):
return value
return None
def _run_conversational_flow_tui(flow: Any) -> Any:
from crewai_cli.crew_run_tui import CrewRunApp
app = CrewRunApp(
crew_name=getattr(flow, "name", None) or type(flow).__name__,
conversational=True,
)
app._flow = flow
app.run()
if app._status == "failed":
raise SystemExit(1)
return app._crew_result
def kickoff_flow() -> None:
"""
Kickoff the flow by running a command in the UV environment.
"""
flow = _load_conversational_flow_from_kickoff_script()
if flow is not None:
_run_conversational_flow_tui(flow)
return
command = ["uv", "run", "kickoff"]
try:
result = subprocess.run(command, capture_output=False, text=True, check=True) # noqa: S603
if result.stderr:
click.echo(result.stderr, err=True)
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while running the flow: {e}", err=True)
click.echo(e.output, err=True)
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)

View File

@@ -604,6 +604,16 @@ def _run_flow_project(
run_declarative_flow_in_project_env(definition=definition)
return
from crewai_cli.kickoff_flow import (
_load_conversational_flow_from_kickoff_script,
_run_conversational_flow_tui,
)
flow = _load_conversational_flow_from_kickoff_script()
if flow is not None:
_run_conversational_flow_tui(flow)
return
_execute_uv_script("kickoff", entity_type="flow")

View File

@@ -126,6 +126,52 @@ def test_chain_deploy_does_not_login_for_deploy_exit(monkeypatch, capsys) -> Non
assert "Deploy failed with exit code 42" in capsys.readouterr().out
def test_conversation_turn_done_records_assistant_message() -> None:
class RawResult:
raw = "hello from the flow"
app = CrewRunApp(conversational=True)
app._conversation_turn_in_progress = True
app._enable_conversation_input = lambda: None # type: ignore[method-assign]
app._tick = lambda: None # type: ignore[method-assign]
app._scroll_to_result = lambda: None # type: ignore[method-assign]
app._on_conversation_turn_done(RawResult())
assert app._conversation_messages == [("assistant", "hello from the flow")]
assert app._conversation_turn_in_progress is False
assert app._status == "chatting"
assert isinstance(app._crew_result, RawResult)
@pytest.mark.asyncio
async def test_conversation_input_submits_turn() -> None:
class FakeFlow:
defer_trace_finalization = False
def handle_turn(self, message: str) -> str:
return f"reply: {message}"
def finalize_session_traces(self) -> None:
pass
app = CrewRunApp(crew_name="Demo", conversational=True)
app._flow = FakeFlow()
async with app.run_test() as pilot:
await pilot.click("#conversation-input")
await pilot.press("h", "i", "enter")
for _ in range(50):
await pilot.pause(0.05)
if app._conversation_messages[-1:] == [("assistant", "reply: hi")]:
break
assert app._conversation_messages == [
("user", "hi"),
("assistant", "reply: hi"),
]
def test_plan_step_status_updates_only_the_explicit_step() -> None:
app = _app_with_plan()

View File

@@ -0,0 +1,63 @@
from __future__ import annotations
import sys
from crewai_cli import kickoff_flow
def test_loads_conversational_flow_from_kickoff_script(tmp_path, monkeypatch) -> None:
package_dir = tmp_path / "src" / "demo_chat"
package_dir.mkdir(parents=True)
(package_dir / "__init__.py").write_text("")
(package_dir / "main.py").write_text(
"\n".join(
[
"from crewai.flow import Flow",
"",
"class DemoChatFlow(Flow):",
" conversational = True",
]
)
)
(tmp_path / "pyproject.toml").write_text(
"\n".join(
[
"[project]",
'name = "demo-chat"',
"[project.scripts]",
'kickoff = "demo_chat.main:kickoff"',
]
)
)
monkeypatch.chdir(tmp_path)
sys.modules.pop("demo_chat.main", None)
sys.modules.pop("demo_chat", None)
flow = kickoff_flow._load_conversational_flow_from_kickoff_script()
assert flow is not None
assert type(flow).__name__ == "DemoChatFlow"
assert flow.conversational is True
def test_kickoff_flow_falls_back_to_uv_when_no_conversational_flow(
monkeypatch,
) -> None:
calls: list[list[str]] = []
def fake_run(command, capture_output, text, check):
calls.append(command)
class Result:
stderr = ""
return Result()
monkeypatch.setattr(
kickoff_flow, "_load_conversational_flow_from_kickoff_script", lambda: None
)
monkeypatch.setattr(kickoff_flow.subprocess, "run", fake_run)
kickoff_flow.kickoff_flow()
assert calls == [["uv", "run", "kickoff"]]

View File

@@ -645,6 +645,10 @@ def test_run_crew_runs_python_flow_project(monkeypatch, capsys):
"_execute_uv_script",
lambda script_name, **kwargs: calls.append((script_name, kwargs)),
)
monkeypatch.setattr(
"crewai_cli.kickoff_flow._load_conversational_flow_from_kickoff_script",
lambda: None,
)
run_crew_module.run_crew()
@@ -652,6 +656,41 @@ def test_run_crew_runs_python_flow_project(monkeypatch, capsys):
assert calls == [("kickoff", {"entity_type": "flow"})]
def test_run_crew_runs_conversational_flow_tui(monkeypatch, capsys):
class Flow:
pass
flow = Flow()
calls = []
monkeypatch.setattr(run_crew_module, "_has_json_crew", lambda: False)
monkeypatch.setattr(
run_crew_module,
"read_toml",
lambda: {"tool": {"crewai": {"type": "flow"}}},
)
monkeypatch.setattr(
"crewai_cli.kickoff_flow._load_conversational_flow_from_kickoff_script",
lambda: flow,
)
monkeypatch.setattr(
"crewai_cli.kickoff_flow._run_conversational_flow_tui",
lambda loaded_flow: calls.append(loaded_flow),
)
monkeypatch.setattr(
run_crew_module,
"_execute_uv_script",
lambda *_args, **_kwargs: pytest.fail(
"conversational flows must use the TUI"
),
)
run_crew_module.run_crew()
assert capsys.readouterr().out == ""
assert calls == [flow]
def test_run_crew_rejects_filename_for_flow_project(monkeypatch):
monkeypatch.setattr(run_crew_module, "_has_json_crew", lambda: False)
monkeypatch.setattr(