Compare commits

...

1 Commit

Author SHA1 Message Date
Matt Aitchison
654e738786 fix(events): gate RuntimeState recording to stop unbounded event-bus leak
The process-global event bus recorded every emitted event into a
RuntimeState (entity `root` list + `event_record`) on every kickoff,
unconditionally and with no eviction. Only the checkpoint/replay
machinery ever reads that recorder, so for the common "construct a
Flow/Crew, kickoff, discard" pattern it grew ~linearly with kickoff
count until a long-lived process (worker, request handler, scheduler)
was OOM-killed.

Gate recording behind an armed flag: the bus only registers entities and
records events once recording is enabled, which happens when a
CheckpointConfig is resolved on a Crew/Flow/Agent or when a state is
restored via set_runtime_state(). Plain and @persist kickoff loops now
record nothing; checkpoint/replay behavior is unchanged. Also expose a
public reset_runtime_state() so embedders that checkpoint but never
replay in-process can bound memory between runs.
2026-06-05 17:34:23 -05:00
3 changed files with 147 additions and 5 deletions

View File

@@ -117,6 +117,7 @@ class CrewAIEventsBus:
_executor_initialized: bool
_has_pending_events: bool
_runtime_state: RuntimeState | None
_recording_enabled: bool
def __new__(cls) -> Self:
"""Create or return the singleton instance.
@@ -153,6 +154,14 @@ class CrewAIEventsBus:
self._has_pending_events = False
self._runtime_state: RuntimeState | None = None
self._registered_entity_ids: set[int] = set()
# The RuntimeState recorder (entity root + event_record) exists solely
# to serialize a run for checkpoint/replay. It is only ever read back
# by the checkpoint/fork/resume machinery, so we leave it disabled
# until something actually needs it. Recording it unconditionally on a
# process-global singleton is an unbounded leak in long-lived processes
# that run many kickoffs. Armed by ``enable_recording()`` (called when
# a checkpoint config is resolved) and by ``set_runtime_state()``.
self._recording_enabled: bool = False
def _ensure_executor_initialized(self) -> None:
"""Lazily initialize the thread pool executor and event loop.
@@ -270,11 +279,46 @@ class CrewAIEventsBus:
return decorator
def enable_recording(self) -> None:
    """Switch on RuntimeState recording for this bus.

    While the flag is off, the bus neither registers source entities nor
    appends events to a ``RuntimeState``. That record only feeds
    checkpoint/replay, so keeping it on a long-lived singleton with no
    checkpointing configured is wasted work and grows without bound.

    Invoked when a ``CheckpointConfig`` is resolved on a Crew/Flow/Agent
    (see ``crewai.state.checkpoint_listener``). :meth:`set_runtime_state`
    arms recording as well, by setting the same flag itself rather than
    calling this method.

    Idempotent: the flag is simply set to ``True`` on every call.
    """
    # Written under the instance lock, like the other recording-state writers.
    with self._instance_lock:
        self._recording_enabled = True
def set_runtime_state(self, state: RuntimeState) -> None:
    """Attach ``state`` as the RuntimeState that event handlers receive.

    Besides storing the state, this rebuilds the set of registered entity
    ids from ``state.root`` and arms recording, so events emitted afterwards
    are recorded into the attached state.
    """
    with self._instance_lock:
        self._runtime_state = state
        # Track entities by object identity so already-present roots are
        # not registered a second time.
        self._registered_entity_ids = set(map(id, state.root))
        self._recording_enabled = True
def reset_runtime_state(self) -> None:
    """Discard the recorded ``RuntimeState`` and the registered entity ids.

    Once recording is armed (a checkpoint config was resolved, or a state
    was restored through :meth:`set_runtime_state`), every emitted event is
    recorded into a process-global ``RuntimeState``: the entity ``root``
    list plus the ``event_record`` that checkpoint/replay reads back. The
    bus is a process-global singleton, so across many ``kickoff`` calls in
    a long-lived process (worker, request handler, scheduler) that record
    keeps growing. Embedders that checkpoint but never replay in-process
    can call this between runs to bound memory.

    Only the recorded data is dropped; the recording flag is left as it
    is, so later runs still record. Calling this with no state attached is
    harmless. Do not call it mid-run or while a pending checkpoint/replay
    still depends on the record.
    """
    with self._instance_lock:
        # Same write order as before: state first, then the id set.
        self._runtime_state, self._registered_entity_ids = None, set()
