mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-30 23:02:50 +00:00
New Memory Improvements (#4484)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled
* better DevEx

* Refactor: Update supported native providers and enhance memory handling
  - Removed "groq" and "meta" from the list of supported native providers in `llm.py`.
  - Added a safeguard in `flow.py` to ensure all background memory saves complete before returning.
  - Improved error handling in `unified_memory.py` to prevent exceptions during shutdown, ensuring smoother memory operations and event bus interactions.

* Enhance Memory System with Consolidation and Learning Features
  - Introduced memory consolidation mechanisms to prevent duplicate records during content saving, utilizing similarity checks and LLM decision-making.
  - Implemented non-blocking save operations in the memory system, allowing agents to continue tasks while memory is being saved.
  - Added support for learning from human feedback, enabling the system to distill lessons from past corrections and improve future outputs.
  - Updated documentation to reflect new features and usage examples for memory consolidation and HITL learning.

* Enhance cyclic flow handling for or_() listeners
  - Updated the Flow class to ensure that all fired or_() listeners are cleared between cycle iterations, allowing them to fire again in subsequent cycles. This change addresses a bug where listeners remained suppressed across iterations.
  - Added regression tests to verify that or_() listeners fire correctly on every iteration in cyclic flows, ensuring expected behavior in complex routing scenarios.
This commit is contained in:
@@ -1974,6 +1974,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
|
||||
return final_output
|
||||
finally:
|
||||
# Ensure all background memory saves complete before returning
|
||||
if self.memory is not None and hasattr(self.memory, "drain_writes"):
|
||||
self.memory.drain_writes()
|
||||
if request_id_token is not None:
|
||||
current_flow_request_id.reset(request_id_token)
|
||||
if flow_id_token is not None:
|
||||
@@ -2530,8 +2533,12 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
return (None, None)
|
||||
# For cyclic flows, clear from completed to allow re-execution
|
||||
self._completed_methods.discard(listener_name)
|
||||
# Also clear from fired OR listeners for cyclic flows
|
||||
self._discard_or_listener(listener_name)
|
||||
# Clear ALL fired OR listeners so they can fire again in the new cycle.
|
||||
# This mirrors what _execute_start_method does for start-method cycles.
|
||||
# Only discarding the individual listener is insufficient because
|
||||
# downstream or_() listeners (e.g., method_a listening to
|
||||
# or_(handler_a, handler_b)) would remain suppressed across iterations.
|
||||
self._clear_or_listeners()
|
||||
|
||||
try:
|
||||
method = self._methods[listener_name]
|
||||
|
||||
@@ -419,8 +419,22 @@ class LLM(BaseLLM):
|
||||
|
||||
# FALLBACK to LiteLLM
|
||||
if not LITELLM_AVAILABLE:
|
||||
logger.error("LiteLLM is not available, falling back to LiteLLM")
|
||||
raise ImportError("Fallback to LiteLLM is not available") from None
|
||||
native_list = ", ".join(SUPPORTED_NATIVE_PROVIDERS)
|
||||
error_msg = (
|
||||
f"Unable to initialize LLM with model '{model}'. "
|
||||
f"The model did not match any supported native provider "
|
||||
f"({native_list}), and the LiteLLM fallback package is not "
|
||||
f"installed.\n\n"
|
||||
f"To fix this, either:\n"
|
||||
f" 1. Install LiteLLM for broad model support: "
|
||||
f"uv add litellm\n"
|
||||
f"or\n"
|
||||
f"pip install litellm\n\n"
|
||||
f"For more details, see: "
|
||||
f"https://docs.crewai.com/en/learn/llm-connections"
|
||||
)
|
||||
logger.error(error_msg)
|
||||
raise ImportError(error_msg) from None
|
||||
|
||||
instance = object.__new__(cls)
|
||||
super(LLM, instance).__init__(model=model, is_litellm=True, **kwargs)
|
||||
|
||||
@@ -224,22 +224,30 @@ class Memory:
|
||||
return future
|
||||
|
||||
def _on_save_done(self, future: Future[Any]) -> None:
    """Remove a completed future from the pending list and emit failure event if needed.

    This callback must never raise -- it runs from the thread pool's
    internal machinery during process shutdown when executors and the
    event bus may already be closed.

    Args:
        future: The completed background-save future registered via
            ``add_done_callback``.
    """
    try:
        with self._pending_lock:
            try:
                self._pending_saves.remove(future)
            except ValueError:
                pass  # already removed
        # Surface background-save failures on the event bus; a None
        # exception means the save completed successfully.
        exc = future.exception()
        if exc is not None:
            crewai_event_bus.emit(
                self,
                MemorySaveFailedEvent(
                    value="background save",
                    error=str(exc),
                    source_type="unified_memory",
                ),
            )
    except Exception:  # noqa: S110
        pass  # swallow everything during shutdown
|
||||
|
||||
def drain_writes(self) -> None:
|
||||
"""Block until all pending background saves have completed.
|
||||
@@ -437,30 +445,49 @@ class Memory:
|
||||
|
||||
Both started and completed events are emitted here (in the background
|
||||
thread) so they pair correctly on the event bus scope stack.
|
||||
|
||||
All ``emit`` calls are wrapped in try/except to handle the case where
|
||||
the event bus shuts down before the background save finishes (e.g.
|
||||
during process exit).
|
||||
"""
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
MemorySaveStartedEvent(
|
||||
value=f"{len(contents)} memories (background)",
|
||||
metadata=metadata,
|
||||
source_type="unified_memory",
|
||||
),
|
||||
)
|
||||
start = time.perf_counter()
|
||||
records = self._encode_batch(
|
||||
contents, scope, categories, metadata, importance, source, private
|
||||
)
|
||||
elapsed_ms = (time.perf_counter() - start) * 1000
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
MemorySaveCompletedEvent(
|
||||
value=f"{len(records)} memories saved",
|
||||
metadata=metadata or {},
|
||||
agent_role=agent_role,
|
||||
save_time_ms=elapsed_ms,
|
||||
source_type="unified_memory",
|
||||
),
|
||||
)
|
||||
try:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
MemorySaveStartedEvent(
|
||||
value=f"{len(contents)} memories (background)",
|
||||
metadata=metadata,
|
||||
source_type="unified_memory",
|
||||
),
|
||||
)
|
||||
except RuntimeError:
|
||||
pass # event bus shut down during process exit
|
||||
|
||||
try:
|
||||
start = time.perf_counter()
|
||||
records = self._encode_batch(
|
||||
contents, scope, categories, metadata, importance, source, private
|
||||
)
|
||||
elapsed_ms = (time.perf_counter() - start) * 1000
|
||||
except RuntimeError:
|
||||
# The encoding pipeline uses asyncio.run() -> to_thread() internally.
|
||||
# If the process is shutting down, the default executor is closed and
|
||||
# to_thread raises "cannot schedule new futures after shutdown".
|
||||
# Silently abandon the save -- the process is exiting anyway.
|
||||
return []
|
||||
|
||||
try:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
MemorySaveCompletedEvent(
|
||||
value=f"{len(records)} memories saved",
|
||||
metadata=metadata or {},
|
||||
agent_role=agent_role,
|
||||
save_time_ms=elapsed_ms,
|
||||
source_type="unified_memory",
|
||||
),
|
||||
)
|
||||
except RuntimeError:
|
||||
pass # event bus shut down during process exit
|
||||
return records
|
||||
|
||||
def extract_memories(self, content: str) -> list[str]:
|
||||
|
||||
@@ -1647,3 +1647,128 @@ class TestFlowAkickoff:
|
||||
|
||||
assert execution_order == ["begin", "route", "path_a"]
|
||||
assert result == "path_a_result"
|
||||
|
||||
|
||||
def test_cyclic_flow_or_listeners_fire_every_iteration():
    """Test that or_() listeners reset between cycle iterations through a router.

    Regression test for a bug where _fired_or_listeners was not cleared when
    cycles loop through a router/listener instead of a @start method, causing
    or_() listeners to permanently suppress after the first iteration.

    Pattern: router classifies → routes to ONE of several handlers → or_()
    merge downstream → cycle back. Only one handler fires per iteration, but
    the or_() merge must still fire every time.
    """
    execution_order = []

    class CyclicOrFlow(Flow):
        iteration = 0
        max_iterations = 3

        @start()
        def begin(self):
            execution_order.append("begin")

        @router(or_(begin, "loop_back"))
        def route(self):
            self.iteration += 1
            execution_order.append(f"route_{self.iteration}")
            if self.iteration <= self.max_iterations:
                # Alternate between handlers on each iteration
                return "type_a" if self.iteration % 2 == 1 else "type_b"
            return "done"

        @listen("type_a")
        def handler_a(self):
            execution_order.append(f"handler_a_{self.iteration}")

        @listen("type_b")
        def handler_b(self):
            execution_order.append(f"handler_b_{self.iteration}")

        # This or_() listener must fire on EVERY iteration, not just the first
        @listen(or_(handler_a, handler_b))
        def merge(self):
            execution_order.append(f"merge_{self.iteration}")

        @listen(merge)
        def loop_back(self):
            execution_order.append(f"loop_back_{self.iteration}")

    flow = CyclicOrFlow()
    flow.kickoff()

    # merge must have fired once per iteration (3 times total)
    merge_events = [e for e in execution_order if e.startswith("merge_")]
    assert len(merge_events) == 3, (
        f"or_() listener 'merge' should fire every iteration, "
        f"got {len(merge_events)} fires: {execution_order}"
    )

    # loop_back must have also fired every iteration
    loop_back_events = [e for e in execution_order if e.startswith("loop_back_")]
    assert len(loop_back_events) == 3, (
        f"'loop_back' should fire every iteration, "
        f"got {len(loop_back_events)} fires: {execution_order}"
    )

    # Verify alternating handlers
    handler_a_events = [e for e in execution_order if e.startswith("handler_a_")]
    handler_b_events = [e for e in execution_order if e.startswith("handler_b_")]
    assert len(handler_a_events) == 2  # iterations 1 and 3
    assert len(handler_b_events) == 1  # iteration 2
