mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-07-04 14:39:23 +00:00
fix(events): gate RuntimeState recording to stop unbounded event-bus leak
The process-global event bus recorded every emitted event into a RuntimeState (entity `root` list + `event_record`) on every kickoff, unconditionally and with no eviction. Only the checkpoint/replay machinery ever reads that recorder, so for the common "construct a Flow/Crew, kickoff, discard" pattern it grew ~linearly with kickoff count until a long-lived process (worker, request handler, scheduler) was OOM-killed. Gate recording behind an armed flag: the bus only registers entities and records events once recording is enabled, which happens when a CheckpointConfig is resolved on a Crew/Flow/Agent or when a state is restored via set_runtime_state(). Plain and @persist kickoff loops now record nothing; checkpoint/replay behavior is unchanged. Also expose a public reset_runtime_state() so embedders that checkpoint but never replay in-process can bound memory between runs.
This commit is contained in:
@@ -117,6 +117,7 @@ class CrewAIEventsBus:
|
||||
_executor_initialized: bool
|
||||
_has_pending_events: bool
|
||||
_runtime_state: RuntimeState | None
|
||||
_recording_enabled: bool
|
||||
|
||||
def __new__(cls) -> Self:
|
||||
"""Create or return the singleton instance.
|
||||
@@ -153,6 +154,14 @@ class CrewAIEventsBus:
|
||||
self._has_pending_events = False
|
||||
self._runtime_state: RuntimeState | None = None
|
||||
self._registered_entity_ids: set[int] = set()
|
||||
# The RuntimeState recorder (entity root + event_record) exists solely
|
||||
# to serialize a run for checkpoint/replay. It is only ever read back
|
||||
# by the checkpoint/fork/resume machinery, so we leave it disabled
|
||||
# until something actually needs it. Recording it unconditionally on a
|
||||
# process-global singleton is an unbounded leak in long-lived processes
|
||||
# that run many kickoffs. Armed by ``enable_recording()`` (called when
|
||||
# a checkpoint config is resolved) and by ``set_runtime_state()``.
|
||||
self._recording_enabled: bool = False
|
||||
|
||||
def _ensure_executor_initialized(self) -> None:
|
||||
"""Lazily initialize the thread pool executor and event loop.
|
||||
@@ -270,11 +279,46 @@ class CrewAIEventsBus:
|
||||
|
||||
return decorator
|
||||
|
||||
def enable_recording(self) -> None:
    """Arm RuntimeState recording for this process.

    Until armed, the bus does not register entities or record events into a
    ``RuntimeState`` — that recorder feeds checkpoint/replay only and is
    wasted work (and an unbounded leak on a long-lived singleton) when no
    checkpointing is configured. Called when a ``CheckpointConfig`` is
    resolved on a Crew/Flow/Agent (see
    ``crewai.state.checkpoint_listener``). ``set_runtime_state`` arms the
    same flag directly rather than going through this method.

    Idempotent: the flag is simply set to ``True`` under the instance lock,
    so repeated calls have no further effect.
    """
    with self._instance_lock:
        self._recording_enabled = True
|
||||
|
||||
def set_runtime_state(self, state: RuntimeState) -> None:
    """Set the RuntimeState that will be passed to event handlers.

    Besides attaching ``state``, this (under the instance lock):

    * rebuilds the registered-entity id set from ``state.root`` so that
      entities already present in the supplied state are treated as
      registered by the ``id(source) not in self._registered_entity_ids``
      check in ``_register_source``;
    * arms recording, since a state handed to the bus is expected to keep
      receiving events.

    Args:
        state: The state to attach; its ``root`` iterable is read once to
            seed the id set.
    """
    with self._instance_lock:
        self._runtime_state = state
        self._registered_entity_ids = {id(entity) for entity in state.root}
        self._recording_enabled = True
|
||||
|
||||
def reset_runtime_state(self) -> None:
    """Drop the recorded ``RuntimeState`` and registered entity ids.

    When recording is armed (checkpointing configured, or a state was
    restored via :meth:`set_runtime_state`), the bus records every emitted
    event into a process-global ``RuntimeState`` — the entity ``root`` list
    plus the ``event_record`` — so checkpoint/replay can reconstruct a run.
    Because the bus is a process-global singleton, that record grows
    without bound across successive ``kickoff`` calls in a long-lived
    process (worker, request handler, scheduler). Embedders that checkpoint
    but never replay in-process can call this between runs to bound memory.

    This clears the recorded data but leaves recording armed
    (``_recording_enabled`` is not touched), so subsequent runs still
    record. Safe to call when no state is attached. Do not call mid-run or
    while a pending checkpoint/replay depends on the record.
    """
    with self._instance_lock:
        self._runtime_state = None
        # Rebind to a fresh set instead of clearing in place, so any code
        # still holding the previous set object keeps its contents.
        self._registered_entity_ids = set()
|
||||
|
||||
@property
|
||||
def runtime_state(self) -> RuntimeState | None:
|
||||
@@ -464,17 +508,31 @@ class CrewAIEventsBus:
|
||||
await self._acall_handlers(source, event, level_async)
|
||||
|
||||
def _register_source(self, source: Any) -> None:
    """Register the source entity in RuntimeState if applicable.

    No-op unless recording is armed (see :meth:`enable_recording`): the
    RuntimeState entity list is only read by checkpoint/replay.

    A source is registered only when all of the following hold, checked in
    this order:

    1. recording is armed;
    2. ``source.entity_type`` is one of ``"flow"``, ``"crew"``, ``"agent"``
       (objects without that attribute are ignored);
    3. ``id(source)`` has not been registered yet.

    Args:
        source: The object that emitted the event.
    """
    # Cheapest check first: the common, non-checkpointing path exits here
    # without touching the source object at all.
    if not self._recording_enabled:
        return
    if getattr(source, "entity_type", None) not in ("flow", "crew", "agent"):
        return
    if id(source) not in self._registered_entity_ids:
        self.register_entity(source)
