From f31df3418283c929cf678a22d87235a8ce4ed97e Mon Sep 17 00:00:00 2001 From: Joao Moura Date: Fri, 13 Feb 2026 21:02:31 -0800 Subject: [PATCH] Refactor: Update supported native providers and enhance memory handling - Removed "groq" and "meta" from the list of supported native providers in `llm.py`. - Added a safeguard in `flow.py` to ensure all background memory saves complete before returning. - Improved error handling in `unified_memory.py` to prevent exceptions during shutdown, ensuring smoother memory operations and event bus interactions. --- lib/crewai/src/crewai/flow/flow.py | 3 + lib/crewai/src/crewai/llm.py | 2 - .../src/crewai/memory/unified_memory.py | 105 +++++++++++------- 3 files changed, 69 insertions(+), 41 deletions(-) diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index ca3925198..9d060978f 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -1974,6 +1974,9 @@ class Flow(Generic[T], metaclass=FlowMeta): return final_output finally: + # Ensure all background memory saves complete before returning + if self.memory is not None and hasattr(self.memory, "drain_writes"): + self.memory.drain_writes() if request_id_token is not None: current_flow_request_id.reset(request_id_token) if flow_id_token is not None: diff --git a/lib/crewai/src/crewai/llm.py b/lib/crewai/src/crewai/llm.py index 78d844786..20a0373cb 100644 --- a/lib/crewai/src/crewai/llm.py +++ b/lib/crewai/src/crewai/llm.py @@ -325,8 +325,6 @@ SUPPORTED_NATIVE_PROVIDERS: Final[list[str]] = [ "gemini", "bedrock", "aws", - "groq", - "meta" ] diff --git a/lib/crewai/src/crewai/memory/unified_memory.py b/lib/crewai/src/crewai/memory/unified_memory.py index 432262048..a15f77afd 100644 --- a/lib/crewai/src/crewai/memory/unified_memory.py +++ b/lib/crewai/src/crewai/memory/unified_memory.py @@ -224,22 +224,30 @@ class Memory: return future def _on_save_done(self, future: Future[Any]) -> None: - """Remove a completed future from the pending list and emit failure event if needed.""" - with self._pending_lock: - try: - self._pending_saves.remove(future) - except ValueError: - pass # already removed - exc = future.exception() - if exc is not None: - crewai_event_bus.emit( - self, - MemorySaveFailedEvent( - value="background save", - error=str(exc), - source_type="unified_memory", - ), - ) + """Remove a completed future from the pending list and emit failure event if needed. + + This callback must never raise -- it runs from the thread pool's + internal machinery during process shutdown when executors and the + event bus may already be closed. + """ + try: + with self._pending_lock: + try: + self._pending_saves.remove(future) + except ValueError: + pass # already removed + exc = future.exception() + if exc is not None: + crewai_event_bus.emit( + self, + MemorySaveFailedEvent( + value="background save", + error=str(exc), + source_type="unified_memory", + ), + ) + except Exception: # noqa: S110 + pass # swallow everything during shutdown def drain_writes(self) -> None: """Block until all pending background saves have completed. @@ -437,30 +445,49 @@ class Memory: Both started and completed events are emitted here (in the background thread) so they pair correctly on the event bus scope stack. + + All ``emit`` calls are wrapped in try/except to handle the case where + the event bus shuts down before the background save finishes (e.g. + during process exit). """ - crewai_event_bus.emit( - self, - MemorySaveStartedEvent( - value=f"{len(contents)} memories (background)", - metadata=metadata, - source_type="unified_memory", - ), - ) - start = time.perf_counter() - records = self._encode_batch( - contents, scope, categories, metadata, importance, source, private - ) - elapsed_ms = (time.perf_counter() - start) * 1000 - crewai_event_bus.emit( - self, - MemorySaveCompletedEvent( - value=f"{len(records)} memories saved", - metadata=metadata or {}, - agent_role=agent_role, - save_time_ms=elapsed_ms, - source_type="unified_memory", - ), - ) + try: + crewai_event_bus.emit( + self, + MemorySaveStartedEvent( + value=f"{len(contents)} memories (background)", + metadata=metadata, + source_type="unified_memory", + ), + ) + except RuntimeError: + pass # event bus shut down during process exit + + try: + start = time.perf_counter() + records = self._encode_batch( + contents, scope, categories, metadata, importance, source, private + ) + elapsed_ms = (time.perf_counter() - start) * 1000 + except RuntimeError: + # The encoding pipeline uses asyncio.run() -> to_thread() internally. + # If the process is shutting down, the default executor is closed and + # to_thread raises "cannot schedule new futures after shutdown". + # Silently abandon the save -- the process is exiting anyway. + return [] + + try: + crewai_event_bus.emit( + self, + MemorySaveCompletedEvent( + value=f"{len(records)} memories saved", + metadata=metadata or {}, + agent_role=agent_role, + save_time_ms=elapsed_ms, + source_type="unified_memory", + ), + ) + except RuntimeError: + pass # event bus shut down during process exit return records def extract_memories(self, content: str) -> list[str]: