From f3a15a4f07d7d235b9f46eb4536af2e827a9f8cf Mon Sep 17 00:00:00 2001 From: Matt Aitchison Date: Thu, 4 Jun 2026 13:28:31 -0500 Subject: [PATCH] feat(lock_store): make locking backend overridable (#6015) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(lock_store): make locking backend overridable Allow the centralised lock factory to use a pluggable backend instead of the hardcoded Redis/file selection. Backends are resolved with precedence override > CREWAI_LOCK_FACTORY env > built-in default: - set_lock_backend()/reset_lock_backend() and a scoped lock_backend() context manager for programmatic overrides - CREWAI_LOCK_FACTORY="module:callable" env import-path, resolved lazily and cached, with clear errors on malformed or non-callable specs - LockBackend Protocol documenting the contract (raw name in, context manager out; backend owns its namespacing) Default Redis/file behavior is unchanged when nothing is overridden. * refactor(lock_store): use explicit body for LockBackend protocol method Replace the no-op `...` body with `raise NotImplementedError` to satisfy the CodeQL ineffectual-statement check while keeping the Protocol structural-typing only. * refactor(lock_store): drop scoped lock_backend context manager Keep the backend overridable via set_lock_backend/reset_lock_backend and the CREWAI_LOCK_FACTORY env path, but remove the scoped lock_backend() context manager. It was speculative surface and the only thread-unsafe piece (racy save/restore of the module global); nothing depends on it. * refactor(lock_store): drop reset_lock_backend alias reset_lock_backend() was just set_lock_backend(None); callers use that directly. Clearing the override is documented on set_lock_backend. * style(lock_store): apply ruff format * refactor(lock_store): simplify overridable backend to a single setter Reduce the override surface to just set_lock_backend(): lock() uses the custom backend when one is set, otherwise the unchanged Redis/file default. Drop the CREWAI_LOCK_FACTORY env import-path, the runtime_checkable Protocol, the precedence resolver, and the getter — a custom backend is now any callable(name, *, timeout) -> context manager, registered in process. * fix(lock_store): snapshot backend to avoid check-then-call race Read the module-global backend once into a local before the None check and the call, so a concurrent set_lock_backend(None) cannot make lock() invoke None. * docs(lock_store): clarify name handling for custom backends The default namespaces the lock name; custom backends receive it verbatim. Correct the lock() docstring which implied namespacing always happens. * docs(lock_store): note set_lock_backend is for one-time startup setup --- lib/crewai-core/src/crewai_core/lock_store.py | 46 ++++++++++++++--- lib/crewai/tests/utilities/test_lock_store.py | 51 ++++++++++++++++++- 2 files changed, 88 insertions(+), 9 deletions(-) diff --git a/lib/crewai-core/src/crewai_core/lock_store.py b/lib/crewai-core/src/crewai_core/lock_store.py index 0f09fa7f6..be1d08faa 100644 --- a/lib/crewai-core/src/crewai_core/lock_store.py +++ b/lib/crewai-core/src/crewai_core/lock_store.py @@ -1,14 +1,18 @@ """Centralised lock factory. -If ``REDIS_URL`` is set and the ``redis`` package is installed, locks are -distributed via ``portalocker.RedisLock``. Otherwise, falls back to the -standard file-based ``portalocker.Lock`` in the system temp dir. +By default, if ``REDIS_URL`` is set and the ``redis`` package is installed, +locks are distributed via ``portalocker.RedisLock``. Otherwise, falls back to +the standard file-based ``portalocker.Lock`` in the system temp dir. + +The backend can be replaced via :func:`set_lock_backend` to plug in a custom +locking strategy (e.g. a different distributed lock service, or an in-process +lock for tests). """ from __future__ import annotations -from collections.abc import Iterator -from contextlib import contextmanager +from collections.abc import Callable, Iterator +from contextlib import AbstractContextManager, contextmanager from functools import lru_cache from hashlib import md5 import logging @@ -30,6 +34,25 @@ _REDIS_URL: str | None = os.environ.get("REDIS_URL") _DEFAULT_TIMEOUT: Final[int] = 120 +# A backend is called as ``backend(name, timeout=...)`` and returns a context +# manager that holds the lock while the ``with`` block runs. +LockBackend = Callable[..., AbstractContextManager[None]] + +# ``None`` means use the built-in Redis/file selection. +_backend: LockBackend | None = None + + +def set_lock_backend(backend: LockBackend | None) -> None: + """Replace the process-wide locking backend used by :func:`lock`. + + Intended for one-time setup at startup. Pass ``None`` to restore the + built-in Redis/file default. In-flight :func:`lock` calls keep the backend + they started with, but swapping backends while other threads acquire locks + is otherwise unsynchronised. + """ + global _backend + _backend = backend + def _redis_available() -> bool: """Return True if redis is installed and REDIS_URL is set.""" @@ -58,10 +81,19 @@ def lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[None]: """Acquire a named lock, yielding while it is held. Args: - name: A human-readable lock name (e.g. ``"chromadb_init"``). - Automatically namespaced to avoid collisions. + name: A human-readable lock name (e.g. ``"chromadb_init"``). The + built-in default namespaces it to avoid collisions; a custom + backend receives it verbatim. timeout: Maximum seconds to wait for the lock before raising. """ + # Snapshot the global once: a concurrent set_lock_backend() must not turn + # the check-then-call into calling ``None``. + backend = _backend + if backend is not None: + with backend(name, timeout=timeout): + yield + return + channel = f"crewai:{md5(name.encode(), usedforsecurity=False).hexdigest()}" if _redis_available(): diff --git a/lib/crewai/tests/utilities/test_lock_store.py b/lib/crewai/tests/utilities/test_lock_store.py index 1baa0169a..baad049d8 100644 --- a/lib/crewai/tests/utilities/test_lock_store.py +++ b/lib/crewai/tests/utilities/test_lock_store.py @@ -1,11 +1,13 @@ """Tests for lock_store. -We verify our own logic: the _redis_available guard and which portalocker -backend is selected. We trust portalocker to handle actual locking mechanics. +We verify our own logic: the _redis_available guard, which portalocker +backend is selected, and that a custom backend can be plugged in. We trust +portalocker to handle actual locking mechanics. """ from __future__ import annotations +from contextlib import contextmanager import sys from unittest import mock @@ -20,6 +22,14 @@ def no_redis_url(monkeypatch): monkeypatch.setattr(lock_store, "_REDIS_URL", None) +@pytest.fixture(autouse=True) +def reset_backend(): + """Ensure a custom backend never leaks across tests.""" + lock_store.set_lock_backend(None) + yield + lock_store.set_lock_backend(None) + + # _redis_available @@ -64,3 +74,40 @@ def test_uses_redis_lock_when_redis_available(monkeypatch): kwargs = mock_redis_lock.call_args.kwargs assert kwargs["channel"].startswith("crewai:") assert kwargs["connection"] is fake_conn + + +# custom backend + + +def test_custom_backend_is_used(): + calls = [] + + @contextmanager + def fake_backend(name, *, timeout): + calls.append((name, timeout)) + yield + + lock_store.set_lock_backend(fake_backend) + + # The default file/redis path must not be touched when overridden. + with mock.patch("portalocker.Lock") as mock_lock: + with lock("custom_test", timeout=5): + pass + + mock_lock.assert_not_called() + assert calls == [("custom_test", 5)] + + +def test_clearing_backend_restores_default(): + @contextmanager + def fake_backend(name, *, timeout): + yield + + lock_store.set_lock_backend(fake_backend) + lock_store.set_lock_backend(None) + + with mock.patch("portalocker.Lock") as mock_lock: + with lock("after_clear"): + pass + + mock_lock.assert_called_once()