|
||||
|
||||
def _record_event(self, event: BaseEvent) -> None:
    """Add an event to the RuntimeState event record.

    No-op unless recording is armed (see :meth:`enable_recording`): the
    event record is only read by checkpoint/replay. Also a no-op while no
    RuntimeState is attached.

    Args:
        event: The event to append via ``event_record.add``.
    """
    if not self._recording_enabled:
        return
    # Read once: a concurrent reset_runtime_state() can null _runtime_state
    # between the check and the deref.
    state = self._runtime_state
    if state is not None:
        state.event_record.add(event)
|
||||
|
||||
def _prepare_event(self, source: Any, event: BaseEvent) -> None:
|
||||
"""Register source, set scope/sequence metadata, and record the event.
|
||||
|
||||
@@ -49,6 +49,11 @@ def _ensure_handlers_registered() -> None:
|
||||
if _handlers_registered:
|
||||
return
|
||||
_register_all_handlers(crewai_event_bus)
|
||||
# Arm RuntimeState recording: from here on the bus must capture the
|
||||
# entity tree + event record so a checkpoint can serialize the run.
|
||||
# Until a checkpoint config is resolved, recording stays off to avoid
|
||||
# an unbounded leak on the process-global bus.
|
||||
crewai_event_bus.enable_recording()
|
||||
_handlers_registered = True
|
||||
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from contextlib import contextmanager
|
||||
from typing import Any
|
||||
from unittest.mock import patch
|
||||
|
||||
@@ -132,6 +133,12 @@ class TestFlowResumeReplaysEvents:
|
||||
def step_c(self) -> str:
|
||||
return "c"
|
||||
|
||||
# The event record (which replay reads from) is only populated when
|
||||
# recording is armed. In production a resume arms it by restoring the
|
||||
# RuntimeState from a checkpoint (``from_checkpoint`` ->
|
||||
# ``set_runtime_state``); here we arm it directly and let flow1 record
|
||||
# live, then resume flow2 against that record.
|
||||
crewai_event_bus.enable_recording()
|
||||
if crewai_event_bus.runtime_state is not None:
|
||||
crewai_event_bus.runtime_state.event_record.clear()
|
||||
|
||||
@@ -163,3 +170,75 @@ class TestFlowResumeReplaysEvents:
|
||||
assert captured_finished.count("step_a") == 1
|
||||
assert captured_finished.count("step_b") == 1
|
||||
assert captured_finished.count("step_c") == 1
|
||||
|
||||
|
||||
@contextmanager
def _isolated_recording_state() -> Any:
    """Snapshot and fully restore the singleton bus recording state.

    Restores all three coupled fields (``_recording_enabled``,
    ``_runtime_state``, ``_registered_entity_ids``) so a test that pokes the
    process-global bus can't leak an inconsistent state — e.g. a populated
    ``_runtime_state`` with an emptied id set — into later tests.

    The snapshot holds references, not copies: the restore rebinds the three
    attributes to the objects that were attached on entry. Code inside the
    ``with`` block should therefore rebind the state (e.g. via
    ``reset_runtime_state()``) rather than mutate the saved objects in
    place, as in-place mutations are not undone.
    """
    prev_enabled = crewai_event_bus._recording_enabled
    prev_state = crewai_event_bus._runtime_state
    prev_ids = crewai_event_bus._registered_entity_ids
    try:
        yield
    finally:
        # Restore unconditionally, including when the test body raised.
        crewai_event_bus._recording_enabled = prev_enabled
        crewai_event_bus._runtime_state = prev_state
        crewai_event_bus._registered_entity_ids = prev_ids
|
||||
|
||||
|
||||
class TestRecordingGate:
    """RuntimeState recording is armed only when checkpoint/replay needs it.

    The bus is a process-global singleton; recording every event into its
    ``RuntimeState`` unconditionally leaks linearly across kickoffs in a
    long-lived process. Recording stays off until a checkpoint config is
    resolved (or a state is restored), so the common "construct, kickoff,
    discard" loop allocates nothing.
    """

    def test_plain_flow_does_not_record_when_recording_disarmed(self) -> None:
        """Repeated kickoffs with recording disarmed attach no RuntimeState."""
        from crewai.flow.flow import Flow, listen, start

        class EchoFlow(Flow):
            @start()
            def begin(self) -> str:
                return "begin"

            @listen(begin)
            def finish(self, _: Any) -> str:
                return "finish"

        with _isolated_recording_state():
            # Force a clean, never-checkpointed process state. (Another test in
            # the same process may have armed recording via a checkpoint config.)
            crewai_event_bus.reset_runtime_state()
            crewai_event_bus._recording_enabled = False

            for _ in range(5):
                EchoFlow().kickoff(inputs={"payload": "x" * 1000})
            # No checkpointing configured -> nothing recorded -> no leak.
            assert crewai_event_bus.runtime_state is None

    def test_armed_flow_records_into_runtime_state(self) -> None:
        """With recording armed, one kickoff registers the flow and its events."""
        from crewai.flow.flow import Flow, start

        class OneStep(Flow):
            @start()
            def begin(self) -> str:
                return "begin"

        with _isolated_recording_state():
            # Start from an empty recorder so the counts below reflect only
            # this kickoff.
            crewai_event_bus.reset_runtime_state()
            crewai_event_bus.enable_recording()

            OneStep().kickoff()
            state = crewai_event_bus.runtime_state
            assert state is not None
            assert len(state.root) == 1
            assert len(state.event_record.nodes) > 0
|
||||
|
||||
Reference in New Issue
Block a user