diff --git a/lib/cli/src/crewai_cli/crew_run_tui.py b/lib/cli/src/crewai_cli/crew_run_tui.py index 23c024055..1fc93337d 100644 --- a/lib/cli/src/crewai_cli/crew_run_tui.py +++ b/lib/cli/src/crewai_cli/crew_run_tui.py @@ -36,6 +36,10 @@ _REFINEMENT_RE = re.compile(r"^\s*step\s+(\d+)\s*:\s*(.+)\s*$", re.IGNORECASE) _INTERNAL_TOOL_NAMES = {"create_reasoning_plan"} +def _is_save_to_memory_tool(tool_name: str | None) -> bool: + return (tool_name or "").replace(" ", "_").lower() == "save_to_memory" + + def _enable_tracing_in_dotenv() -> None: """Append CREWAI_TRACING_ENABLED=true to .env if not already set.""" from pathlib import Path @@ -519,6 +523,7 @@ FooterKey .footer-key--key { self._log_expanded: set[int] = set() self._log_scroll_needed: bool = False self._log_line_map: list[tuple[int, int, int]] = [] + self._suppressed_memory_save_event_ids: set[str] = set() self._event_handlers: list[tuple[type, Any]] = [] @@ -649,6 +654,8 @@ FooterKey .footer-key--key { now = time.time() for entry in self._log_entries: if entry["status"] == "running": + if entry["tool_name"] == "memory_save": + continue entry["status"] = "timeout" entry["error"] = "No result received before crew completed" entry["duration"] = now - entry["start_time"] @@ -692,6 +699,8 @@ FooterKey .footer-key--key { now = time.time() for entry in self._log_entries: if entry["status"] == "running": + if entry["tool_name"] == "memory_save": + continue entry["status"] = "error" entry["duration"] = now - entry["start_time"] self._tick() @@ -1830,6 +1839,7 @@ FooterKey .footer-key--key { "duration": None, "task_idx": self._current_task_idx, "plan_step_number": plan_step_number, + "event_id": event.event_id, } ) self._complete_step("teal", f"⚡ {event.tool_name}…") @@ -1928,11 +1938,34 @@ FooterKey .footer-key--key { MemorySaveStartedEvent, ) + def is_nested_save_to_memory_event(event: Any) -> bool: + if event.parent_event_id is None: + return False + state = crewai_event_bus.runtime_state + if state is None: + return False + parent_node = state.event_record.nodes.get(event.parent_event_id) + parent_event = getattr(parent_node, "event", None) + return ( + getattr(parent_event, "type", None) == "tool_usage_started" + and _is_save_to_memory_tool(getattr(parent_event, "tool_name", None)) + ) + @crewai_event_bus.on(MemorySaveStartedEvent) def on_memory_save_started( source: Any, event: MemorySaveStartedEvent ) -> None: with self._lock: + if is_nested_save_to_memory_event(event): + self._suppressed_memory_save_event_ids.add(event.event_id) + return + for entry in reversed(self._log_entries): + if ( + _is_save_to_memory_tool(entry["tool_name"]) + and entry.get("event_id") == event.parent_event_id + ): + self._suppressed_memory_save_event_ids.add(event.event_id) + return for entry in reversed(self._log_entries): if ( entry["tool_name"] == "memory_save" @@ -1961,18 +1994,30 @@ FooterKey .footer-key--key { source: Any, event: MemorySaveCompletedEvent ) -> None: with self._lock: + if ( + event.started_event_id in self._suppressed_memory_save_event_ids + or is_nested_save_to_memory_event(event) + ): + self._suppressed_memory_save_event_ids.discard( + event.started_event_id + ) + return for entry in reversed(self._log_entries): if ( entry["tool_name"] == "memory_save" - and entry["status"] == "running" and ( - event.started_event_id is None + ( + event.started_event_id is None + and entry["status"] == "running" + ) or entry.get("event_id") == event.started_event_id + or entry.get("started_event_id") == event.started_event_id ) ): entry["status"] = "success" entry["duration"] = event.save_time_ms / 1000 entry["result"] = event.value + entry["error"] = None entry["started_event_id"] = event.started_event_id break else: @@ -1997,13 +2042,24 @@ FooterKey .footer-key--key { source: Any, event: MemorySaveFailedEvent ) -> None: with self._lock: + if ( + event.started_event_id in self._suppressed_memory_save_event_ids + or is_nested_save_to_memory_event(event) + ): + self._suppressed_memory_save_event_ids.discard( + event.started_event_id + ) + return for idx, entry in reversed(list(enumerate(self._log_entries))): if ( entry["tool_name"] == "memory_save" - and entry["status"] == "running" and ( - event.started_event_id is None + ( + event.started_event_id is None + and entry["status"] == "running" + ) or entry.get("event_id") == event.started_event_id + or entry.get("started_event_id") == event.started_event_id ) ): entry["status"] = "error" diff --git a/lib/cli/tests/test_crew_run_tui.py b/lib/cli/tests/test_crew_run_tui.py index 15290e2d9..03b03b89b 100644 --- a/lib/cli/tests/test_crew_run_tui.py +++ b/lib/cli/tests/test_crew_run_tui.py @@ -373,6 +373,51 @@ def test_memory_save_events_are_shown_in_activity_log() -> None: assert app._log_entries[0]["task_idx"] == 1 +def test_nested_memory_save_event_is_hidden_for_save_to_memory_tool() -> None: + app = _app_with_plan() + app._subscribe() + try: + tool_args = {"contents": ["Fact to remember."]} + _emit_event( + ToolUsageStartedEvent( + tool_name="save_to_memory", + tool_args=tool_args, + ) + ) + _emit_event( + MemorySaveStartedEvent( + value="Fact to remember.", + metadata={}, + source_type="unified_memory", + ) + ) + _emit_event( + MemorySaveCompletedEvent( + value="Fact to remember.", + metadata={}, + save_time_ms=123, + source_type="unified_memory", + ) + ) + now = datetime.now() + _emit_event( + ToolUsageFinishedEvent( + tool_name="save_to_memory", + tool_args=tool_args, + started_at=now, + finished_at=now, + output="Saved to memory.", + ) + ) + finally: + app._unsubscribe() + + assert len(app._log_entries) == 1 + assert app._log_entries[0]["tool_name"] == "save_to_memory" + assert app._log_entries[0]["status"] == "success" + assert app._log_entries[0]["result"] == "Saved to memory." + + def test_memory_save_failure_is_shown_in_activity_log() -> None: app = _app_with_plan() app._subscribe() @@ -401,6 +446,41 @@ def test_memory_save_failure_is_shown_in_activity_log() -> None: assert app._log_expanded == {0} +def test_memory_save_completion_updates_timed_out_row() -> None: + app = _app_with_plan() + app._subscribe() + try: + _emit_event( + MemorySaveStartedEvent( + value="9 memories (background)", + metadata={}, + source_type="unified_memory", + ) + ) + + app._log_entries[0]["status"] = "timeout" + app._log_entries[0]["error"] = "No result received before crew completed" + app._log_entries[0]["duration"] = 8.3 + + _emit_event( + MemorySaveCompletedEvent( + value="9 memories saved", + metadata={}, + save_time_ms=8300, + source_type="unified_memory", + ) + ) + finally: + app._unsubscribe() + + assert len(app._log_entries) == 1 + assert app._log_entries[0]["tool_name"] == "memory_save" + assert app._log_entries[0]["status"] == "success" + assert app._log_entries[0]["result"] == "9 memories saved" + assert app._log_entries[0]["error"] is None + assert app._log_entries[0]["duration"] == 8.3 + + def test_tool_failure_does_not_override_successful_plan_step_completion() -> None: app = _app_with_plan() app._subscribe() @@ -546,6 +626,43 @@ async def test_crew_done_does_not_mark_unfinished_tool_successful() -> None: assert app._plan_step_status == {1: "failed", 2: "done", 3: "done"} +@pytest.mark.asyncio +async def test_crew_done_does_not_timeout_memory_save() -> None: + app = _app_with_plan() + + async with app.run_test(size=(100, 40)) as pilot: + app._log_entries = [ + { + "tool_name": "memory_save", + "status": "running", + "args": "9 memories (background)", + "result": None, + "error": None, + "start_time": time.time() - 8, + "duration": None, + "task_idx": 1, + }, + { + "tool_name": "search", + "status": "running", + "args": '{"query": "CrewAI"}', + "result": None, + "error": None, + "start_time": time.time() - 2, + "duration": None, + "task_idx": 1, + }, + ] + + app._on_crew_done("final output") + await pilot.pause() + + assert app._log_entries[0]["status"] == "running" + assert app._log_entries[0]["error"] is None + assert app._log_entries[1]["status"] == "timeout" + assert app._log_entries[1]["error"] == "No result received before crew completed" + + def test_streamed_step_observation_updates_named_step_only() -> None: app = _app_with_plan()