|
||||
|
||||
|
||||
def test_cyclic_flow_multiple_or_listeners_fire_every_iteration():
    """Test that multiple or_() listeners all reset between cycle iterations.

    Mirrors a real-world pattern: a router classifies messages, handlers process
    them, then both a 'send' step (or_ on handlers) and a 'store' step (or_ on
    router outputs) must fire on every loop iteration.
    """
    execution_order = []

    class MultiOrCyclicFlow(Flow):
        iteration = 0
        max_iterations = 3

        @start()
        def begin(self):
            execution_order.append("begin")

        @router(or_(begin, "capture"))
        def classify(self):
            self.iteration += 1
            execution_order.append(f"classify_{self.iteration}")
            if self.iteration <= self.max_iterations:
                return "type_a"
            return "exit"

        @listen("type_a")
        def handle_type_a(self):
            execution_order.append(f"handle_a_{self.iteration}")

        # or_() listener on router output strings — must fire every iteration
        @listen(or_("type_a", "type_b", "type_c"))
        def store(self):
            execution_order.append(f"store_{self.iteration}")

        # or_() listener on handler methods — must fire every iteration
        @listen(or_(handle_type_a))
        def send(self):
            execution_order.append(f"send_{self.iteration}")

        @listen("send")
        def capture(self):
            execution_order.append(f"capture_{self.iteration}")

    flow = MultiOrCyclicFlow()
    flow.kickoff()

    # Every downstream step must have fired once per iteration (3 total each).
    for method in ["store", "send", "capture"]:
        events = [e for e in execution_order if e.startswith(f"{method}_")]
        assert len(events) == 3, (
            f"'{method}' should fire every iteration, "
            f"got {len(events)} fires: {execution_order}"
        )
|
||||
|
||||
Reference in New Issue
Block a user