Fix memory save activity log handling

This commit is contained in:
Joao Moura
2026-06-18 00:19:36 -07:00
parent 21d80bc37f
commit 3792b3cda5
2 changed files with 156 additions and 39 deletions

View File

@@ -34,12 +34,25 @@ _C_MUTED = "#666666" # dimmer than _C_DIM for past timeline
_STEP_NUMBER_RE = re.compile(r"\bstep\s+(\d+)\b", re.IGNORECASE)
_REFINEMENT_RE = re.compile(r"^\s*step\s+(\d+)\s*:\s*(.+)\s*$", re.IGNORECASE)
_INTERNAL_TOOL_NAMES = {"create_reasoning_plan"}
_LOG_ARGS_TEXT_LIMIT = 3_000
_LOG_RESULT_TEXT_LIMIT = 5_000
_LOG_TRUNCATION_SUFFIX = "... [truncated]"
def _is_save_to_memory_tool(tool_name: str | None) -> bool:
return (tool_name or "").replace(" ", "_").lower() == "save_to_memory"
def _truncate_log_text(value: Any, limit: int) -> str | None:
if value is None:
return None
text = str(value)
if len(text) <= limit:
return text
suffix = _LOG_TRUNCATION_SUFFIX
return f"{text[: max(0, limit - len(suffix))]}{suffix}"
def _enable_tracing_in_dotenv() -> None:
"""Append CREWAI_TRACING_ENABLED=true to .env if not already set."""
from pathlib import Path
@@ -702,6 +715,7 @@ FooterKey .footer-key--key {
if entry["tool_name"] == "memory_save":
continue
entry["status"] = "error"
entry["error"] = "No result received before crew failed"
entry["duration"] = now - entry["start_time"]
self._tick()
self.call_later(self._focus_activity_log)
@@ -1811,6 +1825,8 @@ FooterKey .footer-key--key {
entry["status"] == "running"
and entry["tool_name"] != event.tool_name
):
if entry["tool_name"] == "memory_save":
continue
entry["status"] = "timeout"
entry["error"] = (
"No result received before the next tool started"
@@ -1946,15 +1962,14 @@ FooterKey .footer-key--key {
return False
parent_node = state.event_record.nodes.get(event.parent_event_id)
parent_event = getattr(parent_node, "event", None)
return (
getattr(parent_event, "type", None) == "tool_usage_started"
and _is_save_to_memory_tool(getattr(parent_event, "tool_name", None))
return getattr(
parent_event, "type", None
) == "tool_usage_started" and _is_save_to_memory_tool(
getattr(parent_event, "tool_name", None)
)
@crewai_event_bus.on(MemorySaveStartedEvent)
def on_memory_save_started(
source: Any, event: MemorySaveStartedEvent
) -> None:
def on_memory_save_started(source: Any, event: MemorySaveStartedEvent) -> None:
with self._lock:
if is_nested_save_to_memory_event(event):
self._suppressed_memory_save_event_ids.add(event.event_id)
@@ -1971,13 +1986,15 @@ FooterKey .footer-key--key {
entry["tool_name"] == "memory_save"
and entry.get("started_event_id") == event.event_id
):
entry["args"] = event.value
entry["args"] = _truncate_log_text(
event.value, _LOG_ARGS_TEXT_LIMIT
)
return
self._log_entries.append(
{
"tool_name": "memory_save",
"status": "running",
"args": event.value,
"args": _truncate_log_text(event.value, _LOG_ARGS_TEXT_LIMIT),
"result": None,
"error": None,
"start_time": time.time(),
@@ -1998,25 +2015,25 @@ FooterKey .footer-key--key {
event.started_event_id in self._suppressed_memory_save_event_ids
or is_nested_save_to_memory_event(event)
):
self._suppressed_memory_save_event_ids.discard(
event.started_event_id
)
if event.started_event_id is not None:
self._suppressed_memory_save_event_ids.discard(
event.started_event_id
)
return
for entry in reversed(self._log_entries):
if (
entry["tool_name"] == "memory_save"
and (
(
event.started_event_id is None
and entry["status"] == "running"
)
or entry.get("event_id") == event.started_event_id
or entry.get("started_event_id") == event.started_event_id
if entry["tool_name"] == "memory_save" and (
(
event.started_event_id is None
and entry["status"] == "running"
)
or entry.get("event_id") == event.started_event_id
or entry.get("started_event_id") == event.started_event_id
):
entry["status"] = "success"
entry["duration"] = event.save_time_ms / 1000
entry["result"] = event.value
entry["result"] = _truncate_log_text(
event.value, _LOG_RESULT_TEXT_LIMIT
)
entry["error"] = None
entry["started_event_id"] = event.started_event_id
break
@@ -2026,7 +2043,9 @@ FooterKey .footer-key--key {
"tool_name": "memory_save",
"status": "success",
"args": None,
"result": event.value,
"result": _truncate_log_text(
event.value, _LOG_RESULT_TEXT_LIMIT
),
"error": None,
"start_time": time.time(),
"duration": event.save_time_ms / 1000,
@@ -2038,29 +2057,25 @@ FooterKey .footer-key--key {
self._register_handler(MemorySaveCompletedEvent, on_memory_save_completed)
@crewai_event_bus.on(MemorySaveFailedEvent)
def on_memory_save_failed(
source: Any, event: MemorySaveFailedEvent
) -> None:
def on_memory_save_failed(source: Any, event: MemorySaveFailedEvent) -> None:
with self._lock:
if (
event.started_event_id in self._suppressed_memory_save_event_ids
or is_nested_save_to_memory_event(event)
):
self._suppressed_memory_save_event_ids.discard(
event.started_event_id
)
if event.started_event_id is not None:
self._suppressed_memory_save_event_ids.discard(
event.started_event_id
)
return
for idx, entry in reversed(list(enumerate(self._log_entries))):
if (
entry["tool_name"] == "memory_save"
and (
(
event.started_event_id is None
and entry["status"] == "running"
)
or entry.get("event_id") == event.started_event_id
or entry.get("started_event_id") == event.started_event_id
if entry["tool_name"] == "memory_save" and (
(
event.started_event_id is None
and entry["status"] == "running"
)
or entry.get("event_id") == event.started_event_id
or entry.get("started_event_id") == event.started_event_id
):
entry["status"] = "error"
entry["error"] = event.error
@@ -2073,7 +2088,9 @@ FooterKey .footer-key--key {
{
"tool_name": "memory_save",
"status": "error",
"args": event.value,
"args": _truncate_log_text(
event.value, _LOG_ARGS_TEXT_LIMIT
),
"result": None,
"error": event.error,
"start_time": time.time(),

View File

@@ -26,7 +26,12 @@ from crewai.events.types.tool_usage_events import (
)
from crewai_cli.command import AuthenticationRequiredError
from crewai_cli import run_crew
from crewai_cli.crew_run_tui import CrewRunApp
from crewai_cli.crew_run_tui import (
CrewRunApp,
_LOG_ARGS_TEXT_LIMIT,
_LOG_RESULT_TEXT_LIMIT,
_LOG_TRUNCATION_SUFFIX,
)
def _app_with_plan() -> CrewRunApp:
@@ -481,6 +486,64 @@ def test_memory_save_completion_updates_timed_out_row() -> None:
assert app._log_entries[0]["duration"] == 8.3
def test_memory_save_payloads_are_truncated_in_activity_log() -> None:
app = _app_with_plan()
long_args = "a" * (_LOG_ARGS_TEXT_LIMIT + 10)
long_result = "r" * (_LOG_RESULT_TEXT_LIMIT + 10)
app._subscribe()
try:
_emit_event(
MemorySaveStartedEvent(
value=long_args,
metadata={},
source_type="unified_memory",
)
)
_emit_event(
MemorySaveCompletedEvent(
value=long_result,
metadata={},
save_time_ms=8300,
source_type="unified_memory",
)
)
finally:
app._unsubscribe()
assert len(app._log_entries[0]["args"]) == _LOG_ARGS_TEXT_LIMIT
assert app._log_entries[0]["args"].endswith(_LOG_TRUNCATION_SUFFIX)
assert len(app._log_entries[0]["result"]) == _LOG_RESULT_TEXT_LIMIT
assert app._log_entries[0]["result"].endswith(_LOG_TRUNCATION_SUFFIX)
def test_starting_next_tool_does_not_timeout_memory_save() -> None:
app = _app_with_plan()
app._subscribe()
try:
_emit_event(
MemorySaveStartedEvent(
value="9 memories (background)",
metadata={},
source_type="unified_memory",
)
)
_emit_event(
ToolUsageStartedEvent(
tool_name="read_website_content",
tool_args={"url": "https://example.com"},
)
)
finally:
app._unsubscribe()
assert app._log_entries[0]["tool_name"] == "memory_save"
assert app._log_entries[0]["status"] == "running"
assert app._log_entries[0]["error"] is None
assert app._log_entries[1]["tool_name"] == "read_website_content"
assert app._log_entries[1]["status"] == "running"
def test_tool_failure_does_not_override_successful_plan_step_completion() -> None:
app = _app_with_plan()
app._subscribe()
@@ -663,6 +726,43 @@ async def test_crew_done_does_not_timeout_memory_save() -> None:
assert app._log_entries[1]["error"] == "No result received before crew completed"
@pytest.mark.asyncio
async def test_crew_failed_does_not_timeout_memory_save() -> None:
app = _app_with_plan()
async with app.run_test(size=(100, 40)) as pilot:
app._log_entries = [
{
"tool_name": "memory_save",
"status": "running",
"args": "9 memories (background)",
"result": None,
"error": None,
"start_time": time.time() - 8,
"duration": None,
"task_idx": 1,
},
{
"tool_name": "search",
"status": "running",
"args": '{"query": "CrewAI"}',
"result": None,
"error": None,
"start_time": time.time() - 2,
"duration": None,
"task_idx": 1,
},
]
app._on_crew_failed("boom")
await pilot.pause()
assert app._log_entries[0]["status"] == "running"
assert app._log_entries[0]["error"] is None
assert app._log_entries[1]["status"] == "error"
assert app._log_entries[1]["error"] == "No result received before crew failed"
def test_streamed_step_observation_updates_named_step_only() -> None:
app = _app_with_plan()