From 1862ff8f6cfced99ed170a755bba3374098970a5 Mon Sep 17 00:00:00 2001 From: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com> Date: Tue, 23 Jun 2026 18:04:09 -0700 Subject: [PATCH] Support conversational flows in the CLI TUI (#6293) * Add conversational flow TUI support * properly support tui --- lib/cli/src/crewai_cli/crew_run_tui.py | 240 ++++++++++++++++++++++++- lib/cli/src/crewai_cli/kickoff_flow.py | 105 +++++++++++ lib/cli/src/crewai_cli/run_crew.py | 10 ++ lib/cli/tests/test_crew_run_tui.py | 46 +++++ lib/cli/tests/test_kickoff_flow.py | 63 +++++++ lib/cli/tests/test_run_crew.py | 39 ++++ 6 files changed, 501 insertions(+), 2 deletions(-) create mode 100644 lib/cli/src/crewai_cli/kickoff_flow.py create mode 100644 lib/cli/tests/test_kickoff_flow.py diff --git a/lib/cli/src/crewai_cli/crew_run_tui.py b/lib/cli/src/crewai_cli/crew_run_tui.py index 9b3930350..81aae6c47 100644 --- a/lib/cli/src/crewai_cli/crew_run_tui.py +++ b/lib/cli/src/crewai_cli/crew_run_tui.py @@ -17,7 +17,7 @@ from textual.binding import Binding, BindingType from textual.containers import Horizontal, Vertical, VerticalScroll from textual.css.query import NoMatches from textual.screen import ModalScreen -from textual.widgets import Button, Footer, Header, Static +from textual.widgets import Button, Footer, Header, Input, Static _SPINNER = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" @@ -382,6 +382,18 @@ Screen { height: auto; } +#conversation-input { + display: none; + height: 3; + border-top: hkey #333333; + background: #1c1c1c; + color: #e0e0e0; +} + +#conversation-input:focus { + border-top: hkey #1F7982; +} + Header { background: #1c1c1c; color: #FF5A50; @@ -483,6 +495,7 @@ FooterKey .footer-key--key { total_tasks: int = 0, agent_names: list[str] | None = None, task_names: list[str] | None = None, + conversational: bool = False, ): super().__init__() self.title = f"CrewAI — {crew_name}" @@ -544,6 +557,13 @@ FooterKey .footer-key--key { self._event_handlers: list[tuple[type, Any]] = [] self._crew: Any = None + self._flow: Any = None + self._is_conversational = conversational + self._conversation_messages: list[tuple[str, str]] = [] + self._conversation_turns = 0 + self._conversation_turn_in_progress = False + self._conversation_previous_defer_trace_finalization: bool | None = None + self._conversation_exit_commands = {"exit", "quit"} self._default_inputs: dict[str, Any] | None = None self._crew_result: Any = None self._crew_json_path: Any = None @@ -566,6 +586,10 @@ FooterKey .footer-key--key { yield Static(id="task-header") with VerticalScroll(id="scroll-area"): yield Static(id="main-content") + yield Input( + placeholder="Message the flow...", + id="conversation-input", + ) with VerticalScroll(id="log-panel"): yield Static(id="log-content") yield Footer() @@ -574,7 +598,9 @@ FooterKey .footer-key--key { self._start_time = time.time() self._subscribe() self._tick_timer = self.set_interval(1 / 8, self._tick) - if self._crew: + if self._is_conversational and self._flow: + self._start_conversational_session() + elif self._crew: self._run_crew_worker() elif self._crew_json_path: self._load_and_run_worker() @@ -725,6 +751,140 @@ FooterKey .footer-key--key { self._tick_timer = self.set_interval(1 / 2, self._tick) self._unsubscribe_if_no_running_memory_save(wait_for_queued=True) + # ── Conversational flow execution ─────────────────────── + + def _start_conversational_session(self) -> None: + from crewai.events.listeners.tracing.utils import ( + set_suppress_tracing_messages, + set_tui_mode, + ) + + set_tui_mode(True) + set_suppress_tracing_messages(True) + with self._lock: + self._status = "chatting" + self._current_step = None + self._elapsed_frozen = None + self._conversation_previous_defer_trace_finalization = getattr( + self._flow, "defer_trace_finalization", False + ) + self._flow.defer_trace_finalization = True + + try: + input_widget = self.query_one("#conversation-input", Input) + input_widget.display = True + input_widget.focus() + except Exception: # noqa: S110 + pass + + def _finalize_conversational_session(self) -> None: + if not (self._is_conversational and self._flow): + return + try: + self._flow.finalize_session_traces() + except Exception: # noqa: S110 + pass + previous = self._conversation_previous_defer_trace_finalization + if previous is not None: + try: + self._flow.defer_trace_finalization = previous + except Exception: # noqa: S110 + pass + + def on_input_submitted(self, event: Input.Submitted) -> None: + if event.input.id != "conversation-input": + return + if not self._is_conversational: + return + + message = event.value.strip() + event.input.value = "" + if not message: + return + if message.lower() in self._conversation_exit_commands: + self._finalize_conversational_session() + self._unsubscribe() + self.exit(self._crew_result) + return + if self._conversation_turn_in_progress: + return + + with self._lock: + self._conversation_messages.append(("user", message)) + self._conversation_turn_in_progress = True + self._conversation_turns += 1 + self._status = "working" + self._current_step = ("yellow", "Thinking…", "") + self._is_streaming = False + self._streaming_text = "" + self._task_full_output = "" + self._current_llm_text = "" + + event.input.disabled = True + self._run_conversation_turn_worker(message) + + @work(thread=True, exclusive=True, group="conversation") + def _run_conversation_turn_worker(self, message: str) -> None: + from crewai.events.listeners.tracing.utils import ( + set_suppress_tracing_messages, + set_tui_mode, + ) + + set_tui_mode(True) + set_suppress_tracing_messages(True) + try: + result = self._flow.handle_turn(message) + if hasattr(result, "get_full_text") and hasattr(result, "result"): + for _chunk in result: + pass + result = result.result + self.call_from_thread(self._on_conversation_turn_done, result) + except Exception as e: + self.call_from_thread(self._on_conversation_turn_failed, str(e)) + + def _on_conversation_turn_done(self, result: Any) -> None: + with self._lock: + output = self._stringify_output(result) + self._conversation_messages.append(("assistant", output)) + self._crew_result = result + self._conversation_turn_in_progress = False + self._status = "chatting" + self._is_streaming = False + self._streaming_text = "" + self._current_step = None + self._enable_conversation_input() + self._tick() + self._scroll_to_result() + + def _on_conversation_turn_failed(self, error: str) -> None: + with self._lock: + self._status = "failed" + self._error = error + self._conversation_turn_in_progress = False + self._is_streaming = False + self._current_step = None + self._enable_conversation_input() + self._tick() + + def _enable_conversation_input(self) -> None: + try: + input_widget = self.query_one("#conversation-input", Input) + input_widget.disabled = False + input_widget.focus() + except Exception: # noqa: S110 + pass + + def _stringify_output(self, result: Any) -> str: + raw_result = getattr(result, "raw", result) + if raw_result is None: + return "" + if isinstance(raw_result, str): + return raw_result + try: + return _json.dumps(raw_result, default=str, ensure_ascii=False) + except TypeError: + return str(raw_result) + # ── Actions ───────────────────────────────────────────── def action_toggle_sidebar(self) -> None: @@ -783,6 +943,7 @@ FooterKey .footer-key--key { self._refresh_log_panel() async def action_quit(self) -> None: + self._finalize_conversational_session() self._unsubscribe() self.exit(self._crew_result) @@ -958,6 +1119,30 @@ FooterKey .footer-key--key { t = Text() sidebar_width = 30 + if self._is_conversational: + t.append(" CONVERSATION\n", style=f"bold {_C_PRIMARY}") + t.append("\n") + if self._conversation_turn_in_progress: + t.append(f" {self._spinner()} ", style=_C_PRIMARY) + t.append("Working\n", style=f"bold {_C_TEXT}") + elif self._status == "failed": + t.append(" ✘ Failed\n", style=_C_RED) + else: + t.append(" ● Ready\n", style=_C_GREEN) + t.append(f" Turns {self._conversation_turns}\n", style=_C_DIM) + t.append("\n") + t.append(" TOKENS\n", style=f"bold {_C_PRIMARY}") + t.append("\n") + out = self._output_tokens + self._live_out_tokens + t.append(f" ↑ {self._input_tokens:,}\n", style=_C_DIM) + t.append(f" ↓ {out:,}\n", style=_C_DIM) + t.append("\n") + t.append(" COMMANDS\n", style=f"bold {_C_PRIMARY}") + t.append("\n") + t.append(" quit / exit\n", style=_C_DIM) + widget.update(t) + return + t.append(" TASKS\n", style=f"bold {_C_PRIMARY}") t.append("\n") @@ -1011,6 +1196,22 @@ FooterKey .footer-key--key { widget = self.query_one("#task-header", Static) t = Text() + if self._is_conversational: + if self._status == "failed": + t.append("✘ ", style=f"bold {_C_RED}") + t.append("Failed", style=f"bold {_C_RED}") + if self._error: + t.append(f"\n{self._error[:120]}", style=_C_RED) + elif self._conversation_turn_in_progress: + t.append(f"{self._spinner()} ", style=_C_PRIMARY) + t.append("Flow is responding", style=f"bold {_C_PRIMARY}") + else: + t.append("● ", style=f"bold {_C_GREEN}") + t.append("Conversational flow ready", style=f"bold {_C_GREEN}") + t.append(" Type a message below", style=_C_DIM) + widget.update(t) + return + if self._status == "completed": elapsed = self._elapsed_frozen or (time.time() - self._start_time) t.append("✔ ", style=f"bold {_C_GREEN}") @@ -1062,6 +1263,41 @@ FooterKey .footer-key--key { t = Text() should_scroll = False + if self._is_conversational: + if not self._conversation_messages and not self._is_streaming: + t.append(" Start the conversation below.\n", style=_C_MUTED) + for role, content in self._conversation_messages: + if role == "user": + t.append("\n You\n", style=f"bold {_C_TEAL}") + else: + t.append("\n Assistant\n", style=f"bold {_C_PRIMARY}") + rendered = _format_json_in_text(_unescape_text(content)) + for line in rendered.split("\n"): + style = _C_TEXT if role == "assistant" else _C_DIM + t.append(f" {line}\n", style=style) + + if self._is_streaming and self._streaming_text: + text = _unescape_text(self._filtered_streaming_text()) + if text.strip(): + t.append("\n Assistant\n", style=f"bold {_C_PRIMARY}") + for line in text.rstrip().split("\n")[-40:]: + t.append(f" {line}\n", style=_C_TEXT) + should_scroll = True + + if self._status == "failed" and self._error: + t.append("\n Error\n", style=f"bold {_C_RED}") + t.append(f" {self._error}\n", style=_C_RED) + + widget.update(t) + if should_scroll: + try: + self.query_one("#scroll-area", VerticalScroll).scroll_end( + animate=False + ) + except Exception: # noqa: S110 + pass + return + # Plan section if self._plan and self._plan.get("steps"): plan_title = self._plan.get("plan", "Plan") diff --git a/lib/cli/src/crewai_cli/kickoff_flow.py b/lib/cli/src/crewai_cli/kickoff_flow.py new file mode 100644 index 000000000..bde1ddee7 --- /dev/null +++ b/lib/cli/src/crewai_cli/kickoff_flow.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +import importlib +import inspect +from pathlib import Path +import subprocess +import sys +from typing import Any + +import click + + +def _project_script_target(script_name: str) -> str | None: + try: + from crewai_cli.utils import read_toml + + pyproject = read_toml() + except Exception: + return None + + target = pyproject.get("project", {}).get("scripts", {}).get(script_name) + return target if isinstance(target, str) else None + + +def _prepare_project_import_path() -> None: + cwd = Path.cwd() + for path in (cwd / "src", cwd): + path_str = str(path) + if path.exists() and path_str not in sys.path: + sys.path.insert(0, path_str) + + +def _load_conversational_flow_from_kickoff_script() -> Any | None: + target = _project_script_target("kickoff") + if not target or ":" not in target: + return None + + module_name, _callable_name = target.split(":", 1) + _prepare_project_import_path() + + try: + module = importlib.import_module(module_name) + from crewai.flow.flow import Flow + except Exception: + return None + + for value in vars(module).values(): + if ( + inspect.isclass(value) + and value is not Flow + and issubclass(value, Flow) + and getattr(value, "conversational", False) + ): + return value() + + for value in vars(module).values(): + if ( + isinstance(value, Flow) + and getattr(value, "conversational", False) + and callable(getattr(value, "handle_turn", None)) + ): + return value + + return None + + +def _run_conversational_flow_tui(flow: Any) -> Any: + from crewai_cli.crew_run_tui import CrewRunApp + + app = CrewRunApp( + crew_name=getattr(flow, "name", None) or type(flow).__name__, + conversational=True, + ) + app._flow = flow + app.run() + + if app._status == "failed": + raise SystemExit(1) + + return app._crew_result + + +def kickoff_flow() -> None: + """ + Kickoff the flow by running a command in the UV environment. + """ + flow = _load_conversational_flow_from_kickoff_script() + if flow is not None: + _run_conversational_flow_tui(flow) + return + + command = ["uv", "run", "kickoff"] + + try: + result = subprocess.run(command, capture_output=False, text=True, check=True) # noqa: S603 + + if result.stderr: + click.echo(result.stderr, err=True) + + except subprocess.CalledProcessError as e: + click.echo(f"An error occurred while running the flow: {e}", err=True) + click.echo(e.output, err=True) + + except Exception as e: + click.echo(f"An unexpected error occurred: {e}", err=True) diff --git a/lib/cli/src/crewai_cli/run_crew.py b/lib/cli/src/crewai_cli/run_crew.py index f9948a297..de6c8c412 100644 --- a/lib/cli/src/crewai_cli/run_crew.py +++ b/lib/cli/src/crewai_cli/run_crew.py @@ -604,6 +604,16 @@ def _run_flow_project( run_declarative_flow_in_project_env(definition=definition) return + from crewai_cli.kickoff_flow import ( + _load_conversational_flow_from_kickoff_script, + _run_conversational_flow_tui, + ) + + flow = _load_conversational_flow_from_kickoff_script() + if flow is not None: + _run_conversational_flow_tui(flow) + return + _execute_uv_script("kickoff", entity_type="flow") diff --git a/lib/cli/tests/test_crew_run_tui.py b/lib/cli/tests/test_crew_run_tui.py index 969bc5ae2..5c49dabf1 100644 --- a/lib/cli/tests/test_crew_run_tui.py +++ b/lib/cli/tests/test_crew_run_tui.py @@ -126,6 +126,52 @@ def test_chain_deploy_does_not_login_for_deploy_exit(monkeypatch, capsys) -> Non assert "Deploy failed with exit code 42" in capsys.readouterr().out +def test_conversation_turn_done_records_assistant_message() -> None: + class RawResult: + raw = "hello from the flow" + + app = CrewRunApp(conversational=True) + app._conversation_turn_in_progress = True + app._enable_conversation_input = lambda: None # type: ignore[method-assign] + app._tick = lambda: None # type: ignore[method-assign] + app._scroll_to_result = lambda: None # type: ignore[method-assign] + + app._on_conversation_turn_done(RawResult()) + + assert app._conversation_messages == [("assistant", "hello from the flow")] + assert app._conversation_turn_in_progress is False + assert app._status == "chatting" + assert isinstance(app._crew_result, RawResult) + + +@pytest.mark.asyncio +async def test_conversation_input_submits_turn() -> None: + class FakeFlow: + defer_trace_finalization = False + + def handle_turn(self, message: str) -> str: + return f"reply: {message}" + + def finalize_session_traces(self) -> None: + pass + + app = CrewRunApp(crew_name="Demo", conversational=True) + app._flow = FakeFlow() + + async with app.run_test() as pilot: + await pilot.click("#conversation-input") + await pilot.press("h", "i", "enter") + for _ in range(50): + await pilot.pause(0.05) + if app._conversation_messages[-1:] == [("assistant", "reply: hi")]: + break + + assert app._conversation_messages == [ + ("user", "hi"), + ("assistant", "reply: hi"), + ] + + def test_plan_step_status_updates_only_the_explicit_step() -> None: app = _app_with_plan() diff --git a/lib/cli/tests/test_kickoff_flow.py b/lib/cli/tests/test_kickoff_flow.py new file mode 100644 index 000000000..52eb299ee --- /dev/null +++ b/lib/cli/tests/test_kickoff_flow.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +import sys + +from crewai_cli import kickoff_flow + + +def test_loads_conversational_flow_from_kickoff_script(tmp_path, monkeypatch) -> None: + package_dir = tmp_path / "src" / "demo_chat" + package_dir.mkdir(parents=True) + (package_dir / "__init__.py").write_text("") + (package_dir / "main.py").write_text( + "\n".join( + [ + "from crewai.flow import Flow", + "", + "class DemoChatFlow(Flow):", + " conversational = True", + ] + ) + ) + (tmp_path / "pyproject.toml").write_text( + "\n".join( + [ + "[project]", + 'name = "demo-chat"', + "[project.scripts]", + 'kickoff = "demo_chat.main:kickoff"', + ] + ) + ) + monkeypatch.chdir(tmp_path) + sys.modules.pop("demo_chat.main", None) + sys.modules.pop("demo_chat", None) + + flow = kickoff_flow._load_conversational_flow_from_kickoff_script() + + assert flow is not None + assert type(flow).__name__ == "DemoChatFlow" + assert flow.conversational is True + + +def test_kickoff_flow_falls_back_to_uv_when_no_conversational_flow( + monkeypatch, +) -> None: + calls: list[list[str]] = [] + + def fake_run(command, capture_output, text, check): + calls.append(command) + + class Result: + stderr = "" + + return Result() + + monkeypatch.setattr( + kickoff_flow, "_load_conversational_flow_from_kickoff_script", lambda: None + ) + monkeypatch.setattr(kickoff_flow.subprocess, "run", fake_run) + + kickoff_flow.kickoff_flow() + + assert calls == [["uv", "run", "kickoff"]] diff --git a/lib/cli/tests/test_run_crew.py b/lib/cli/tests/test_run_crew.py index c51fc16c5..fd9daf167 100644 --- a/lib/cli/tests/test_run_crew.py +++ b/lib/cli/tests/test_run_crew.py @@ -645,6 +645,10 @@ def test_run_crew_runs_python_flow_project(monkeypatch, capsys): "_execute_uv_script", lambda script_name, **kwargs: calls.append((script_name, kwargs)), ) + monkeypatch.setattr( + "crewai_cli.kickoff_flow._load_conversational_flow_from_kickoff_script", + lambda: None, + ) run_crew_module.run_crew() @@ -652,6 +656,41 @@ def test_run_crew_runs_python_flow_project(monkeypatch, capsys): assert calls == [("kickoff", {"entity_type": "flow"})] +def test_run_crew_runs_conversational_flow_tui(monkeypatch, capsys): + class Flow: + pass + + flow = Flow() + calls = [] + + monkeypatch.setattr(run_crew_module, "_has_json_crew", lambda: False) + monkeypatch.setattr( + run_crew_module, + "read_toml", + lambda: {"tool": {"crewai": {"type": "flow"}}}, + ) + monkeypatch.setattr( + "crewai_cli.kickoff_flow._load_conversational_flow_from_kickoff_script", + lambda: flow, + ) + monkeypatch.setattr( + "crewai_cli.kickoff_flow._run_conversational_flow_tui", + lambda loaded_flow: calls.append(loaded_flow), + ) + monkeypatch.setattr( + run_crew_module, + "_execute_uv_script", + lambda *_args, **_kwargs: pytest.fail( + "conversational flows must use the TUI" + ), + ) + + run_crew_module.run_crew() + + assert capsys.readouterr().out == "" + assert calls == [flow] + + def test_run_crew_rejects_filename_for_flow_project(monkeypatch): monkeypatch.setattr(run_crew_module, "_has_json_crew", lambda: False) monkeypatch.setattr(