@property
def runtime_state(self) -> RuntimeState | None:
@@ -464,17 +508,31 @@ class CrewAIEventsBus:
await self._acall_handlers(source, event, level_async)
def _register_source(self, source: Any) -> None:
"""Register the source entity in RuntimeState if applicable."""
"""Register the source entity in RuntimeState if applicable.
No-op unless recording is armed (see :meth:`enable_recording`): the
RuntimeState entity list is only read by checkpoint/replay.
"""
if (
getattr(source, "entity_type", None) in ("flow", "crew", "agent")
self._recording_enabled
and getattr(source, "entity_type", None) in ("flow", "crew", "agent")
and id(source) not in self._registered_entity_ids
):
self.register_entity(source)
def _record_event(self, event: BaseEvent) -> None:
"""Add an event to the RuntimeState event record."""
if self._runtime_state is not None:
self._runtime_state.event_record.add(event)
"""Add an event to the RuntimeState event record.
No-op unless recording is armed (see :meth:`enable_recording`): the
event record is only read by checkpoint/replay.
"""
if not self._recording_enabled:
return
# Read once: a concurrent reset_runtime_state() can null _runtime_state
# between the check and the deref.
state = self._runtime_state
if state is not None:
state.event_record.add(event)
def _prepare_event(self, source: Any, event: BaseEvent) -> None:
"""Register source, set scope/sequence metadata, and record the event.

View File

@@ -49,6 +49,11 @@ def _ensure_handlers_registered() -> None:
if _handlers_registered:
return
_register_all_handlers(crewai_event_bus)
# Arm RuntimeState recording: from here on the bus must capture the
# entity tree + event record so a checkpoint can serialize the run.
# Until a checkpoint config is resolved, recording stays off to avoid
# an unbounded leak on the process-global bus.
crewai_event_bus.enable_recording()
_handlers_registered = True

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
from contextlib import contextmanager
from typing import Any
from unittest.mock import patch
@@ -132,6 +133,12 @@ class TestFlowResumeReplaysEvents:
def step_c(self) -> str:
return "c"
# The event record (which replay reads from) is only populated when
# recording is armed. In production a resume arms it by restoring the
# RuntimeState from a checkpoint (``from_checkpoint`` ->
# ``set_runtime_state``); here we arm it directly and let flow1 record
# live, then resume flow2 against that record.
crewai_event_bus.enable_recording()
if crewai_event_bus.runtime_state is not None:
crewai_event_bus.runtime_state.event_record.clear()
@@ -163,3 +170,75 @@ class TestFlowResumeReplaysEvents:
assert captured_finished.count("step_a") == 1
assert captured_finished.count("step_b") == 1
assert captured_finished.count("step_c") == 1
@contextmanager
def _isolated_recording_state() -> Any:
    """Save the singleton bus's recording state and put it back on exit.

    The three fields ``_recording_enabled``, ``_runtime_state`` and
    ``_registered_entity_ids`` are coupled, so all of them are captured on
    entry and reassigned on exit, even if the body raises. This keeps a test
    that pokes the process-global bus from leaking an inconsistent
    combination (e.g. a populated ``_runtime_state`` next to an emptied id
    set) into later tests.

    The saved values are references, not copies: the original objects are
    re-attached, not rebuilt.
    """
    coupled_fields = ("_recording_enabled", "_runtime_state", "_registered_entity_ids")
    snapshot = {name: getattr(crewai_event_bus, name) for name in coupled_fields}
    try:
        yield
    finally:
        # Reassign in the same order the fields were captured.
        for name, value in snapshot.items():
            setattr(crewai_event_bus, name, value)
class TestRecordingGate:
    """The bus records into ``RuntimeState`` only after recording is armed.

    The event bus is a process-global singleton, so recording every event
    unconditionally grows linearly with the number of kickoffs in a
    long-lived process. Recording is expected to stay off until a checkpoint
    config is resolved or a state is restored, which means the ordinary
    "construct, kickoff, discard" loop should leave nothing behind.
    """

    def test_plain_flow_does_not_record_when_recording_disarmed(self) -> None:
        from crewai.flow.flow import Flow, listen, start

        class EchoFlow(Flow):
            @start()
            def begin(self) -> str:
                return "begin"

            @listen(begin)
            def finish(self, _: Any) -> str:
                return "finish"

        with _isolated_recording_state():
            # Start from a never-checkpointed process state: an earlier test
            # in this process may have armed recording via a checkpoint
            # config.
            crewai_event_bus.reset_runtime_state()
            crewai_event_bus._recording_enabled = False

            for _ in range(5):
                EchoFlow().kickoff(inputs={"payload": "x" * 1000})

            # Checked before the context manager restores the prior state:
            # with no checkpointing configured nothing may have been recorded.
            assert crewai_event_bus.runtime_state is None

    def test_armed_flow_records_into_runtime_state(self) -> None:
        from crewai.flow.flow import Flow, start

        class OneStep(Flow):
            @start()
            def begin(self) -> str:
                return "begin"

        with _isolated_recording_state():
            crewai_event_bus.reset_runtime_state()
            crewai_event_bus.enable_recording()

            OneStep().kickoff()

            recorded = crewai_event_bus.runtime_state
            assert recorded is not None
            # Exactly one entity (the flow) and at least one event captured.
            assert len(recorded.root) == 1
            assert len(recorded.event_record.nodes) > 0