Refactor: Update supported native providers and enhance memory handling

- Removed "groq" and "meta" from the list of supported native providers in `llm.py`.
- Added a safeguard in `flow.py` to ensure all background memory saves complete before returning.
- Improved error handling in `unified_memory.py` to prevent exceptions during shutdown, ensuring smoother memory operations and event bus interactions.
This commit is contained in:
Joao Moura
2026-02-13 21:02:31 -08:00
parent ae9d88e308
commit f31df34182
3 changed files with 69 additions and 41 deletions

View File

@@ -1974,6 +1974,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
return final_output
finally:
# Ensure all background memory saves complete before returning
if self.memory is not None and hasattr(self.memory, "drain_writes"):
self.memory.drain_writes()
if request_id_token is not None:
current_flow_request_id.reset(request_id_token)
if flow_id_token is not None:

View File

@@ -325,8 +325,6 @@ SUPPORTED_NATIVE_PROVIDERS: Final[list[str]] = [
"gemini",
"bedrock",
"aws",
"groq",
"meta"
]

View File

@@ -224,22 +224,30 @@ class Memory:
return future
def _on_save_done(self, future: Future[Any]) -> None:
"""Remove a completed future from the pending list and emit failure event if needed."""
with self._pending_lock:
try:
self._pending_saves.remove(future)
except ValueError:
pass # already removed
exc = future.exception()
if exc is not None:
crewai_event_bus.emit(
self,
MemorySaveFailedEvent(
value="background save",
error=str(exc),
source_type="unified_memory",
),
)
"""Remove a completed future from the pending list and emit failure event if needed.
This callback must never raise -- it runs from the thread pool's
internal machinery during process shutdown when executors and the
event bus may already be closed.
"""
try:
with self._pending_lock:
try:
self._pending_saves.remove(future)
except ValueError:
pass # already removed
exc = future.exception()
if exc is not None:
crewai_event_bus.emit(
self,
MemorySaveFailedEvent(
value="background save",
error=str(exc),
source_type="unified_memory",
),
)
except Exception: # noqa: S110
pass # swallow everything during shutdown
def drain_writes(self) -> None:
"""Block until all pending background saves have completed.
@@ -437,30 +445,49 @@ class Memory:
Both started and completed events are emitted here (in the background
thread) so they pair correctly on the event bus scope stack.
All ``emit`` calls are wrapped in try/except to handle the case where
the event bus shuts down before the background save finishes (e.g.
during process exit).
"""
crewai_event_bus.emit(
self,
MemorySaveStartedEvent(
value=f"{len(contents)} memories (background)",
metadata=metadata,
source_type="unified_memory",
),
)
start = time.perf_counter()
records = self._encode_batch(
contents, scope, categories, metadata, importance, source, private
)
elapsed_ms = (time.perf_counter() - start) * 1000
crewai_event_bus.emit(
self,
MemorySaveCompletedEvent(
value=f"{len(records)} memories saved",
metadata=metadata or {},
agent_role=agent_role,
save_time_ms=elapsed_ms,
source_type="unified_memory",
),
)
try:
crewai_event_bus.emit(
self,
MemorySaveStartedEvent(
value=f"{len(contents)} memories (background)",
metadata=metadata,
source_type="unified_memory",
),
)
except RuntimeError:
pass # event bus shut down during process exit
try:
start = time.perf_counter()
records = self._encode_batch(
contents, scope, categories, metadata, importance, source, private
)
elapsed_ms = (time.perf_counter() - start) * 1000
except RuntimeError:
# The encoding pipeline uses asyncio.run() -> to_thread() internally.
# If the process is shutting down, the default executor is closed and
# to_thread raises "cannot schedule new futures after shutdown".
# Silently abandon the save -- the process is exiting anyway.
return []
try:
crewai_event_bus.emit(
self,
MemorySaveCompletedEvent(
value=f"{len(records)} memories saved",
metadata=metadata or {},
agent_role=agent_role,
save_time_ms=elapsed_ms,
source_type="unified_memory",
),
)
except RuntimeError:
pass # event bus shut down during process exit
return records
def extract_memories(self, content: str) -> list[str]: