From ae57e5723c999735c42119f4555483891d23962d Mon Sep 17 00:00:00 2001 From: Lucas Gomide Date: Wed, 2 Jul 2025 12:00:26 -0300 Subject: [PATCH] feat: add console logging for memory system usage (#3103) --- src/crewai/utilities/events/event_listener.py | 4 + .../events/listeners/memory_listener.py | 110 ++++++++ .../events/utils/console_formatter.py | 247 ++++++++++++++++++ 3 files changed, 361 insertions(+) create mode 100644 src/crewai/utilities/events/listeners/memory_listener.py diff --git a/src/crewai/utilities/events/event_listener.py b/src/crewai/utilities/events/event_listener.py index 7be6b9bb0..a7cf82d9b 100644 --- a/src/crewai/utilities/events/event_listener.py +++ b/src/crewai/utilities/events/event_listener.py @@ -65,6 +65,8 @@ from .reasoning_events import ( AgentReasoningFailedEvent, ) +from .listeners.memory_listener import MemoryListener + class EventListener(BaseEventListener): _instance = None @@ -91,6 +93,8 @@ class EventListener(BaseEventListener): self._initialized = True self.formatter = ConsoleFormatter(verbose=True) + MemoryListener(formatter=self.formatter) + # ----------- CREW EVENTS ----------- def setup_listeners(self, crewai_event_bus): diff --git a/src/crewai/utilities/events/listeners/memory_listener.py b/src/crewai/utilities/events/listeners/memory_listener.py new file mode 100644 index 000000000..82935a3b7 --- /dev/null +++ b/src/crewai/utilities/events/listeners/memory_listener.py @@ -0,0 +1,110 @@ +from crewai.utilities.events.base_event_listener import BaseEventListener +from crewai.utilities.events.memory_events import ( + MemoryRetrievalCompletedEvent, + MemoryRetrievalStartedEvent, + MemoryQueryFailedEvent, + MemoryQueryCompletedEvent, + MemorySaveStartedEvent, + MemorySaveCompletedEvent, + MemorySaveFailedEvent, +) + +class MemoryListener(BaseEventListener): + + def __init__(self, formatter): + super().__init__() + self.formatter = formatter + self.memory_retrieval_in_progress = False + self.memory_save_in_progress = False + + def setup_listeners(self, crewai_event_bus): + @crewai_event_bus.on(MemoryRetrievalStartedEvent) + def on_memory_retrieval_started( + source, event: MemoryRetrievalStartedEvent + ): + if self.memory_retrieval_in_progress: + return + + self.memory_retrieval_in_progress = True + + self.formatter.handle_memory_retrieval_started( + self.formatter.current_agent_branch, + self.formatter.current_crew_tree, + ) + + @crewai_event_bus.on(MemoryRetrievalCompletedEvent) + def on_memory_retrieval_completed( + source, event: MemoryRetrievalCompletedEvent + ): + if not self.memory_retrieval_in_progress: + return + + self.memory_retrieval_in_progress = False + self.formatter.handle_memory_retrieval_completed( + self.formatter.current_agent_branch, + self.formatter.current_crew_tree, + event.memory_content, + event.retrieval_time_ms + ) + + @crewai_event_bus.on(MemoryQueryCompletedEvent) + def on_memory_query_completed(source, event: MemoryQueryCompletedEvent): + if not self.memory_retrieval_in_progress: + return + + self.formatter.handle_memory_query_completed( + self.formatter.current_agent_branch, + event.source_type, + event.query_time_ms, + self.formatter.current_crew_tree, + ) + + @crewai_event_bus.on(MemoryQueryFailedEvent) + def on_memory_query_failed(source, event: MemoryQueryFailedEvent): + if not self.memory_retrieval_in_progress: + return + + self.formatter.handle_memory_query_failed( + self.formatter.current_agent_branch, + self.formatter.current_crew_tree, + event.error, + event.source_type, + ) + + @crewai_event_bus.on(MemorySaveStartedEvent) + def on_memory_save_started(source, event: MemorySaveStartedEvent): + if self.memory_save_in_progress: + return + + self.memory_save_in_progress = True + + self.formatter.handle_memory_save_started( + self.formatter.current_agent_branch, + self.formatter.current_crew_tree, + ) + + @crewai_event_bus.on(MemorySaveCompletedEvent) + def on_memory_save_completed(source, event: MemorySaveCompletedEvent): + if not self.memory_save_in_progress: + return + + self.memory_save_in_progress = False + + self.formatter.handle_memory_save_completed( + self.formatter.current_agent_branch, + self.formatter.current_crew_tree, + event.save_time_ms, + event.source_type, + ) + + @crewai_event_bus.on(MemorySaveFailedEvent) + def on_memory_save_failed(source, event: MemorySaveFailedEvent): + if not self.memory_save_in_progress: + return + + self.formatter.handle_memory_save_failed( + self.formatter.current_agent_branch, + event.error, + event.source_type, + self.formatter.current_crew_tree, + ) \ No newline at end of file diff --git a/src/crewai/utilities/events/utils/console_formatter.py b/src/crewai/utilities/events/utils/console_formatter.py index ccf1837cc..901c74468 100644 --- a/src/crewai/utilities/events/utils/console_formatter.py +++ b/src/crewai/utilities/events/utils/console_formatter.py @@ -1454,3 +1454,250 @@ class ConsoleFormatter: ) self.print(finish_panel) self.print() + + def handle_memory_retrieval_started( + self, + agent_branch: Optional[Tree], + crew_tree: Optional[Tree], + ) -> Optional[Tree]: + if not self.verbose: + return None + + branch_to_use = agent_branch or self.current_lite_agent_branch + tree_to_use = branch_to_use or crew_tree + + if branch_to_use is None or tree_to_use is None: + if crew_tree is not None: + branch_to_use = tree_to_use = crew_tree + else: + return None + + memory_branch = branch_to_use.add("") + self.update_tree_label( + memory_branch, "🧠", "Memory Retrieval Started", "blue" + ) + + self.print(tree_to_use) + self.print() + return memory_branch + + def handle_memory_retrieval_completed( + self, + agent_branch: Optional[Tree], + crew_tree: Optional[Tree], + memory_content: str, + retrieval_time_ms: float, + ) -> None: + if not self.verbose: + return None + + branch_to_use = self.current_lite_agent_branch or agent_branch + tree_to_use = branch_to_use or crew_tree + + if branch_to_use is None and tree_to_use is not None: + branch_to_use = tree_to_use + + def add_panel(): + memory_text = str(memory_content) + if len(memory_text) > 500: + memory_text = memory_text[:497] + "..." + + memory_panel = Panel( + Text(memory_text, style="white"), + title="🧠 Retrieved Memory", + subtitle=f"Retrieval Time: {retrieval_time_ms:.2f}ms", + border_style="green", + padding=(1, 2), + ) + self.print(memory_panel) + self.print() + + if branch_to_use is None or tree_to_use is None: + add_panel() + return None + + memory_branch_found = False + for child in branch_to_use.children: + if "Memory Retrieval Started" in str(child.label): + self.update_tree_label( + child, "✅", "Memory Retrieval Completed", "green" + ) + memory_branch_found = True + break + + if not memory_branch_found: + for child in branch_to_use.children: + if ( + "Memory Retrieval" in str(child.label) + and "Started" not in str(child.label) + and "Completed" not in str(child.label) + ): + self.update_tree_label( + child, "✅", "Memory Retrieval Completed", "green" + ) + memory_branch_found = True + break + + if not memory_branch_found: + memory_branch = branch_to_use.add("") + self.update_tree_label( + memory_branch, "✅", "Memory Retrieval Completed", "green" + ) + + self.print(tree_to_use) + + if memory_content: + add_panel() + + + def handle_memory_query_completed( + self, + agent_branch: Optional[Tree], + source_type: str, + query_time_ms: float, + crew_tree: Optional[Tree], + ) -> None: + if not self.verbose: + return None + + branch_to_use = self.current_lite_agent_branch or agent_branch + tree_to_use = branch_to_use or crew_tree + + if branch_to_use is None and tree_to_use is not None: + branch_to_use = tree_to_use + + if branch_to_use is None: + return None + + memory_type = source_type.replace("_", " ").title() + + for child in branch_to_use.children: + if "Memory Retrieval" in str(child.label): + for child in child.children: + sources_branch = child + if "Sources Used" in str(child.label): + sources_branch.add(f"✅ {memory_type} ({query_time_ms:.2f}ms)") + break + else: + sources_branch = child.add("Sources Used") + sources_branch.add(f"✅ {memory_type} ({query_time_ms:.2f}ms)") + break + + def handle_memory_query_failed( + self, + agent_branch: Optional[Tree], + crew_tree: Optional[Tree], + error: str, + source_type: str, + ) -> None: + if not self.verbose: + return None + + branch_to_use = self.current_lite_agent_branch or agent_branch + tree_to_use = branch_to_use or crew_tree + + if branch_to_use is None and tree_to_use is not None: + branch_to_use = tree_to_use + + if branch_to_use is None: + return None + + memory_type = source_type.replace("_", " ").title() + + for child in branch_to_use.children: + if "Memory Retrieval" in str(child.label): + for child in child.children: + sources_branch = child + if "Sources Used" in str(child.label): + sources_branch.add(f"❌ {memory_type} - Error: {error}") + break + else: + sources_branch = child.add("🧠 Sources Used") + sources_branch.add(f"❌ {memory_type} - Error: {error}") + break + + + def handle_memory_save_started( + self, + agent_branch: Optional[Tree], + crew_tree: Optional[Tree] + ) -> None: + if not self.verbose: + return None + + branch_to_use = agent_branch or self.current_lite_agent_branch + tree_to_use = branch_to_use or crew_tree + + if tree_to_use is None: + return None + + for child in tree_to_use.children: + if "Memory Update" in str(child.label): + break + else: + memory_branch = tree_to_use.add("") + self.update_tree_label( + memory_branch, "🧠", "Memory Update Overall", "white" + ) + + self.print(tree_to_use) + self.print() + + def handle_memory_save_completed( + self, + agent_branch: Optional[Tree], + crew_tree: Optional[Tree], + save_time_ms: float, + source_type: str, + ) -> None: + if not self.verbose: + return None + + branch_to_use = agent_branch or self.current_lite_agent_branch + tree_to_use = branch_to_use or crew_tree + + if tree_to_use is None: + return None + + memory_type = source_type.replace("_", " ").title() + content = f"✅ {memory_type} Memory Saved ({save_time_ms:.2f}ms)" + + for child in tree_to_use.children: + if "Memory Update" in str(child.label): + child.add(content) + break + else: + memory_branch = tree_to_use.add("") + memory_branch.add(content) + + self.print(tree_to_use) + self.print() + + def handle_memory_save_failed( + self, + agent_branch: Optional[Tree], + error: str, + source_type: str, + crew_tree: Optional[Tree], + ) -> None: + if not self.verbose: + return None + + branch_to_use = agent_branch or self.current_lite_agent_branch + tree_to_use = branch_to_use or crew_tree + + if branch_to_use is None or tree_to_use is None: + return None + + memory_type = source_type.replace("_", " ").title() + content = f"❌ {memory_type} Memory Save Failed" + for child in branch_to_use.children: + if "Memory Update" in str(child.label): + child.add(content) + break + else: + memory_branch = branch_to_use.add("") + memory_branch.add(content) + + self.print(tree_to_use) + self.print() \ No newline at end of file