diff --git a/lib/cli/src/crewai_cli/crew_run_tui.py b/lib/cli/src/crewai_cli/crew_run_tui.py index 1fc93337d..3172554a8 100644 --- a/lib/cli/src/crewai_cli/crew_run_tui.py +++ b/lib/cli/src/crewai_cli/crew_run_tui.py @@ -34,12 +34,25 @@ _C_MUTED = "#666666" # dimmer than _C_DIM for past timeline _STEP_NUMBER_RE = re.compile(r"\bstep\s+(\d+)\b", re.IGNORECASE) _REFINEMENT_RE = re.compile(r"^\s*step\s+(\d+)\s*:\s*(.+)\s*$", re.IGNORECASE) _INTERNAL_TOOL_NAMES = {"create_reasoning_plan"} +_LOG_ARGS_TEXT_LIMIT = 3_000 +_LOG_RESULT_TEXT_LIMIT = 5_000 +_LOG_TRUNCATION_SUFFIX = "... [truncated]" def _is_save_to_memory_tool(tool_name: str | None) -> bool: return (tool_name or "").replace(" ", "_").lower() == "save_to_memory" +def _truncate_log_text(value: Any, limit: int) -> str | None: + if value is None: + return None + text = str(value) + if len(text) <= limit: + return text + suffix = _LOG_TRUNCATION_SUFFIX + return f"{text[: max(0, limit - len(suffix))]}{suffix}" + + def _enable_tracing_in_dotenv() -> None: """Append CREWAI_TRACING_ENABLED=true to .env if not already set.""" from pathlib import Path @@ -702,6 +715,7 @@ FooterKey .footer-key--key { if entry["tool_name"] == "memory_save": continue entry["status"] = "error" + entry["error"] = "No result received before crew failed" entry["duration"] = now - entry["start_time"] self._tick() self.call_later(self._focus_activity_log) @@ -1811,6 +1825,8 @@ FooterKey .footer-key--key { entry["status"] == "running" and entry["tool_name"] != event.tool_name ): + if entry["tool_name"] == "memory_save": + continue entry["status"] = "timeout" entry["error"] = ( "No result received before the next tool started" @@ -1946,15 +1962,14 @@ FooterKey .footer-key--key { return False parent_node = state.event_record.nodes.get(event.parent_event_id) parent_event = getattr(parent_node, "event", None) - return ( - getattr(parent_event, "type", None) == "tool_usage_started" - and _is_save_to_memory_tool(getattr(parent_event, "tool_name", None)) + return getattr( + parent_event, "type", None + ) == "tool_usage_started" and _is_save_to_memory_tool( + getattr(parent_event, "tool_name", None) ) @crewai_event_bus.on(MemorySaveStartedEvent) - def on_memory_save_started( - source: Any, event: MemorySaveStartedEvent - ) -> None: + def on_memory_save_started(source: Any, event: MemorySaveStartedEvent) -> None: with self._lock: if is_nested_save_to_memory_event(event): self._suppressed_memory_save_event_ids.add(event.event_id) @@ -1971,13 +1986,15 @@ FooterKey .footer-key--key { entry["tool_name"] == "memory_save" and entry.get("started_event_id") == event.event_id ): - entry["args"] = event.value + entry["args"] = _truncate_log_text( + event.value, _LOG_ARGS_TEXT_LIMIT + ) return self._log_entries.append( { "tool_name": "memory_save", "status": "running", - "args": event.value, + "args": _truncate_log_text(event.value, _LOG_ARGS_TEXT_LIMIT), "result": None, "error": None, "start_time": time.time(), @@ -1998,25 +2015,25 @@ FooterKey .footer-key--key { event.started_event_id in self._suppressed_memory_save_event_ids or is_nested_save_to_memory_event(event) ): - self._suppressed_memory_save_event_ids.discard( - event.started_event_id - ) + if event.started_event_id is not None: + self._suppressed_memory_save_event_ids.discard( + event.started_event_id + ) return for entry in reversed(self._log_entries): - if ( - entry["tool_name"] == "memory_save" - and ( - ( - event.started_event_id is None - and entry["status"] == "running" - ) - or entry.get("event_id") == event.started_event_id - or entry.get("started_event_id") == event.started_event_id + if entry["tool_name"] == "memory_save" and ( + ( + event.started_event_id is None + and entry["status"] == "running" ) + or entry.get("event_id") == event.started_event_id + or entry.get("started_event_id") == event.started_event_id ): entry["status"] = "success" entry["duration"] = event.save_time_ms / 1000 - entry["result"] = event.value + entry["result"] = _truncate_log_text( + event.value, _LOG_RESULT_TEXT_LIMIT + ) entry["error"] = None entry["started_event_id"] = event.started_event_id break @@ -2026,7 +2043,9 @@ FooterKey .footer-key--key { "tool_name": "memory_save", "status": "success", "args": None, - "result": event.value, + "result": _truncate_log_text( + event.value, _LOG_RESULT_TEXT_LIMIT + ), "error": None, "start_time": time.time(), "duration": event.save_time_ms / 1000, @@ -2038,29 +2057,25 @@ FooterKey .footer-key--key { self._register_handler(MemorySaveCompletedEvent, on_memory_save_completed) @crewai_event_bus.on(MemorySaveFailedEvent) - def on_memory_save_failed( - source: Any, event: MemorySaveFailedEvent - ) -> None: + def on_memory_save_failed(source: Any, event: MemorySaveFailedEvent) -> None: with self._lock: if ( event.started_event_id in self._suppressed_memory_save_event_ids or is_nested_save_to_memory_event(event) ): - self._suppressed_memory_save_event_ids.discard( - event.started_event_id - ) + if event.started_event_id is not None: + self._suppressed_memory_save_event_ids.discard( + event.started_event_id + ) return for idx, entry in reversed(list(enumerate(self._log_entries))): - if ( - entry["tool_name"] == "memory_save" - and ( - ( - event.started_event_id is None - and entry["status"] == "running" - ) - or entry.get("event_id") == event.started_event_id - or entry.get("started_event_id") == event.started_event_id + if entry["tool_name"] == "memory_save" and ( + ( + event.started_event_id is None + and entry["status"] == "running" ) + or entry.get("event_id") == event.started_event_id + or entry.get("started_event_id") == event.started_event_id ): entry["status"] = "error" entry["error"] = event.error @@ -2073,7 +2088,9 @@ FooterKey .footer-key--key { { "tool_name": "memory_save", "status": "error", - "args": event.value, + "args": _truncate_log_text( + event.value, _LOG_ARGS_TEXT_LIMIT + ), "result": None, "error": event.error, "start_time": time.time(), diff --git a/lib/cli/tests/test_crew_run_tui.py b/lib/cli/tests/test_crew_run_tui.py index 03b03b89b..c0379795d 100644 --- a/lib/cli/tests/test_crew_run_tui.py +++ b/lib/cli/tests/test_crew_run_tui.py @@ -26,7 +26,12 @@ from crewai.events.types.tool_usage_events import ( ) from crewai_cli.command import AuthenticationRequiredError from crewai_cli import run_crew -from crewai_cli.crew_run_tui import CrewRunApp +from crewai_cli.crew_run_tui import ( + CrewRunApp, + _LOG_ARGS_TEXT_LIMIT, + _LOG_RESULT_TEXT_LIMIT, + _LOG_TRUNCATION_SUFFIX, +) def _app_with_plan() -> CrewRunApp: @@ -481,6 +486,64 @@ def test_memory_save_completion_updates_timed_out_row() -> None: assert app._log_entries[0]["duration"] == 8.3 +def test_memory_save_payloads_are_truncated_in_activity_log() -> None: + app = _app_with_plan() + long_args = "a" * (_LOG_ARGS_TEXT_LIMIT + 10) + long_result = "r" * (_LOG_RESULT_TEXT_LIMIT + 10) + + app._subscribe() + try: + _emit_event( + MemorySaveStartedEvent( + value=long_args, + metadata={}, + source_type="unified_memory", + ) + ) + _emit_event( + MemorySaveCompletedEvent( + value=long_result, + metadata={}, + save_time_ms=8300, + source_type="unified_memory", + ) + ) + finally: + app._unsubscribe() + + assert len(app._log_entries[0]["args"]) == _LOG_ARGS_TEXT_LIMIT + assert app._log_entries[0]["args"].endswith(_LOG_TRUNCATION_SUFFIX) + assert len(app._log_entries[0]["result"]) == _LOG_RESULT_TEXT_LIMIT + assert app._log_entries[0]["result"].endswith(_LOG_TRUNCATION_SUFFIX) + + +def test_starting_next_tool_does_not_timeout_memory_save() -> None: + app = _app_with_plan() + app._subscribe() + try: + _emit_event( + MemorySaveStartedEvent( + value="9 memories (background)", + metadata={}, + source_type="unified_memory", + ) + ) + _emit_event( + ToolUsageStartedEvent( + tool_name="read_website_content", + tool_args={"url": "https://example.com"}, + ) + ) + finally: + app._unsubscribe() + + assert app._log_entries[0]["tool_name"] == "memory_save" + assert app._log_entries[0]["status"] == "running" + assert app._log_entries[0]["error"] is None + assert app._log_entries[1]["tool_name"] == "read_website_content" + assert app._log_entries[1]["status"] == "running" + + def test_tool_failure_does_not_override_successful_plan_step_completion() -> None: app = _app_with_plan() app._subscribe() @@ -663,6 +726,43 @@ async def test_crew_done_does_not_timeout_memory_save() -> None: assert app._log_entries[1]["error"] == "No result received before crew completed" +@pytest.mark.asyncio +async def test_crew_failed_does_not_timeout_memory_save() -> None: + app = _app_with_plan() + + async with app.run_test(size=(100, 40)) as pilot: + app._log_entries = [ + { + "tool_name": "memory_save", + "status": "running", + "args": "9 memories (background)", + "result": None, + "error": None, + "start_time": time.time() - 8, + "duration": None, + "task_idx": 1, + }, + { + "tool_name": "search", + "status": "running", + "args": '{"query": "CrewAI"}', + "result": None, + "error": None, + "start_time": time.time() - 2, + "duration": None, + "task_idx": 1, + }, + ] + + app._on_crew_failed("boom") + await pilot.pause() + + assert app._log_entries[0]["status"] == "running" + assert app._log_entries[0]["error"] is None + assert app._log_entries[1]["status"] == "error" + assert app._log_entries[1]["error"] == "No result received before crew failed" + + def test_streamed_step_observation_updates_named_step_only() -> None: app = _app_with_plan()