mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-07-02 21:58:11 +00:00
* feat: update pyproject.toml to specify wheel targets Added a new section to the pyproject.toml file to include only specific files in the wheel build, enhancing the packaging process. Updated tests to verify the inclusion of these targets. * feat: add memory save event handling to activity log Implemented event handlers for MemorySaveStartedEvent, MemorySaveCompletedEvent, and MemorySaveFailedEvent in the crew_run_tui module. This allows the application to log memory save operations, capturing their status and details in the activity log. Added corresponding tests to verify the correct logging behavior for successful and failed memory saves. * feat: enhance memory save event handling in activity log Added functionality to suppress nested memory save events and updated the handling of MemorySaveStartedEvent, MemorySaveCompletedEvent, and MemorySaveFailedEvent to improve logging accuracy. Introduced new tests to verify the correct behavior of memory save events, including scenarios for nested events and completion updates for timed-out entries. * Fix memory save activity log handling * Normalize alpha package versions * Update scaffolded crew dependency * feat: add button to copy setup instructions for CrewAI coding agents Introduced a button in the documentation that allows users to easily copy setup instructions for CrewAI coding agents. The instructions include installation steps, environment setup, and best practices for using the CrewAI CLI. This enhancement aims to streamline the onboarding process for new users. * Improve missing CrewAI install guidance * fix: address pr review feedback * fix: avoid mismatched memory save rows * fix: wait for queued memory save events * fix: avoid matching memory saves on missing ids * chore: normalize prerelease version to 1.14.8a1
2345 lines
87 KiB
Python
2345 lines
87 KiB
Python
"""Full-screen Textual TUI for crew execution.
|
|
|
|
Two-column layout: left sidebar (tasks/agents/tokens) + main content
|
|
(task header, plan checklist, activity timeline, streaming output).
|
|
"""
|
|
|
|
import json as _json
|
|
import re
|
|
import threading
|
|
import time
|
|
from typing import Any, ClassVar, cast
|
|
|
|
from rich.text import Text
|
|
from textual import work
|
|
from textual.app import App, ComposeResult
|
|
from textual.binding import Binding, BindingType
|
|
from textual.containers import Horizontal, Vertical, VerticalScroll
|
|
from textual.css.query import NoMatches
|
|
from textual.screen import ModalScreen
|
|
from textual.widgets import Button, Footer, Header, Static
|
|
|
|
|
|
_SPINNER = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"
|
|
|
|
# CrewAI brand palette
|
|
_C_PRIMARY = "#FF5A50" # crewai.primary (coral red)
|
|
_C_TEAL = "#1F7982" # crewai.secondary / tertiary
|
|
_C_GREEN = "#4aba6a" # success green (warm, not neon)
|
|
_C_RED = "#FF5A50" # error (same as primary)
|
|
_C_TEXT = "#e0e0e0" # light text on dark bg
|
|
_C_DIM = "#AAAAAA" # crewai.background-grey
|
|
_C_MUTED = "#666666" # dimmer than _C_DIM for past timeline
|
|
|
|
_STEP_NUMBER_RE = re.compile(r"\bstep\s+(\d+)\b", re.IGNORECASE)
|
|
_REFINEMENT_RE = re.compile(r"^\s*step\s+(\d+)\s*:\s*(.+)\s*$", re.IGNORECASE)
|
|
_INTERNAL_TOOL_NAMES = {"create_reasoning_plan"}
|
|
_LOG_ARGS_TEXT_LIMIT = 3_000
|
|
_LOG_RESULT_TEXT_LIMIT = 5_000
|
|
_LOG_TRUNCATION_SUFFIX = "... [truncated]"
|
|
# Background memory saves can emit their start event just after kickoff returns.
|
|
_MEMORY_SAVE_DRAIN_GRACE_SECONDS = 2.0
|
|
|
|
|
|
def _is_save_to_memory_tool(tool_name: str | None) -> bool:
|
|
return (tool_name or "").replace(" ", "_").lower() == "save_to_memory"
|
|
|
|
|
|
def _truncate_log_text(value: Any, limit: int) -> str | None:
|
|
if value is None:
|
|
return None
|
|
text = str(value)
|
|
if len(text) <= limit:
|
|
return text
|
|
suffix = _LOG_TRUNCATION_SUFFIX
|
|
return f"{text[: max(0, limit - len(suffix))]}{suffix}"
|
|
|
|
|
|
def _enable_tracing_in_dotenv() -> None:
|
|
"""Append CREWAI_TRACING_ENABLED=true to .env if not already set."""
|
|
from pathlib import Path
|
|
|
|
env_file = Path.cwd() / ".env"
|
|
key = "CREWAI_TRACING_ENABLED"
|
|
try:
|
|
if env_file.exists():
|
|
content = env_file.read_text()
|
|
if key in content:
|
|
return
|
|
sep = "" if content.endswith("\n") or not content else "\n"
|
|
env_file.write_text(f"{content}{sep}{key}=true\n")
|
|
else:
|
|
env_file.write_text(f"{key}=true\n")
|
|
except OSError:
|
|
# Persisting the tracing flag is best-effort; an unwritable .env
|
|
# must not block the run (tracing stays enabled for this session).
|
|
pass
|
|
|
|
|
|
def _unescape_text(s: str) -> str:
|
|
"""Replace literal backslash-n sequences with real newlines."""
|
|
return s.replace("\\n", "\n").replace("\\t", " ")
|
|
|
|
|
|
def _try_parse_structured(text: str) -> Any | None:
|
|
"""Try JSON first, then Python repr (single-quoted dicts/lists)."""
|
|
try:
|
|
return _json.loads(text)
|
|
except (ValueError, TypeError):
|
|
pass
|
|
try:
|
|
import ast
|
|
|
|
obj = ast.literal_eval(text)
|
|
if isinstance(obj, (dict, list)):
|
|
return obj
|
|
except Exception: # noqa: S110
|
|
pass
|
|
return None
|
|
|
|
|
|
def _format_json_in_text(text: str) -> str:
|
|
"""Find JSON objects/arrays in text and pretty-print them."""
|
|
if not text or ("{" not in text and "[" not in text):
|
|
return text
|
|
|
|
result: list[str] = []
|
|
i = 0
|
|
while i < len(text):
|
|
if text[i] in ("{", "["):
|
|
close = "}" if text[i] == "{" else "]"
|
|
depth = 0
|
|
for j in range(i, len(text)):
|
|
if text[j] == text[i]:
|
|
depth += 1
|
|
elif text[j] == close:
|
|
depth -= 1
|
|
if depth == 0:
|
|
candidate = text[i : j + 1]
|
|
parsed = _try_parse_structured(candidate)
|
|
if parsed is not None:
|
|
formatted = _json.dumps(
|
|
parsed, indent=2, ensure_ascii=False
|
|
)
|
|
result.append(formatted)
|
|
i = j + 1
|
|
else:
|
|
result.append(text[i])
|
|
i += 1
|
|
break
|
|
else:
|
|
remaining = text[i:]
|
|
parsed = _try_parse_structured(remaining)
|
|
if parsed is not None:
|
|
result.append(_json.dumps(parsed, indent=2, ensure_ascii=False))
|
|
else:
|
|
result.append(remaining)
|
|
break
|
|
else:
|
|
result.append(text[i])
|
|
i += 1
|
|
|
|
return "".join(result)
|
|
|
|
|
|
def _colorize_json_line(t: Text, line: str) -> None:
|
|
"""Append a single line with soft JSON syntax highlighting."""
|
|
stripped = line.lstrip()
|
|
leading = line[: len(line) - len(stripped)]
|
|
t.append(leading, style=_C_MUTED)
|
|
if not stripped:
|
|
return
|
|
s = stripped
|
|
i = 0
|
|
while i < len(s):
|
|
ch = s[i]
|
|
if ch == '"':
|
|
j = i + 1
|
|
while j < len(s):
|
|
if s[j] == "\\":
|
|
j += 2
|
|
continue
|
|
if s[j] == '"':
|
|
j += 1
|
|
break
|
|
j += 1
|
|
token = s[i:j]
|
|
rest = s[j:].lstrip()
|
|
if rest.startswith(":"):
|
|
t.append(token, style=_C_TEAL)
|
|
else:
|
|
t.append(token, style=_C_DIM)
|
|
i = j
|
|
elif ch in "{}[],":
|
|
t.append(ch, style=_C_MUTED)
|
|
i += 1
|
|
elif ch == ":":
|
|
t.append(": ", style=_C_MUTED)
|
|
i += 1
|
|
if i < len(s) and s[i] == " ":
|
|
i += 1
|
|
elif ch in "-0123456789":
|
|
j = i + 1
|
|
while j < len(s) and s[j] in "0123456789.eE+-":
|
|
j += 1
|
|
t.append(s[i:j], style=_C_PRIMARY)
|
|
i = j
|
|
elif s[i : i + 4] == "true":
|
|
t.append("true", style=_C_GREEN)
|
|
i += 4
|
|
elif s[i : i + 5] == "false":
|
|
t.append("false", style=_C_GREEN)
|
|
i += 5
|
|
elif s[i : i + 4] == "null":
|
|
t.append("null", style=f"italic {_C_MUTED}")
|
|
i += 4
|
|
else:
|
|
t.append(ch, style=_C_DIM)
|
|
i += 1
|
|
|
|
|
|
def _append_highlighted(t: Text, content: str, indent: str, max_lines: int = 50) -> int:
|
|
"""Append text with JSON highlighting if it looks like JSON, else plain."""
|
|
lines = content.split("\n")
|
|
total = len(lines)
|
|
is_json = content.lstrip()[:1] in ("{", "[", '"')
|
|
for line in lines[:max_lines]:
|
|
t.append(f"{indent} ", style="")
|
|
if is_json:
|
|
_colorize_json_line(t, line)
|
|
else:
|
|
t.append(line, style=_C_DIM)
|
|
t.append("\n")
|
|
return total
|
|
|
|
|
|
class TraceConsentScreen(ModalScreen[bool]):
|
|
CSS = """
|
|
TraceConsentScreen {
|
|
align: center middle;
|
|
}
|
|
#consent-dialog {
|
|
width: 50;
|
|
height: auto;
|
|
max-height: 16;
|
|
background: #1c1c1c;
|
|
border: tall #333333;
|
|
padding: 1 2 2 2;
|
|
}
|
|
#consent-buttons {
|
|
height: 3;
|
|
margin-top: 1;
|
|
width: 100%;
|
|
layout: horizontal;
|
|
}
|
|
.consent-btn {
|
|
width: 1fr;
|
|
height: 3;
|
|
margin: 0 1;
|
|
}
|
|
#btn-consent-yes {
|
|
background: #1F7982;
|
|
color: #e0e0e0;
|
|
border: none;
|
|
text-style: bold;
|
|
}
|
|
#btn-consent-yes:hover {
|
|
background: #28969f;
|
|
}
|
|
#btn-consent-yes:disabled {
|
|
background: #1F7982;
|
|
color: #e0e0e0;
|
|
text-opacity: 100%;
|
|
opacity: 100%;
|
|
}
|
|
#btn-consent-no {
|
|
background: #333333;
|
|
color: #AAAAAA;
|
|
border: none;
|
|
}
|
|
#btn-consent-no:hover {
|
|
background: #444444;
|
|
}
|
|
"""
|
|
|
|
BINDINGS: ClassVar[list[BindingType]] = [
|
|
Binding("y", "consent_yes", "Yes", show=False),
|
|
Binding("n", "consent_no", "No", show=False),
|
|
Binding("escape", "consent_no", "Cancel", show=False),
|
|
]
|
|
|
|
def __init__(self) -> None:
|
|
super().__init__()
|
|
self._sending = False
|
|
self._frame = 0
|
|
self._spin_timer: Any = None
|
|
|
|
def compose(self) -> ComposeResult:
|
|
with Vertical(id="consent-dialog"):
|
|
yield Static(self._build_content(), id="consent-text")
|
|
with Horizontal(id="consent-buttons"):
|
|
yield Button("View Traces", id="btn-consent-yes", classes="consent-btn")
|
|
yield Button("Cancel", id="btn-consent-no", classes="consent-btn")
|
|
|
|
def _build_content(self) -> Text:
|
|
t = Text()
|
|
t.append(" View execution traces on CrewAI AMP\n\n", style=f"bold {_C_TEXT}")
|
|
t.append(" Sends agent decisions, tool calls, and\n", style=_C_DIM)
|
|
t.append(" timing data. Link expires in 24h.\n\n", style=_C_DIM)
|
|
t.append(" Traces will be enabled for future runs.\n", style=_C_MUTED)
|
|
return t
|
|
|
|
def _start_sending(self) -> None:
|
|
self._sending = True
|
|
btn_yes = self.query_one("#btn-consent-yes", Button)
|
|
btn_no = self.query_one("#btn-consent-no", Button)
|
|
btn_yes.disabled = True
|
|
btn_yes.label = f"{_SPINNER[0]} Loading…"
|
|
btn_no.display = False
|
|
self._spin_timer = self.set_interval(1 / 8, self._spin_tick)
|
|
cast("CrewRunApp", self.app)._on_trace_consent_accepted()
|
|
|
|
def _spin_tick(self) -> None:
|
|
self._frame += 1
|
|
try:
|
|
btn = self.query_one("#btn-consent-yes", Button)
|
|
btn.label = f"{_SPINNER[self._frame % len(_SPINNER)]} Loading…"
|
|
except Exception: # noqa: S110
|
|
pass
|
|
|
|
def on_button_pressed(self, event: Button.Pressed) -> None:
|
|
if self._sending:
|
|
return
|
|
if event.button.id == "btn-consent-yes":
|
|
self._start_sending()
|
|
else:
|
|
self.dismiss(False)
|
|
|
|
def action_consent_yes(self) -> None:
|
|
if self._sending:
|
|
return
|
|
self._start_sending()
|
|
|
|
def action_consent_no(self) -> None:
|
|
if self._sending:
|
|
return
|
|
self.dismiss(False)
|
|
|
|
|
|
class CrewRunApp(App[Any]):
|
|
TITLE = "CrewAI"
|
|
|
|
CSS = """
|
|
Screen {
|
|
background: #131313;
|
|
}
|
|
|
|
#body {
|
|
height: 1fr;
|
|
}
|
|
|
|
#sidebar {
|
|
width: 34;
|
|
background: #1c1c1c;
|
|
border-right: vkey #333333;
|
|
scrollbar-size-vertical: 1;
|
|
scrollbar-color: #666666;
|
|
scrollbar-color-hover: #FF5A50;
|
|
scrollbar-background: #1c1c1c;
|
|
overflow-y: auto;
|
|
overflow-x: hidden;
|
|
}
|
|
|
|
#sidebar-content {
|
|
width: 100%;
|
|
height: auto;
|
|
padding: 1 0;
|
|
}
|
|
|
|
#main-panel {
|
|
width: 1fr;
|
|
}
|
|
|
|
#task-header {
|
|
height: auto;
|
|
max-height: 6;
|
|
padding: 1 2;
|
|
background: #1c1c1c;
|
|
border-bottom: hkey #333333;
|
|
}
|
|
|
|
#scroll-area {
|
|
height: 3fr;
|
|
min-height: 6;
|
|
scrollbar-size-vertical: 1;
|
|
scrollbar-color: #666666;
|
|
scrollbar-color-hover: #FF5A50;
|
|
scrollbar-background: #131313;
|
|
}
|
|
|
|
#main-content {
|
|
padding: 1 2;
|
|
height: auto;
|
|
}
|
|
|
|
Header {
|
|
background: #1c1c1c;
|
|
color: #FF5A50;
|
|
}
|
|
|
|
Footer {
|
|
background: #1c1c1c;
|
|
}
|
|
|
|
FooterKey {
|
|
background: #1c1c1c;
|
|
color: #AAAAAA;
|
|
}
|
|
|
|
FooterKey .footer-key--key {
|
|
background: #262626;
|
|
color: #FF5A50;
|
|
}
|
|
|
|
#log-panel {
|
|
height: 2fr;
|
|
min-height: 6;
|
|
background: #1c1c1c;
|
|
border-top: hkey #333333;
|
|
scrollbar-size-vertical: 1;
|
|
scrollbar-color: #666666;
|
|
scrollbar-color-hover: #FF5A50;
|
|
scrollbar-background: #1c1c1c;
|
|
}
|
|
|
|
#log-content {
|
|
padding: 1 2;
|
|
height: auto;
|
|
}
|
|
|
|
#sidebar-actions {
|
|
display: none;
|
|
height: auto;
|
|
padding: 0 1;
|
|
margin-top: 1;
|
|
border-top: hkey #333333;
|
|
}
|
|
|
|
.action-btn {
|
|
width: 100%;
|
|
min-width: 20;
|
|
height: 3;
|
|
margin: 1 1 0 1;
|
|
text-style: bold;
|
|
}
|
|
|
|
#btn-traces {
|
|
background: #1F7982;
|
|
color: #e0e0e0;
|
|
border: none;
|
|
}
|
|
#btn-traces:hover {
|
|
background: #28969f;
|
|
}
|
|
#btn-traces:disabled {
|
|
background: #1a4a50;
|
|
color: #888888;
|
|
}
|
|
|
|
#btn-deploy {
|
|
background: #333333;
|
|
color: #e0e0e0;
|
|
border: none;
|
|
}
|
|
#btn-deploy:hover {
|
|
background: #444444;
|
|
}
|
|
|
|
#btn-traces-done {
|
|
background: #1a3a3a;
|
|
color: #1F7982;
|
|
border: none;
|
|
}
|
|
#btn-traces-done:hover {
|
|
background: #1F7982;
|
|
color: #e0e0e0;
|
|
}
|
|
"""
|
|
|
|
BINDINGS: ClassVar[list[BindingType]] = [
|
|
Binding("q", "quit", "Quit"),
|
|
Binding("s", "toggle_sidebar", "Sidebar"),
|
|
Binding("l", "toggle_logs", "Logs"),
|
|
Binding("t", "view_traces", "Traces", show=False),
|
|
Binding("d", "deploy_crew", "Deploy", show=False),
|
|
Binding("down", "log_down", "Log ↓", show=False),
|
|
Binding("up", "log_up", "Log ↑", show=False),
|
|
Binding("enter", "log_toggle", "Expand", show=False),
|
|
]
|
|
|
|
def __init__(
|
|
self,
|
|
crew_name: str = "Crew",
|
|
total_tasks: int = 0,
|
|
agent_names: list[str] | None = None,
|
|
task_names: list[str] | None = None,
|
|
):
|
|
super().__init__()
|
|
self.title = f"CrewAI — {crew_name}"
|
|
self.sub_title = "0:00"
|
|
self._crew_name = crew_name
|
|
self._lock = threading.RLock()
|
|
|
|
self._total_tasks = total_tasks
|
|
self._current_task_idx = 0
|
|
self._current_task_desc = ""
|
|
self._current_agent = ""
|
|
self._task_names = task_names or []
|
|
self._agent_names = agent_names or []
|
|
self._task_statuses: dict[int, str] = {
|
|
i: "pending" for i in range(1, total_tasks + 1)
|
|
}
|
|
# Maps a task's identity to state captured when it started (sidebar
|
|
# index, description, agent, start time) so completion/failure events
|
|
# build their log entry from the right task even when tasks run
|
|
# async/overlapping.
|
|
self._task_state_by_key: dict[str, dict[str, Any]] = {}
|
|
|
|
self._timeline: list[tuple[str, str, str]] = []
|
|
self._current_step: tuple[str, str, str] | None = None
|
|
|
|
self._input_tokens = 0
|
|
self._output_tokens = 0
|
|
self._live_out_tokens = 0
|
|
self._pending_input_estimate = 0
|
|
self._llm_calls = 0
|
|
|
|
self._streaming_text = ""
|
|
self._is_streaming = False
|
|
self._current_llm_text = ""
|
|
self._task_full_output = ""
|
|
|
|
self._plan: dict[str, Any] | None = None
|
|
self._plan_step_status: dict[int, str] = {}
|
|
self._awaiting_replan = False
|
|
|
|
self._status = "starting"
|
|
self._start_time = time.time()
|
|
self._task_start_time = time.time()
|
|
self._final_output: str | None = None
|
|
self._error: str | None = None
|
|
self._frame = 0
|
|
|
|
self._task_logs: list[dict[str, Any]] = []
|
|
self._current_task_steps: list[dict[str, Any]] = []
|
|
|
|
self._log_entries: list[dict[str, Any]] = []
|
|
self._log_cursor: int = 0
|
|
self._log_expanded: set[int] = set()
|
|
self._log_scroll_needed: bool = False
|
|
self._log_line_map: list[tuple[int, int, int]] = []
|
|
self._suppressed_memory_save_event_ids: set[str] = set()
|
|
self._memory_save_drain_timer: Any = None
|
|
|
|
self._event_handlers: list[tuple[type, Any]] = []
|
|
|
|
self._crew: Any = None
|
|
self._default_inputs: dict[str, Any] | None = None
|
|
self._crew_result: Any = None
|
|
self._crew_json_path: Any = None
|
|
self._elapsed_frozen: float | None = None
|
|
self._want_deploy: bool = False
|
|
self._trace_url: str | None = None
|
|
self._consent_screen: TraceConsentScreen | None = None
|
|
|
|
# ── Layout ──────────────────────────────────────────────
|
|
|
|
def compose(self) -> ComposeResult:
|
|
yield Header(show_clock=False)
|
|
with Horizontal(id="body"):
|
|
with VerticalScroll(id="sidebar"):
|
|
yield Static(id="sidebar-content")
|
|
with Vertical(id="sidebar-actions"):
|
|
yield Button("View Traces", id="btn-traces", classes="action-btn")
|
|
yield Button("Deploy", id="btn-deploy", classes="action-btn")
|
|
with Vertical(id="main-panel"):
|
|
yield Static(id="task-header")
|
|
with VerticalScroll(id="scroll-area"):
|
|
yield Static(id="main-content")
|
|
with VerticalScroll(id="log-panel"):
|
|
yield Static(id="log-content")
|
|
yield Footer()
|
|
|
|
def on_mount(self) -> None:
|
|
self._start_time = time.time()
|
|
self._subscribe()
|
|
self._tick_timer = self.set_interval(1 / 8, self._tick)
|
|
if self._crew:
|
|
self._run_crew_worker()
|
|
elif self._crew_json_path:
|
|
self._load_and_run_worker()
|
|
|
|
# ── Crew execution ──────────────────────────────────────
|
|
|
|
@work(thread=True, exclusive=True, group="crew")
|
|
def _load_and_run_worker(self) -> None:
|
|
from crewai.events.listeners.tracing.utils import (
|
|
set_suppress_tracing_messages,
|
|
set_tui_mode,
|
|
)
|
|
|
|
set_tui_mode(True)
|
|
set_suppress_tracing_messages(True)
|
|
try:
|
|
from crewai.project.crew_loader import load_crew
|
|
|
|
crew, default_inputs = load_crew(self._crew_json_path)
|
|
crew.verbose = False
|
|
for agent in crew.agents:
|
|
agent.verbose = False
|
|
if hasattr(agent, "llm") and hasattr(agent.llm, "stream"):
|
|
agent.llm.stream = True
|
|
|
|
task_names = []
|
|
for task in crew.tasks:
|
|
name = getattr(task, "name", "") or ""
|
|
if not name:
|
|
desc = getattr(task, "description", "") or "Task"
|
|
name = desc[:40]
|
|
task_names.append(name)
|
|
|
|
agent_names = []
|
|
for agent in crew.agents:
|
|
name = (
|
|
getattr(agent, "role", "") or getattr(agent, "name", "") or "Agent"
|
|
)
|
|
agent_names.append(name)
|
|
|
|
self._crew = crew
|
|
self._default_inputs = default_inputs
|
|
|
|
def _apply_crew_info() -> None:
|
|
with self._lock:
|
|
self._total_tasks = len(crew.tasks)
|
|
self._task_names = task_names
|
|
self._agent_names = agent_names
|
|
self._task_statuses = {
|
|
i: "pending" for i in range(1, len(crew.tasks) + 1)
|
|
}
|
|
self.title = f"CrewAI — {crew.name or 'Crew'}"
|
|
self._crew_name = crew.name or "Crew"
|
|
self._start_time = time.time()
|
|
self._run_crew_worker()
|
|
|
|
self.call_from_thread(_apply_crew_info)
|
|
except Exception as e:
|
|
self.call_from_thread(self._on_crew_failed, str(e))
|
|
|
|
@work(thread=True, exclusive=True, group="crew")
|
|
def _run_crew_worker(self) -> None:
|
|
from crewai.events.listeners.tracing.utils import (
|
|
set_suppress_tracing_messages,
|
|
set_tui_mode,
|
|
)
|
|
|
|
set_tui_mode(True)
|
|
set_suppress_tracing_messages(True)
|
|
try:
|
|
result = self._crew.kickoff(inputs=self._default_inputs)
|
|
output = result.raw if result and hasattr(result, "raw") else None
|
|
with self._lock:
|
|
self._crew_result = result
|
|
self.call_from_thread(self._on_crew_done, output)
|
|
except Exception as e:
|
|
self.call_from_thread(self._on_crew_failed, str(e))
|
|
|
|
def _on_crew_done(self, output: str | None) -> None:
|
|
with self._lock:
|
|
self._status = "completed"
|
|
self._final_output = output
|
|
self._is_streaming = False
|
|
self._streaming_text = ""
|
|
self._current_step = None
|
|
self._timeline = []
|
|
self._elapsed_frozen = time.time() - self._start_time
|
|
self._collapse_plan_on_task_done()
|
|
for k in self._task_statuses:
|
|
if self._task_statuses[k] == "active":
|
|
self._task_statuses[k] = "done"
|
|
now = time.time()
|
|
for entry in self._log_entries:
|
|
if entry["status"] == "running":
|
|
if entry["tool_name"] == "memory_save":
|
|
continue
|
|
entry["status"] = "timeout"
|
|
entry["error"] = "No result received before crew completed"
|
|
entry["duration"] = now - entry["start_time"]
|
|
try:
|
|
from crewai.events.listeners.tracing.trace_listener import (
|
|
TraceCollectionListener,
|
|
)
|
|
|
|
listener: TraceCollectionListener | None = getattr(
|
|
TraceCollectionListener, "_instance", None
|
|
)
|
|
if listener and listener.batch_manager:
|
|
bm = listener.batch_manager
|
|
self._trace_url = (
|
|
getattr(bm, "trace_url", None) or bm.ephemeral_trace_url
|
|
)
|
|
except Exception: # noqa: S110
|
|
pass
|
|
try:
|
|
self.query_one("#sidebar-actions").display = True
|
|
if self._trace_url:
|
|
btn = self.query_one("#btn-traces", Button)
|
|
btn.label = "✔ Open Traces"
|
|
btn.id = "btn-traces-done"
|
|
except Exception: # noqa: S110
|
|
pass
|
|
self._tick()
|
|
self._scroll_to_result()
|
|
self.call_later(self._focus_activity_log)
|
|
self._tick_timer.stop()
|
|
self._tick_timer = self.set_interval(1 / 2, self._tick)
|
|
self._unsubscribe_if_no_running_memory_save(wait_for_queued=True)
|
|
|
|
def _on_crew_failed(self, error: str) -> None:
|
|
with self._lock:
|
|
self._status = "failed"
|
|
self._error = error
|
|
self._is_streaming = False
|
|
self._current_step = None
|
|
self._elapsed_frozen = time.time() - self._start_time
|
|
now = time.time()
|
|
for entry in self._log_entries:
|
|
if entry["status"] == "running":
|
|
if entry["tool_name"] == "memory_save":
|
|
continue
|
|
entry["status"] = "error"
|
|
entry["error"] = "No result received before crew failed"
|
|
entry["duration"] = now - entry["start_time"]
|
|
self._tick()
|
|
self.call_later(self._focus_activity_log)
|
|
self._tick_timer.stop()
|
|
self._tick_timer = self.set_interval(1 / 2, self._tick)
|
|
self._unsubscribe_if_no_running_memory_save(wait_for_queued=True)
|
|
|
|
# ── Actions ─────────────────────────────────────────────
|
|
|
|
def action_toggle_sidebar(self) -> None:
|
|
sidebar = self.query_one("#sidebar")
|
|
sidebar.display = not sidebar.display
|
|
|
|
def action_toggle_logs(self) -> None:
|
|
panel = self.query_one("#log-panel")
|
|
panel.display = not panel.display
|
|
|
|
def action_log_down(self) -> None:
|
|
try:
|
|
if not self.query_one("#log-panel").display:
|
|
return
|
|
except Exception:
|
|
return
|
|
should_refresh = False
|
|
with self._lock:
|
|
if self._log_entries:
|
|
self._log_cursor = min(self._log_cursor + 1, len(self._log_entries) - 1)
|
|
self._log_scroll_needed = True
|
|
should_refresh = True
|
|
if should_refresh:
|
|
self._refresh_log_panel()
|
|
|
|
def action_log_up(self) -> None:
|
|
try:
|
|
if not self.query_one("#log-panel").display:
|
|
return
|
|
except Exception:
|
|
return
|
|
should_refresh = False
|
|
with self._lock:
|
|
if self._log_entries:
|
|
self._log_cursor = max(self._log_cursor - 1, 0)
|
|
self._log_scroll_needed = True
|
|
should_refresh = True
|
|
if should_refresh:
|
|
self._refresh_log_panel()
|
|
|
|
def action_log_toggle(self) -> None:
|
|
try:
|
|
if not self.query_one("#log-panel").display:
|
|
return
|
|
except Exception:
|
|
return
|
|
should_refresh = False
|
|
with self._lock:
|
|
if self._log_entries:
|
|
if self._log_cursor in self._log_expanded:
|
|
self._log_expanded.discard(self._log_cursor)
|
|
else:
|
|
self._log_expanded.add(self._log_cursor)
|
|
should_refresh = True
|
|
if should_refresh:
|
|
self._refresh_log_panel()
|
|
|
|
async def action_quit(self) -> None:
|
|
self._unsubscribe()
|
|
self.exit(self._crew_result)
|
|
|
|
def action_view_traces(self) -> None:
|
|
if self._status != "completed":
|
|
return
|
|
if self._trace_url:
|
|
import webbrowser
|
|
|
|
try:
|
|
webbrowser.open(self._trace_url)
|
|
except Exception: # noqa: S110
|
|
pass
|
|
return
|
|
self._consent_screen = TraceConsentScreen()
|
|
self.push_screen(self._consent_screen)
|
|
|
|
def _on_trace_consent_accepted(self) -> None:
|
|
self._send_traces_worker()
|
|
|
|
@work(thread=True)
|
|
def _send_traces_worker(self) -> None:
|
|
import webbrowser
|
|
|
|
try:
|
|
from crewai.events.listeners.tracing.utils import (
|
|
set_suppress_tracing_messages,
|
|
set_tui_mode,
|
|
)
|
|
|
|
set_tui_mode(True)
|
|
set_suppress_tracing_messages(True)
|
|
|
|
from crewai.events.listeners.tracing.trace_listener import (
|
|
TraceCollectionListener,
|
|
)
|
|
from crewai.events.listeners.tracing.utils import (
|
|
mark_first_execution_completed,
|
|
)
|
|
|
|
listener: TraceCollectionListener | None = getattr(
|
|
TraceCollectionListener, "_instance", None
|
|
)
|
|
if not listener:
|
|
self.call_from_thread(self._dismiss_consent_modal)
|
|
return
|
|
|
|
bm = listener.batch_manager
|
|
url = getattr(bm, "trace_url", None) or bm.ephemeral_trace_url
|
|
|
|
if not url:
|
|
handler = listener.first_time_handler
|
|
handler.set_batch_manager(bm)
|
|
handler._initialize_backend_and_send_events()
|
|
url = handler.ephemeral_url or bm.ephemeral_trace_url
|
|
|
|
if listener.first_time_handler.is_first_time:
|
|
mark_first_execution_completed(user_consented=True)
|
|
|
|
_enable_tracing_in_dotenv()
|
|
|
|
if url:
|
|
self._trace_url = url
|
|
|
|
def _done() -> None:
|
|
self._dismiss_consent_modal()
|
|
try:
|
|
btn = self.query_one("#btn-traces", Button)
|
|
btn.label = "✔ Open Traces"
|
|
btn.id = "btn-traces-done"
|
|
except Exception: # noqa: S110
|
|
pass
|
|
|
|
self.call_from_thread(_done)
|
|
try:
|
|
webbrowser.open(url)
|
|
except Exception: # noqa: S110
|
|
pass
|
|
else:
|
|
self.call_from_thread(self._dismiss_consent_modal)
|
|
except Exception:
|
|
self.call_from_thread(self._dismiss_consent_modal)
|
|
|
|
def _dismiss_consent_modal(self) -> None:
|
|
try:
|
|
screen = self._consent_screen
|
|
if screen and screen.is_attached:
|
|
screen.dismiss(False)
|
|
except Exception: # noqa: S110
|
|
pass
|
|
|
|
def action_deploy_crew(self) -> None:
|
|
if self._status != "completed":
|
|
return
|
|
self._want_deploy = True
|
|
self._unsubscribe()
|
|
self.exit(self._crew_result)
|
|
|
|
def on_button_pressed(self, event: Button.Pressed) -> None:
|
|
if event.button.id in ("btn-traces", "btn-traces-done"):
|
|
self.action_view_traces()
|
|
elif event.button.id == "btn-deploy":
|
|
self.action_deploy_crew()
|
|
|
|
def _scroll_to_result(self) -> None:
|
|
try:
|
|
scroll = self.query_one("#scroll-area", VerticalScroll)
|
|
self.call_later(lambda: scroll.scroll_end(animate=False))
|
|
except Exception: # noqa: S110
|
|
pass
|
|
|
|
def _focus_activity_log(self) -> None:
|
|
if not self._is_mounted:
|
|
return
|
|
log_panel = self.query_one("#log-panel", VerticalScroll)
|
|
if log_panel.display:
|
|
log_panel.focus()
|
|
|
|
def _refresh_log_panel(self) -> None:
|
|
if not self._is_mounted:
|
|
return
|
|
with self._lock:
|
|
if self.query_one("#log-panel").display:
|
|
self._render_log_panel()
|
|
|
|
def on_click(self, event: Any) -> None:
|
|
try:
|
|
widget = self.query_one("#log-content", Static)
|
|
except Exception:
|
|
return
|
|
if not widget.region.contains(event.screen_x, event.screen_y):
|
|
return
|
|
scroll = self.query_one("#log-panel", VerticalScroll)
|
|
clicked_line = event.screen_y - widget.region.y + int(scroll.scroll_y)
|
|
with self._lock:
|
|
for idx, start, end in self._log_line_map:
|
|
if start <= clicked_line < end:
|
|
self._log_cursor = idx
|
|
if idx in self._log_expanded:
|
|
self._log_expanded.discard(idx)
|
|
else:
|
|
self._log_expanded.add(idx)
|
|
break
|
|
self._refresh_log_panel()
|
|
|
|
# ── Tick (8 fps) ────────────────────────────────────────
|
|
|
|
def _tick(self) -> None:
|
|
self._frame += 1
|
|
elapsed = getattr(self, "_elapsed_frozen", None) or (
|
|
time.time() - self._start_time
|
|
)
|
|
mins, secs = divmod(int(elapsed), 60)
|
|
self.sub_title = f"{mins}:{secs:02d}"
|
|
|
|
try:
|
|
with self._lock:
|
|
self._render_sidebar()
|
|
self._render_task_header()
|
|
self._render_main_content()
|
|
if self.query_one("#log-panel").display:
|
|
self._render_log_panel()
|
|
except NoMatches:
|
|
return
|
|
|
|
def _spinner(self) -> str:
|
|
return _SPINNER[self._frame % len(_SPINNER)]
|
|
|
|
# ── Sidebar rendering ───────────────────────────────────
|
|
|
|
def _render_sidebar(self) -> None:
|
|
widget = self.query_one("#sidebar-content", Static)
|
|
t = Text()
|
|
sidebar_width = 30
|
|
|
|
t.append(" TASKS\n", style=f"bold {_C_PRIMARY}")
|
|
t.append("\n")
|
|
|
|
for i in range(1, self._total_tasks + 1):
|
|
status = self._task_statuses.get(i, "pending")
|
|
name = (
|
|
self._task_names[i - 1] if i <= len(self._task_names) else f"Task {i}"
|
|
)
|
|
max_name = sidebar_width - 6
|
|
if len(name) > max_name:
|
|
name = name[: max_name - 1] + "…"
|
|
|
|
if status == "done":
|
|
t.append(" ✔ ", style=_C_GREEN)
|
|
t.append(f"{name}\n", style=_C_DIM)
|
|
elif status == "active":
|
|
t.append(f" {self._spinner()} ", style=_C_PRIMARY)
|
|
t.append(f"{name}\n", style=f"bold {_C_TEXT}")
|
|
elif status == "failed":
|
|
t.append(" ✘ ", style=_C_RED)
|
|
t.append(f"{name}\n", style=_C_RED)
|
|
else:
|
|
t.append(" ○ ", style=_C_DIM)
|
|
t.append(f"{name}\n", style=_C_DIM)
|
|
|
|
t.append("\n")
|
|
t.append(" AGENTS\n", style=f"bold {_C_PRIMARY}")
|
|
t.append("\n")
|
|
|
|
for name in self._agent_names:
|
|
max_name = sidebar_width - 6
|
|
disp = name[: max_name - 1] + "…" if len(name) > max_name else name
|
|
if name == self._current_agent:
|
|
t.append(f" ● {disp}\n", style=f"bold {_C_PRIMARY}")
|
|
else:
|
|
t.append(f" {disp}\n", style=_C_DIM)
|
|
|
|
t.append("\n")
|
|
t.append(" TOKENS\n", style=f"bold {_C_PRIMARY}")
|
|
t.append("\n")
|
|
|
|
out = self._output_tokens + self._live_out_tokens
|
|
t.append(f" ↑ {self._input_tokens:,}\n", style=_C_DIM)
|
|
t.append(f" ↓ {out:,}\n", style=_C_DIM)
|
|
|
|
widget.update(t)
|
|
|
|
# ── Task header rendering ───────────────────────────────
|
|
|
|
def _render_task_header(self) -> None:
|
|
widget = self.query_one("#task-header", Static)
|
|
t = Text()
|
|
|
|
if self._status == "completed":
|
|
elapsed = self._elapsed_frozen or (time.time() - self._start_time)
|
|
t.append("✔ ", style=f"bold {_C_GREEN}")
|
|
t.append(f"Completed {self._total_tasks} tasks", style=f"bold {_C_GREEN}")
|
|
t.append(f" {elapsed:.1f}s", style=_C_DIM)
|
|
|
|
out = self._output_tokens + self._live_out_tokens
|
|
parts = []
|
|
if self._input_tokens:
|
|
parts.append(f"↑{self._input_tokens:,}")
|
|
if out:
|
|
parts.append(f"↓{out:,}")
|
|
if parts:
|
|
t.append(f" {' '.join(parts)} tokens", style=_C_DIM)
|
|
|
|
elif self._status == "failed":
|
|
t.append("✘ ", style=f"bold {_C_RED}")
|
|
t.append("Failed", style=f"bold {_C_RED}")
|
|
if self._error:
|
|
t.append(f"\n{self._error[:120]}", style=_C_RED)
|
|
|
|
elif self._current_task_idx > 0:
|
|
t.append(
|
|
f"Task {self._current_task_idx}/{self._total_tasks}",
|
|
style=f"bold {_C_PRIMARY}",
|
|
)
|
|
if self._current_task_desc:
|
|
desc = self._current_task_desc
|
|
if len(desc) > 80:
|
|
desc = desc[:79] + "…"
|
|
t.append(f" — {desc}", style=_C_TEXT)
|
|
if self._current_agent:
|
|
t.append("\nAgent: ", style=_C_DIM)
|
|
t.append(self._current_agent, style=f"bold {_C_TEXT}")
|
|
|
|
else:
|
|
t.append(f"{self._spinner()} ", style=_C_PRIMARY)
|
|
if not self._crew:
|
|
t.append("Loading crew…", style=_C_DIM)
|
|
else:
|
|
t.append("Starting crew…", style=_C_DIM)
|
|
|
|
widget.update(t)
|
|
|
|
# ── Main content rendering ──────────────────────────────
|
|
|
|
def _render_main_content(self) -> None:
|
|
widget = self.query_one("#main-content", Static)
|
|
t = Text()
|
|
should_scroll = False
|
|
|
|
# Plan section
|
|
if self._plan and self._plan.get("steps"):
|
|
plan_title = self._plan.get("plan", "Plan")
|
|
completed = self._status == "completed" and all(
|
|
self._plan_step_status.get(step.get("step_number")) == "done"
|
|
for step in self._plan["steps"]
|
|
)
|
|
if completed:
|
|
total = len(self._plan["steps"])
|
|
t.append(" PLAN ", style=f"bold {_C_MUTED}")
|
|
t.append(f"✔ {total} steps completed\n\n", style=_C_MUTED)
|
|
else:
|
|
t.append(" PLAN\n", style=f"bold {_C_MUTED}")
|
|
t.append(" ▸ ", style=f"bold {_C_TEAL}")
|
|
t.append(f"{plan_title[:80]}\n", style=f"bold {_C_TEAL}")
|
|
t.append("\n")
|
|
|
|
for step in self._plan["steps"]:
|
|
sn = step.get("step_number", 0)
|
|
desc = step.get("description", "")
|
|
short = desc[:90]
|
|
if len(desc) > 90:
|
|
short += "…"
|
|
|
|
st = self._plan_step_status.get(sn, "pending")
|
|
if st == "done":
|
|
t.append(" ✔ ", style=_C_GREEN)
|
|
t.append(f"{sn}. {short}\n", style=_C_MUTED)
|
|
elif st == "failed":
|
|
t.append(" ✘ ", style=_C_RED)
|
|
t.append(f"{sn}. {short}\n", style=_C_RED)
|
|
elif st == "active":
|
|
t.append(f" {self._spinner()} ", style=_C_PRIMARY)
|
|
t.append(f"{sn}. {short}\n", style=_C_TEXT)
|
|
else:
|
|
t.append(" ○ ", style=_C_MUTED)
|
|
t.append(f"{sn}. {short}\n", style=_C_MUTED)
|
|
t.append("\n")
|
|
|
|
# Current activity indicator
|
|
if self._current_step:
|
|
sty, msg, _detail = self._current_step
|
|
if sty == "yellow":
|
|
t.append(f" {self._spinner()} ", style=_C_PRIMARY)
|
|
t.append(f"{msg}\n\n", style=_C_DIM)
|
|
elif sty == "teal":
|
|
t.append(f" {self._spinner()} ", style=_C_TEAL)
|
|
t.append(f"{msg}\n\n", style=_C_TEAL)
|
|
|
|
# Streaming output
|
|
if self._is_streaming and self._streaming_text:
|
|
text = self._filtered_streaming_text()
|
|
text = _unescape_text(text)
|
|
if text.strip():
|
|
lines = text.rstrip().split("\n")
|
|
for line in lines[-40:]:
|
|
t.append(f" {line}\n", style=_C_TEXT)
|
|
should_scroll = True
|
|
|
|
# Final output
|
|
if self._status == "completed" and self._final_output:
|
|
t.append("\n")
|
|
t.append(" ━━━ Result ━━━\n\n", style=f"bold {_C_TEAL}")
|
|
output = _unescape_text(self._final_output)
|
|
output = _format_json_in_text(output)
|
|
is_json = output.lstrip()[:1] in ("{", "[", '"')
|
|
for line in output.split("\n"):
|
|
t.append(" ")
|
|
if is_json:
|
|
_colorize_json_line(t, line)
|
|
else:
|
|
t.append(line, style=_C_TEXT)
|
|
t.append("\n")
|
|
|
|
widget.update(t)
|
|
|
|
if should_scroll:
|
|
try:
|
|
scroll = self.query_one("#scroll-area", VerticalScroll)
|
|
if (
|
|
scroll.max_scroll_y <= 0
|
|
or scroll.scroll_y >= scroll.max_scroll_y - 50
|
|
):
|
|
scroll.scroll_end(animate=False)
|
|
except Exception: # noqa: S110
|
|
pass
|
|
|
|
# ── Log panel rendering ──────────────────────────────────
|
|
|
|
def _render_log_panel(self) -> None:
|
|
widget = self.query_one("#log-content", Static)
|
|
t = Text()
|
|
t.append(" ACTIVITY LOG", style=f"bold {_C_PRIMARY}")
|
|
t.append(" ↑↓ navigate enter expand/collapse\n", style=_C_MUTED)
|
|
|
|
if not self._log_entries:
|
|
t.append("\n No activity yet.\n", style=_C_MUTED)
|
|
widget.update(t)
|
|
return
|
|
|
|
if self._log_cursor >= len(self._log_entries):
|
|
self._log_cursor = len(self._log_entries) - 1
|
|
|
|
cursor_line = 0
|
|
line_map: list[tuple[int, int, int]] = []
|
|
now = time.time()
|
|
for i, entry in enumerate(self._log_entries):
|
|
entry_start_line = t.plain.count("\n")
|
|
name = entry["tool_name"]
|
|
status = entry["status"]
|
|
focused = i == self._log_cursor
|
|
expanded = i in self._log_expanded
|
|
if focused:
|
|
cursor_line = entry_start_line
|
|
|
|
if status == "running" and (now - entry["start_time"]) > 120:
|
|
entry["status"] = "timeout"
|
|
entry["error"] = "No response received (timeout)"
|
|
entry["duration"] = now - entry["start_time"]
|
|
status = "timeout"
|
|
self._log_expanded.add(i)
|
|
|
|
arrow = "▾" if expanded else "▸"
|
|
|
|
if focused:
|
|
t.append("\n")
|
|
t.append(" > ", style=_C_PRIMARY)
|
|
else:
|
|
t.append("\n ", style="")
|
|
|
|
if status == "running":
|
|
elapsed = now - entry["start_time"]
|
|
t.append(f"{arrow} ", style=_C_MUTED)
|
|
t.append(f"{self._spinner()} ", style=_C_PRIMARY)
|
|
t.append(f"{name}", style=f"bold {_C_TEXT}" if focused else _C_TEXT)
|
|
t.append(f" {elapsed:.0f}s\n", style=_C_MUTED)
|
|
elif status == "success":
|
|
t.append(f"{arrow} ", style=_C_MUTED)
|
|
t.append("✔ ", style=_C_GREEN)
|
|
t.append(f"{name}", style=f"bold {_C_TEXT}" if focused else _C_DIM)
|
|
if entry.get("from_cache"):
|
|
t.append(" cached\n", style=_C_TEAL)
|
|
else:
|
|
t.append(f" {entry['duration']:.1f}s\n", style=_C_MUTED)
|
|
elif status in ("error", "timeout"):
|
|
t.append(f"{arrow} ", style=_C_MUTED)
|
|
t.append("✘ ", style=_C_RED)
|
|
t.append(f"{name}", style=f"bold {_C_RED}")
|
|
dur = f" {entry['duration']:.1f}s" if entry.get("duration") else ""
|
|
t.append(f"{dur}\n", style=_C_MUTED)
|
|
|
|
if not expanded:
|
|
continue
|
|
|
|
indent = " "
|
|
if entry.get("args"):
|
|
t.append(f"{indent}Args:\n", style=_C_MUTED)
|
|
try:
|
|
parsed = _json.loads(entry["args"])
|
|
formatted = _json.dumps(parsed, indent=2, ensure_ascii=False)
|
|
except (ValueError, TypeError):
|
|
formatted = entry["args"]
|
|
_append_highlighted(t, formatted, indent)
|
|
|
|
if status in ("error", "timeout") and entry.get("error"):
|
|
t.append(f"{indent}Error:\n", style=_C_RED)
|
|
for line in str(entry["error"]).split("\n"):
|
|
if line.strip():
|
|
t.append(f"{indent} {line}\n", style=_C_RED)
|
|
|
|
if status == "success" and entry.get("result"):
|
|
t.append(f"{indent}Result:\n", style=_C_TEAL)
|
|
result_text = _unescape_text(str(entry["result"]))
|
|
result_text = _format_json_in_text(result_text)
|
|
total = _append_highlighted(t, result_text, indent)
|
|
if total > 50:
|
|
t.append(f"{indent} … ({total} lines total)\n", style=_C_MUTED)
|
|
|
|
line_map.append((i, entry_start_line, t.plain.count("\n")))
|
|
|
|
self._log_line_map = line_map
|
|
widget.update(t)
|
|
|
|
if self._log_scroll_needed:
|
|
self._log_scroll_needed = False
|
|
try:
|
|
log_scroll = self.query_one("#log-panel", VerticalScroll)
|
|
panel_h = log_scroll.size.height
|
|
cursor_top = cursor_line
|
|
cursor_bottom = cursor_line + 2
|
|
for _idx, _start, _end in self._log_line_map:
|
|
if _idx == self._log_cursor:
|
|
cursor_bottom = _end
|
|
break
|
|
visible_top = int(log_scroll.scroll_y)
|
|
visible_bottom = visible_top + panel_h
|
|
if cursor_top < visible_top + 1:
|
|
log_scroll.scroll_to(y=max(0, cursor_top - 1), animate=False)
|
|
elif cursor_bottom > visible_bottom - 1:
|
|
log_scroll.scroll_to(
|
|
y=max(0, cursor_bottom - panel_h + 1), animate=False
|
|
)
|
|
except Exception: # noqa: S110
|
|
pass
|
|
|
|
def _filtered_streaming_text(self) -> str:
|
|
if not self._streaming_text:
|
|
return ""
|
|
text = self._streaming_text
|
|
|
|
# Strip plan JSON — both complete (already parsed) and in-progress
|
|
plan_start = text.find('{"plan"')
|
|
if plan_start >= 0:
|
|
depth = 0
|
|
for i in range(plan_start, len(text)):
|
|
if text[i] == "{":
|
|
depth += 1
|
|
elif text[i] == "}":
|
|
depth -= 1
|
|
if depth == 0:
|
|
text = (text[:plan_start] + text[i + 1 :]).strip()
|
|
break
|
|
else:
|
|
# Incomplete JSON — hide the partial blob
|
|
text = text[:plan_start].strip()
|
|
|
|
text = self._strip_step_observation_json(text)
|
|
return _format_json_in_text(text)
|
|
|
|
def _strip_step_observation_json(self, text: str) -> str:
|
|
"""Hide structured step-observation JSON from the live transcript."""
|
|
if "step_completed_successfully" not in text:
|
|
return text
|
|
|
|
result: list[str] = []
|
|
decoder = _json.JSONDecoder()
|
|
i = 0
|
|
while i < len(text):
|
|
start = text.find("{", i)
|
|
if start < 0:
|
|
result.append(text[i:])
|
|
break
|
|
|
|
result.append(text[i:start])
|
|
try:
|
|
parsed, offset = decoder.raw_decode(text[start:])
|
|
except ValueError:
|
|
if "step_completed_successfully" in text[start:]:
|
|
break
|
|
result.append(text[start])
|
|
i = start + 1
|
|
continue
|
|
|
|
end = start + offset
|
|
if self._is_step_observation_payload(parsed):
|
|
i = end
|
|
continue
|
|
|
|
result.append(text[start:end])
|
|
i = end
|
|
|
|
return "".join(result).strip()
|
|
|
|
@staticmethod
|
|
def _is_step_observation_payload(payload: Any) -> bool:
|
|
return (
|
|
isinstance(payload, dict)
|
|
and "step_completed_successfully" in payload
|
|
and "key_information_learned" in payload
|
|
)
|
|
|
|
# ── Event helpers ───────────────────────────────────────
|
|
|
|
def _complete_step(self, style: str, message: str, detail: str = "") -> None:
|
|
with self._lock:
|
|
if self._current_step:
|
|
prev_style, prev_msg, prev_detail = self._current_step
|
|
skip = prev_msg in (
|
|
"Thinking…",
|
|
"Generating response…",
|
|
) or prev_msg.startswith("⚡")
|
|
if not skip:
|
|
self._timeline.append((prev_style, prev_msg, prev_detail))
|
|
self._current_step = (style, message, detail)
|
|
if len(self._timeline) > 20:
|
|
self._timeline = self._timeline[-20:]
|
|
|
|
def _replace_step(self, style: str, message: str, detail: str = "") -> None:
|
|
"""Replace current step in-place (no archive). Used for tool results."""
|
|
with self._lock:
|
|
self._current_step = (style, message, detail)
|
|
|
|
def _set_step(self, style: str, message: str) -> None:
|
|
with self._lock:
|
|
self._current_step = (style, message, "")
|
|
|
|
# ── Plan detection ──────────────────────────────────────
|
|
|
|
def _try_parse_plan(self, text: str) -> None:
|
|
stripped = text.strip()
|
|
start = stripped.find('{"plan"')
|
|
if start < 0:
|
|
return
|
|
depth = 0
|
|
for i in range(start, len(stripped)):
|
|
if stripped[i] == "{":
|
|
depth += 1
|
|
elif stripped[i] == "}":
|
|
depth -= 1
|
|
if depth == 0:
|
|
try:
|
|
data = _json.loads(stripped[start : i + 1])
|
|
if "steps" in data and isinstance(data["steps"], list):
|
|
self._plan = data
|
|
self._plan_step_status = {
|
|
s["step_number"]: "pending"
|
|
for s in data["steps"]
|
|
if "step_number" in s
|
|
}
|
|
self._awaiting_replan = False
|
|
except (ValueError, KeyError):
|
|
# Best-effort parse of streamed planner output:
|
|
# partial or non-plan JSON is expected and ignored.
|
|
pass
|
|
return
|
|
|
|
def _set_plan_step_status(self, step_number: int, status: str) -> None:
|
|
"""Set status for an explicit plan step reported by the planner."""
|
|
if not self._plan or step_number not in self._plan_step_status:
|
|
return
|
|
|
|
self._plan_step_status[step_number] = status
|
|
|
|
def _mark_plan_goal_achieved(self, step_number: int | None = None) -> None:
|
|
"""Collapse early-goal/skipped plan steps into completed UI state."""
|
|
if not self._plan:
|
|
return
|
|
|
|
if step_number is not None:
|
|
self._set_plan_step_status(step_number, "done")
|
|
|
|
for sn, current in list(self._plan_step_status.items()):
|
|
if current in ("pending", "active"):
|
|
self._plan_step_status[sn] = "done"
|
|
|
|
def _collapse_plan_on_task_done(self) -> None:
|
|
"""Collapse unfinished display-only plan steps once the task succeeds."""
|
|
if not self._plan:
|
|
return
|
|
|
|
for sn, current in list(self._plan_step_status.items()):
|
|
if current in ("pending", "active"):
|
|
self._plan_step_status[sn] = "done"
|
|
|
|
def _pop_task_state(self, event: Any) -> dict[str, Any]:
|
|
"""Return the start-time state for a completion/failure event's task.
|
|
|
|
Tasks can run async/overlapping, so the event's task identity is
|
|
matched against the state registered when the task started rather
|
|
than assuming the most recently started task. Falls back to the
|
|
current shared state for unmatched events. Caller must hold
|
|
``self._lock``.
|
|
"""
|
|
task = getattr(event, "task", None)
|
|
candidates: list[str] = []
|
|
if task is not None:
|
|
task_id = str(getattr(task, "id", "") or "")
|
|
if task_id:
|
|
candidates.append(task_id)
|
|
desc = getattr(task, "name", "") or getattr(task, "description", "") or ""
|
|
if desc:
|
|
candidates.append(desc)
|
|
event_task_name = getattr(event, "task_name", "") or ""
|
|
if event_task_name:
|
|
candidates.append(event_task_name)
|
|
for key in candidates:
|
|
state = self._task_state_by_key.pop(key, None)
|
|
if state is not None:
|
|
return state
|
|
return {
|
|
"idx": self._current_task_idx,
|
|
"desc": self._current_task_desc,
|
|
"agent": self._current_agent,
|
|
"start_time": self._task_start_time,
|
|
}
|
|
|
|
def _prepare_for_replan(self) -> None:
|
|
"""Keep current statuses visible while allowing the next plan to replace it."""
|
|
self._awaiting_replan = True
|
|
|
|
def _apply_plan_refinements(self, refinements: list[str] | None) -> None:
|
|
"""Apply refined descriptions while leaving statuses as pending/done/failed."""
|
|
if not self._plan or not refinements:
|
|
return
|
|
|
|
steps = self._plan.get("steps")
|
|
if not isinstance(steps, list):
|
|
return
|
|
|
|
steps_by_number = {
|
|
step.get("step_number"): step for step in steps if isinstance(step, dict)
|
|
}
|
|
for refinement in refinements:
|
|
match = _REFINEMENT_RE.match(refinement)
|
|
if not match:
|
|
continue
|
|
step_number = int(match.group(1))
|
|
description = match.group(2).strip()
|
|
step = steps_by_number.get(step_number)
|
|
if step is not None and description:
|
|
step["description"] = description
|
|
|
|
def _try_parse_step_observation(self, text: str) -> bool:
|
|
"""Parse streamed observation JSON and update the exact step it names."""
|
|
if "step_completed_successfully" not in text:
|
|
return False
|
|
|
|
decoder = _json.JSONDecoder()
|
|
updated = False
|
|
i = 0
|
|
while i < len(text):
|
|
start = text.find("{", i)
|
|
if start < 0:
|
|
break
|
|
try:
|
|
payload, offset = decoder.raw_decode(text[start:])
|
|
except ValueError:
|
|
i = start + 1
|
|
continue
|
|
|
|
if self._is_step_observation_payload(payload):
|
|
step_number = self._observation_step_number(payload)
|
|
if step_number is not None:
|
|
status = (
|
|
"done"
|
|
if payload.get("step_completed_successfully") is True
|
|
else "failed"
|
|
)
|
|
self._set_plan_step_status(step_number, status)
|
|
if payload.get("goal_already_achieved") is True:
|
|
self._mark_plan_goal_achieved(step_number)
|
|
updated = True
|
|
i = start + max(offset, 1)
|
|
|
|
return updated
|
|
|
|
def _observation_step_number(self, payload: dict[str, Any]) -> int | None:
|
|
raw_step_number = payload.get("step_number")
|
|
if isinstance(raw_step_number, int):
|
|
return raw_step_number
|
|
|
|
searchable = " ".join(
|
|
str(payload.get(field) or "")
|
|
for field in ("key_information_learned", "replan_reason")
|
|
)
|
|
match = _STEP_NUMBER_RE.search(searchable)
|
|
if not match:
|
|
return None
|
|
|
|
return int(match.group(1))
|
|
|
|
# ── Event subscription ──────────────────────────────────
|
|
|
|
def _register_handler(self, event_type: type, handler: Any) -> None:
|
|
self._event_handlers.append((event_type, handler))
|
|
|
|
def _unsubscribe(self) -> None:
|
|
if not self._event_handlers:
|
|
return
|
|
try:
|
|
from crewai.events.event_bus import crewai_event_bus
|
|
|
|
for event_type, handler in self._event_handlers:
|
|
crewai_event_bus.off(event_type, handler)
|
|
except Exception: # noqa: S110
|
|
pass
|
|
self._event_handlers.clear()
|
|
|
|
def _has_running_memory_save_locked(self) -> bool:
|
|
return any(
|
|
entry["tool_name"] == "memory_save" and entry["status"] == "running"
|
|
for entry in self._log_entries
|
|
)
|
|
|
|
def _on_memory_save_drain_elapsed(self) -> None:
|
|
self._memory_save_drain_timer = None
|
|
self._unsubscribe_if_no_running_memory_save()
|
|
|
|
def _schedule_memory_save_drain_unsubscribe(self) -> bool:
|
|
loop = getattr(self, "_loop", None)
|
|
if loop is None:
|
|
return False
|
|
if getattr(self, "_thread_id", None) != threading.get_ident():
|
|
try:
|
|
loop.call_soon_threadsafe(self._schedule_memory_save_drain_unsubscribe)
|
|
except RuntimeError:
|
|
return False
|
|
return True
|
|
if self._memory_save_drain_timer is not None:
|
|
self._memory_save_drain_timer.stop()
|
|
self._memory_save_drain_timer = self.set_timer(
|
|
_MEMORY_SAVE_DRAIN_GRACE_SECONDS,
|
|
self._on_memory_save_drain_elapsed,
|
|
name="memory-save-drain",
|
|
)
|
|
return True
|
|
|
|
def _unsubscribe_if_no_running_memory_save(
|
|
self, *, wait_for_queued: bool = False
|
|
) -> None:
|
|
with self._lock:
|
|
should_unsubscribe = (
|
|
self._status
|
|
in {
|
|
"completed",
|
|
"failed",
|
|
}
|
|
and not self._has_running_memory_save_locked()
|
|
)
|
|
|
|
if should_unsubscribe:
|
|
if wait_for_queued and self._schedule_memory_save_drain_unsubscribe():
|
|
return
|
|
self._unsubscribe()
|
|
|
|
def _subscribe(self) -> None:
|
|
from crewai.events.event_bus import crewai_event_bus
|
|
from crewai.events.types.crew_events import CrewKickoffStartedEvent
|
|
from crewai.events.types.llm_events import (
|
|
LLMCallCompletedEvent,
|
|
LLMCallStartedEvent,
|
|
LLMStreamChunkEvent,
|
|
)
|
|
from crewai.events.types.logging_events import (
|
|
AgentLogsExecutionEvent,
|
|
AgentLogsStartedEvent,
|
|
)
|
|
from crewai.events.types.observation_events import (
|
|
GoalAchievedEarlyEvent,
|
|
PlanRefinementEvent,
|
|
PlanReplanTriggeredEvent,
|
|
PlanStepCompletedEvent,
|
|
PlanStepStartedEvent,
|
|
StepObservationCompletedEvent,
|
|
StepObservationFailedEvent,
|
|
StepObservationStartedEvent,
|
|
)
|
|
from crewai.events.types.task_events import (
|
|
TaskCompletedEvent,
|
|
TaskFailedEvent,
|
|
TaskStartedEvent,
|
|
)
|
|
from crewai.events.types.tool_usage_events import (
|
|
ToolUsageErrorEvent,
|
|
ToolUsageFinishedEvent,
|
|
ToolUsageStartedEvent,
|
|
)
|
|
|
|
@crewai_event_bus.on(CrewKickoffStartedEvent)
|
|
def on_crew_started(source: Any, event: CrewKickoffStartedEvent) -> None:
|
|
with self._lock:
|
|
if event.crew_name:
|
|
self._crew_name = event.crew_name
|
|
self.title = f"CrewAI — {event.crew_name}"
|
|
self._status = "working"
|
|
|
|
self._register_handler(CrewKickoffStartedEvent, on_crew_started)
|
|
|
|
@crewai_event_bus.on(TaskStartedEvent)
|
|
def on_task_started(source: Any, event: TaskStartedEvent) -> None:
|
|
with self._lock:
|
|
self._current_task_idx += 1
|
|
idx = self._current_task_idx
|
|
self._task_start_time = time.time()
|
|
self._streaming_text = ""
|
|
self._task_full_output = ""
|
|
self._is_streaming = False
|
|
self._plan = None
|
|
self._plan_step_status = {}
|
|
self._awaiting_replan = False
|
|
|
|
# Tasks may run async/overlapping, so earlier active rows are
|
|
# only marked done by their own completion events (with a
|
|
# final sweep in _on_crew_done).
|
|
if idx in self._task_statuses:
|
|
self._task_statuses[idx] = "active"
|
|
|
|
desc = ""
|
|
if event.task:
|
|
desc = getattr(event.task, "name", "") or ""
|
|
if not desc:
|
|
desc = getattr(event.task, "description", "") or ""
|
|
if not desc and event.task_name:
|
|
desc = event.task_name
|
|
self._current_task_desc = desc
|
|
|
|
agent = getattr(source, "agent", None) if source else None
|
|
agent_role = (getattr(agent, "role", "") or "") if agent else ""
|
|
if agent_role:
|
|
self._current_agent = agent_role
|
|
|
|
key = str(getattr(event.task, "id", "") or "") or desc
|
|
if key:
|
|
self._task_state_by_key[key] = {
|
|
"idx": idx,
|
|
"desc": desc,
|
|
"agent": agent_role,
|
|
"start_time": self._task_start_time,
|
|
}
|
|
|
|
self._timeline = []
|
|
self._current_step = None
|
|
self._set_step("yellow", "Thinking…")
|
|
|
|
self._register_handler(TaskStartedEvent, on_task_started)
|
|
|
|
@crewai_event_bus.on(AgentLogsStartedEvent)
|
|
def on_agent_started(source: Any, event: AgentLogsStartedEvent) -> None:
|
|
with self._lock:
|
|
role = event.agent_role.split("\n")[0] if event.agent_role else ""
|
|
if role:
|
|
self._current_agent = role
|
|
|
|
self._register_handler(AgentLogsStartedEvent, on_agent_started)
|
|
|
|
@crewai_event_bus.on(LLMCallStartedEvent)
|
|
def on_llm_started(source: Any, event: LLMCallStartedEvent) -> None:
|
|
with self._lock:
|
|
self._is_streaming = False
|
|
self._streaming_text = ""
|
|
self._live_out_tokens = 0
|
|
self._current_llm_text = ""
|
|
if event.messages:
|
|
estimate = len(str(event.messages)) // 4
|
|
self._input_tokens += estimate
|
|
self._pending_input_estimate = estimate
|
|
self._complete_step("yellow", "Thinking…")
|
|
|
|
self._register_handler(LLMCallStartedEvent, on_llm_started)
|
|
|
|
@crewai_event_bus.on(LLMCallCompletedEvent)
|
|
def on_llm_completed(source: Any, event: LLMCallCompletedEvent) -> None:
|
|
with self._lock:
|
|
self._llm_calls += 1
|
|
self._is_streaming = False
|
|
self._streaming_text = ""
|
|
self._live_out_tokens = 0
|
|
self._input_tokens -= self._pending_input_estimate
|
|
self._pending_input_estimate = 0
|
|
if event.usage:
|
|
u = event.usage
|
|
inp = next(
|
|
(
|
|
u[k]
|
|
for k in (
|
|
"prompt_tokens",
|
|
"input_tokens",
|
|
"prompt_token_count",
|
|
)
|
|
if u.get(k)
|
|
),
|
|
0,
|
|
)
|
|
out = next(
|
|
(
|
|
u[k]
|
|
for k in (
|
|
"completion_tokens",
|
|
"output_tokens",
|
|
"candidates_token_count",
|
|
)
|
|
if u.get(k)
|
|
),
|
|
0,
|
|
)
|
|
self._input_tokens += inp
|
|
self._output_tokens += out
|
|
if self._current_llm_text.strip():
|
|
self._current_task_steps.append(
|
|
{
|
|
"type": "llm",
|
|
"summary": f"LLM response (call {self._llm_calls})",
|
|
"detail": self._current_llm_text.strip(),
|
|
"style": "dim",
|
|
}
|
|
)
|
|
self._current_llm_text = ""
|
|
|
|
self._register_handler(LLMCallCompletedEvent, on_llm_completed)
|
|
|
|
@crewai_event_bus.on(LLMStreamChunkEvent)
|
|
def on_stream_chunk(source: Any, event: LLMStreamChunkEvent) -> None:
|
|
with self._lock:
|
|
if not self._is_streaming:
|
|
self._current_step = ("yellow", "Generating response…", "")
|
|
self._is_streaming = True
|
|
self._streaming_text += event.chunk
|
|
self._task_full_output += event.chunk
|
|
self._current_llm_text += event.chunk
|
|
self._live_out_tokens += 1
|
|
if (
|
|
not self._plan or self._awaiting_replan
|
|
) and '{"plan"' in self._streaming_text:
|
|
self._try_parse_plan(self._streaming_text)
|
|
if self._plan and "step_completed_successfully" in self._streaming_text:
|
|
self._try_parse_step_observation(self._streaming_text)
|
|
|
|
self._register_handler(LLMStreamChunkEvent, on_stream_chunk)
|
|
|
|
@crewai_event_bus.on(StepObservationStartedEvent)
|
|
def on_step_observation_started(
|
|
source: Any, event: StepObservationStartedEvent
|
|
) -> None:
|
|
with self._lock:
|
|
self._set_plan_step_status(event.step_number, "active")
|
|
|
|
self._register_handler(StepObservationStartedEvent, on_step_observation_started)
|
|
|
|
@crewai_event_bus.on(StepObservationCompletedEvent)
|
|
def on_step_observation_completed(
|
|
source: Any, event: StepObservationCompletedEvent
|
|
) -> None:
|
|
with self._lock:
|
|
status = "done" if event.step_completed_successfully else "failed"
|
|
self._set_plan_step_status(event.step_number, status)
|
|
|
|
self._register_handler(
|
|
StepObservationCompletedEvent, on_step_observation_completed
|
|
)
|
|
|
|
@crewai_event_bus.on(StepObservationFailedEvent)
|
|
def on_step_observation_failed(
|
|
source: Any, event: StepObservationFailedEvent
|
|
) -> None:
|
|
with self._lock:
|
|
# Intentionally "done", not "failed": this event means the
|
|
# step OBSERVER failed (e.g. timeout), not the step itself,
|
|
# and the executor continues past it. A red ✘ would wrongly
|
|
# suggest the plan step failed.
|
|
self._set_plan_step_status(event.step_number, "done")
|
|
|
|
self._register_handler(StepObservationFailedEvent, on_step_observation_failed)
|
|
|
|
@crewai_event_bus.on(PlanRefinementEvent)
|
|
def on_plan_refinement(source: Any, event: PlanRefinementEvent) -> None:
|
|
with self._lock:
|
|
if event.step_number:
|
|
self._set_plan_step_status(event.step_number, "done")
|
|
self._apply_plan_refinements(event.refinements)
|
|
|
|
self._register_handler(PlanRefinementEvent, on_plan_refinement)
|
|
|
|
@crewai_event_bus.on(PlanStepStartedEvent)
|
|
def on_plan_step_started(source: Any, event: PlanStepStartedEvent) -> None:
|
|
with self._lock:
|
|
self._set_plan_step_status(event.step_number, "active")
|
|
|
|
self._register_handler(PlanStepStartedEvent, on_plan_step_started)
|
|
|
|
@crewai_event_bus.on(PlanStepCompletedEvent)
|
|
def on_plan_step_completed(source: Any, event: PlanStepCompletedEvent) -> None:
|
|
with self._lock:
|
|
self._set_plan_step_status(
|
|
event.step_number,
|
|
"done" if event.success else "failed",
|
|
)
|
|
|
|
self._register_handler(PlanStepCompletedEvent, on_plan_step_completed)
|
|
|
|
@crewai_event_bus.on(PlanReplanTriggeredEvent)
|
|
def on_plan_replan_triggered(
|
|
source: Any, event: PlanReplanTriggeredEvent
|
|
) -> None:
|
|
with self._lock:
|
|
self._prepare_for_replan()
|
|
self._current_step = ("yellow", "Replanning…", event.replan_reason)
|
|
|
|
self._register_handler(PlanReplanTriggeredEvent, on_plan_replan_triggered)
|
|
|
|
@crewai_event_bus.on(GoalAchievedEarlyEvent)
|
|
def on_goal_achieved_early(source: Any, event: GoalAchievedEarlyEvent) -> None:
|
|
with self._lock:
|
|
self._mark_plan_goal_achieved(event.step_number or None)
|
|
|
|
self._register_handler(GoalAchievedEarlyEvent, on_goal_achieved_early)
|
|
|
|
@crewai_event_bus.on(ToolUsageStartedEvent)
|
|
def on_tool_started(source: Any, event: ToolUsageStartedEvent) -> None:
|
|
if event.tool_name in _INTERNAL_TOOL_NAMES:
|
|
return
|
|
|
|
with self._lock:
|
|
self._is_streaming = False
|
|
self._streaming_text = ""
|
|
now = time.time()
|
|
args_str = ""
|
|
if event.tool_args:
|
|
try:
|
|
args_str = _json.dumps(event.tool_args, indent=2, default=str)
|
|
except Exception:
|
|
args_str = str(event.tool_args)
|
|
for entry in self._log_entries:
|
|
if (
|
|
entry["status"] == "running"
|
|
and entry["tool_name"] == event.tool_name
|
|
and entry["args"] == (args_str or None)
|
|
):
|
|
return
|
|
for entry in self._log_entries:
|
|
if (
|
|
entry["status"] == "running"
|
|
and entry["tool_name"] != event.tool_name
|
|
):
|
|
if entry["tool_name"] == "memory_save":
|
|
continue
|
|
entry["status"] = "timeout"
|
|
entry["error"] = (
|
|
"No result received before the next tool started"
|
|
)
|
|
entry["duration"] = now - entry["start_time"]
|
|
plan_step_number = getattr(event, "plan_step_number", None)
|
|
if not isinstance(plan_step_number, int):
|
|
plan_step_number = None
|
|
self._current_task_steps.append(
|
|
{
|
|
"type": "tool",
|
|
"summary": f"Using {event.tool_name}…",
|
|
"detail": f"Args:\n{args_str}" if args_str else None,
|
|
"style": "yellow",
|
|
"_tool_name": event.tool_name,
|
|
}
|
|
)
|
|
self._log_entries.append(
|
|
{
|
|
"tool_name": event.tool_name,
|
|
"status": "running",
|
|
"args": args_str or None,
|
|
"result": None,
|
|
"error": None,
|
|
"start_time": time.time(),
|
|
"duration": None,
|
|
"task_idx": self._current_task_idx,
|
|
"plan_step_number": plan_step_number,
|
|
"event_id": event.event_id,
|
|
}
|
|
)
|
|
self._complete_step("teal", f"⚡ {event.tool_name}…")
|
|
|
|
self._register_handler(ToolUsageStartedEvent, on_tool_started)
|
|
|
|
@crewai_event_bus.on(ToolUsageFinishedEvent)
|
|
def on_tool_finished(source: Any, event: ToolUsageFinishedEvent) -> None:
|
|
if event.tool_name in _INTERNAL_TOOL_NAMES:
|
|
return
|
|
|
|
with self._lock:
|
|
if event.output is not None:
|
|
out = event.output
|
|
if isinstance(out, (dict, list)):
|
|
try:
|
|
result_str = _json.dumps(out, indent=2, ensure_ascii=False)[
|
|
:5000
|
|
]
|
|
except (TypeError, ValueError):
|
|
result_str = str(out)[:5000]
|
|
else:
|
|
result_str = str(out)[:5000]
|
|
else:
|
|
result_str = "No output"
|
|
for step in reversed(self._current_task_steps):
|
|
if (
|
|
step.get("_tool_name") == event.tool_name
|
|
and step["type"] == "tool"
|
|
):
|
|
existing = step.get("detail") or ""
|
|
step["detail"] = (
|
|
f"{existing}\n\nResult:\n{result_str}"
|
|
if existing
|
|
else f"Result:\n{result_str}"
|
|
)
|
|
step["summary"] = f"✔ {event.tool_name}"
|
|
step["style"] = "green"
|
|
break
|
|
from_cache = getattr(event, "from_cache", False)
|
|
for entry in reversed(self._log_entries):
|
|
if entry["tool_name"] == event.tool_name and (
|
|
entry["status"] == "running"
|
|
or (entry["status"] == "success" and entry["result"] is None)
|
|
):
|
|
entry["status"] = "success"
|
|
entry["result"] = result_str
|
|
entry["duration"] = time.time() - entry["start_time"]
|
|
entry["from_cache"] = from_cache
|
|
break
|
|
self._replace_step("green", f"✔ {event.tool_name}")
|
|
|
|
self._register_handler(ToolUsageFinishedEvent, on_tool_finished)
|
|
|
|
@crewai_event_bus.on(ToolUsageErrorEvent)
|
|
def on_tool_error(source: Any, event: ToolUsageErrorEvent) -> None:
|
|
if event.tool_name in _INTERNAL_TOOL_NAMES:
|
|
return
|
|
|
|
error_text = str(event.error)[:200] if event.error else ""
|
|
with self._lock:
|
|
for step in reversed(self._current_task_steps):
|
|
if (
|
|
step.get("_tool_name") == event.tool_name
|
|
and step["type"] == "tool"
|
|
):
|
|
existing = step.get("detail") or ""
|
|
step["detail"] = (
|
|
f"{existing}\n\nError:\n{event.error}"
|
|
if existing
|
|
else f"Error:\n{event.error}"
|
|
)
|
|
step["summary"] = f"✘ {event.tool_name}"
|
|
step["style"] = "red"
|
|
break
|
|
for idx, entry in reversed(list(enumerate(self._log_entries))):
|
|
if entry["tool_name"] == event.tool_name and (
|
|
entry["status"] == "running"
|
|
or (entry["status"] == "success" and entry["result"] is None)
|
|
):
|
|
entry["status"] = "error"
|
|
entry["error"] = str(event.error) if event.error else None
|
|
entry["duration"] = time.time() - entry["start_time"]
|
|
self._log_expanded.add(idx)
|
|
break
|
|
self._replace_step("red", f"✘ {event.tool_name}", error_text)
|
|
|
|
self._register_handler(ToolUsageErrorEvent, on_tool_error)
|
|
|
|
from crewai.events.types.memory_events import (
|
|
MemoryRetrievalCompletedEvent,
|
|
MemoryRetrievalFailedEvent,
|
|
MemoryRetrievalStartedEvent,
|
|
MemorySaveCompletedEvent,
|
|
MemorySaveFailedEvent,
|
|
MemorySaveStartedEvent,
|
|
)
|
|
|
|
def is_nested_save_to_memory_event(event: Any) -> bool:
|
|
if event.parent_event_id is None:
|
|
return False
|
|
state = crewai_event_bus.runtime_state
|
|
if state is None:
|
|
return False
|
|
parent_node = state.event_record.nodes.get(event.parent_event_id)
|
|
parent_event = getattr(parent_node, "event", None)
|
|
return getattr(
|
|
parent_event, "type", None
|
|
) == "tool_usage_started" and _is_save_to_memory_tool(
|
|
getattr(parent_event, "tool_name", None)
|
|
)
|
|
|
|
@crewai_event_bus.on(MemorySaveStartedEvent)
|
|
def on_memory_save_started(source: Any, event: MemorySaveStartedEvent) -> None:
|
|
with self._lock:
|
|
if is_nested_save_to_memory_event(event):
|
|
self._suppressed_memory_save_event_ids.add(event.event_id)
|
|
return
|
|
for entry in reversed(self._log_entries):
|
|
if (
|
|
_is_save_to_memory_tool(entry["tool_name"])
|
|
and entry.get("event_id") == event.parent_event_id
|
|
):
|
|
self._suppressed_memory_save_event_ids.add(event.event_id)
|
|
return
|
|
for entry in reversed(self._log_entries):
|
|
if (
|
|
entry["tool_name"] == "memory_save"
|
|
and entry.get("started_event_id") == event.event_id
|
|
):
|
|
entry["args"] = _truncate_log_text(
|
|
event.value, _LOG_ARGS_TEXT_LIMIT
|
|
)
|
|
return
|
|
self._log_entries.append(
|
|
{
|
|
"tool_name": "memory_save",
|
|
"status": "running",
|
|
"args": _truncate_log_text(event.value, _LOG_ARGS_TEXT_LIMIT),
|
|
"result": None,
|
|
"error": None,
|
|
"start_time": time.time(),
|
|
"duration": None,
|
|
"task_idx": self._current_task_idx,
|
|
"event_id": event.event_id,
|
|
}
|
|
)
|
|
|
|
self._register_handler(MemorySaveStartedEvent, on_memory_save_started)
|
|
|
|
@crewai_event_bus.on(MemorySaveCompletedEvent)
|
|
def on_memory_save_completed(
|
|
source: Any, event: MemorySaveCompletedEvent
|
|
) -> None:
|
|
with self._lock:
|
|
if (
|
|
event.started_event_id in self._suppressed_memory_save_event_ids
|
|
or is_nested_save_to_memory_event(event)
|
|
):
|
|
if event.started_event_id is not None:
|
|
self._suppressed_memory_save_event_ids.discard(
|
|
event.started_event_id
|
|
)
|
|
else:
|
|
for entry in reversed(self._log_entries):
|
|
has_started_event_match = (
|
|
event.started_event_id is not None
|
|
and (
|
|
entry.get("event_id") == event.started_event_id
|
|
or entry.get("started_event_id")
|
|
== event.started_event_id
|
|
)
|
|
)
|
|
has_running_event_without_id = (
|
|
event.started_event_id is None
|
|
and entry["status"] == "running"
|
|
)
|
|
if entry["tool_name"] == "memory_save" and (
|
|
has_running_event_without_id or has_started_event_match
|
|
):
|
|
entry["status"] = "success"
|
|
entry["duration"] = event.save_time_ms / 1000
|
|
entry["result"] = _truncate_log_text(
|
|
event.value, _LOG_RESULT_TEXT_LIMIT
|
|
)
|
|
entry["error"] = None
|
|
entry["started_event_id"] = event.started_event_id
|
|
break
|
|
else:
|
|
self._log_entries.append(
|
|
{
|
|
"tool_name": "memory_save",
|
|
"status": "success",
|
|
"args": None,
|
|
"result": _truncate_log_text(
|
|
event.value, _LOG_RESULT_TEXT_LIMIT
|
|
),
|
|
"error": None,
|
|
"start_time": time.time(),
|
|
"duration": event.save_time_ms / 1000,
|
|
"task_idx": self._current_task_idx,
|
|
"started_event_id": event.started_event_id,
|
|
}
|
|
)
|
|
|
|
self._unsubscribe_if_no_running_memory_save(wait_for_queued=True)
|
|
|
|
self._register_handler(MemorySaveCompletedEvent, on_memory_save_completed)
|
|
|
|
@crewai_event_bus.on(MemorySaveFailedEvent)
|
|
def on_memory_save_failed(source: Any, event: MemorySaveFailedEvent) -> None:
|
|
with self._lock:
|
|
if (
|
|
event.started_event_id in self._suppressed_memory_save_event_ids
|
|
or is_nested_save_to_memory_event(event)
|
|
):
|
|
if event.started_event_id is not None:
|
|
self._suppressed_memory_save_event_ids.discard(
|
|
event.started_event_id
|
|
)
|
|
else:
|
|
for idx, entry in reversed(list(enumerate(self._log_entries))):
|
|
has_started_event_match = (
|
|
event.started_event_id is not None
|
|
and (
|
|
entry.get("event_id") == event.started_event_id
|
|
or entry.get("started_event_id")
|
|
== event.started_event_id
|
|
)
|
|
)
|
|
has_running_event_without_id = (
|
|
event.started_event_id is None
|
|
and entry["status"] == "running"
|
|
)
|
|
if entry["tool_name"] == "memory_save" and (
|
|
has_running_event_without_id or has_started_event_match
|
|
):
|
|
entry["status"] = "error"
|
|
entry["error"] = event.error
|
|
entry["duration"] = time.time() - entry["start_time"]
|
|
entry["started_event_id"] = event.started_event_id
|
|
self._log_expanded.add(idx)
|
|
break
|
|
else:
|
|
self._log_entries.append(
|
|
{
|
|
"tool_name": "memory_save",
|
|
"status": "error",
|
|
"args": _truncate_log_text(
|
|
event.value, _LOG_ARGS_TEXT_LIMIT
|
|
),
|
|
"result": None,
|
|
"error": event.error,
|
|
"start_time": time.time(),
|
|
"duration": 0,
|
|
"task_idx": self._current_task_idx,
|
|
"started_event_id": event.started_event_id,
|
|
}
|
|
)
|
|
self._log_expanded.add(len(self._log_entries) - 1)
|
|
|
|
self._unsubscribe_if_no_running_memory_save(wait_for_queued=True)
|
|
|
|
self._register_handler(MemorySaveFailedEvent, on_memory_save_failed)
|
|
|
|
@crewai_event_bus.on(MemoryRetrievalStartedEvent)
|
|
def on_memory_retrieval_started(
|
|
source: Any, event: MemoryRetrievalStartedEvent
|
|
) -> None:
|
|
with self._lock:
|
|
self._log_entries.append(
|
|
{
|
|
"tool_name": "memory_recall",
|
|
"status": "running",
|
|
"args": None,
|
|
"result": None,
|
|
"error": None,
|
|
"start_time": time.time(),
|
|
"duration": None,
|
|
"task_idx": self._current_task_idx,
|
|
}
|
|
)
|
|
|
|
self._register_handler(MemoryRetrievalStartedEvent, on_memory_retrieval_started)
|
|
|
|
@crewai_event_bus.on(MemoryRetrievalCompletedEvent)
|
|
def on_memory_retrieval_completed(
|
|
source: Any, event: MemoryRetrievalCompletedEvent
|
|
) -> None:
|
|
with self._lock:
|
|
for entry in reversed(self._log_entries):
|
|
if (
|
|
entry["tool_name"] == "memory_recall"
|
|
and entry["status"] == "running"
|
|
):
|
|
entry["status"] = "success"
|
|
entry["duration"] = event.retrieval_time_ms / 1000
|
|
content = event.memory_content or ""
|
|
if content:
|
|
entry["result"] = content[:3000]
|
|
break
|
|
|
|
self._register_handler(
|
|
MemoryRetrievalCompletedEvent, on_memory_retrieval_completed
|
|
)
|
|
|
|
@crewai_event_bus.on(MemoryRetrievalFailedEvent)
|
|
def on_memory_retrieval_failed(
|
|
source: Any, event: MemoryRetrievalFailedEvent
|
|
) -> None:
|
|
with self._lock:
|
|
for idx, entry in enumerate(self._log_entries):
|
|
if (
|
|
entry["tool_name"] == "memory_recall"
|
|
and entry["status"] == "running"
|
|
):
|
|
entry["status"] = "error"
|
|
entry["error"] = event.error
|
|
entry["duration"] = 0
|
|
self._log_expanded.add(idx)
|
|
break
|
|
|
|
self._register_handler(MemoryRetrievalFailedEvent, on_memory_retrieval_failed)
|
|
|
|
@crewai_event_bus.on(AgentLogsExecutionEvent)
|
|
def on_agent_execution(source: Any, event: AgentLogsExecutionEvent) -> None:
|
|
from crewai.agents.parser import AgentAction, AgentFinish
|
|
|
|
if isinstance(event.formatted_answer, AgentAction):
|
|
self._complete_step("cyan", f"→ {event.formatted_answer.tool}")
|
|
elif isinstance(event.formatted_answer, AgentFinish):
|
|
self._complete_step("green", "✔ Agent finished")
|
|
|
|
self._register_handler(AgentLogsExecutionEvent, on_agent_execution)
|
|
|
|
@crewai_event_bus.on(TaskCompletedEvent)
|
|
def on_task_completed(source: Any, event: TaskCompletedEvent) -> None:
|
|
now = time.time()
|
|
with self._lock:
|
|
state = self._pop_task_state(event)
|
|
idx = state["idx"]
|
|
self._task_statuses[idx] = "done"
|
|
elapsed = now - state["start_time"]
|
|
# The shared stream fields (steps, timeline, streamed output)
|
|
# belong to the most recently started task. Only consume and
|
|
# reset them when that is the task completing — an earlier
|
|
# task finishing out of order must not steal or clear the
|
|
# current task's stream.
|
|
is_current = idx == self._current_task_idx
|
|
output = getattr(event.output, "raw", "") or ""
|
|
|
|
if is_current:
|
|
self._collapse_plan_on_task_done()
|
|
|
|
if self._current_llm_text.strip():
|
|
self._current_task_steps.append(
|
|
{
|
|
"type": "llm",
|
|
"summary": "Final response",
|
|
"detail": self._current_llm_text.strip(),
|
|
"style": "green",
|
|
}
|
|
)
|
|
self._current_llm_text = ""
|
|
|
|
steps = list(self._current_task_steps)
|
|
self._current_task_steps = []
|
|
timeline = list(self._timeline)
|
|
output = self._task_full_output or output
|
|
|
|
self._is_streaming = False
|
|
self._streaming_text = ""
|
|
self._task_full_output = ""
|
|
self._timeline = []
|
|
self._current_step = None
|
|
else:
|
|
steps = []
|
|
timeline = []
|
|
|
|
self._task_logs.append(
|
|
{
|
|
"idx": idx,
|
|
"desc": state["desc"] or "Task",
|
|
"agent": state["agent"],
|
|
"elapsed": elapsed,
|
|
"timeline": timeline,
|
|
"steps": steps,
|
|
"output": output,
|
|
}
|
|
)
|
|
|
|
self._register_handler(TaskCompletedEvent, on_task_completed)
|
|
|
|
@crewai_event_bus.on(TaskFailedEvent)
|
|
def on_task_failed(source: Any, event: TaskFailedEvent) -> None:
|
|
now = time.time()
|
|
with self._lock:
|
|
state = self._pop_task_state(event)
|
|
idx = state["idx"]
|
|
self._task_statuses[idx] = "failed"
|
|
is_current = idx == self._current_task_idx
|
|
|
|
error_step = {
|
|
"type": "error",
|
|
"summary": f"✘ Failed: {event.error[:100]}",
|
|
"detail": event.error,
|
|
"style": "red",
|
|
}
|
|
if is_current:
|
|
self._current_task_steps.append(error_step)
|
|
steps = list(self._current_task_steps)
|
|
self._current_task_steps = []
|
|
timeline = list(self._timeline)
|
|
output = self._task_full_output
|
|
else:
|
|
steps = [error_step]
|
|
timeline = []
|
|
output = ""
|
|
|
|
self._task_logs.append(
|
|
{
|
|
"idx": idx,
|
|
"desc": state["desc"] or "Task",
|
|
"agent": state["agent"],
|
|
"elapsed": now - state["start_time"],
|
|
"timeline": timeline,
|
|
"steps": steps,
|
|
"output": output,
|
|
"error": event.error,
|
|
}
|
|
)
|
|
self._complete_step(
|
|
"red", f"✘ Failed: {event.error[:50]}", event.error[:200]
|
|
)
|
|
|
|
self._register_handler(TaskFailedEvent, on_task_failed)
|