diff --git a/lib/crewai/tests/utilities/test_lock_store.py b/lib/crewai/tests/utilities/test_lock_store.py index 1ddafb658..e50b45eba 100644 --- a/lib/crewai/tests/utilities/test_lock_store.py +++ b/lib/crewai/tests/utilities/test_lock_store.py @@ -2,7 +2,9 @@ from __future__ import annotations +import sys import threading +import time from unittest import mock import portalocker.exceptions @@ -12,118 +14,126 @@ import crewai.utilities.lock_store as lock_store from crewai.utilities.lock_store import lock +@pytest.fixture(autouse=True) +def no_redis_url(monkeypatch): + """Unset REDIS_URL for every test so file-based locking is used by default.""" + monkeypatch.setattr(lock_store, "_REDIS_URL", None) + + # --------------------------------------------------------------------------- # _redis_available # --------------------------------------------------------------------------- -def test_redis_available_no_url(): - with mock.patch.object(lock_store, "_REDIS_URL", None): - assert lock_store._redis_available() is False +def test_redis_not_available_without_url(): + assert lock_store._redis_available() is False -def test_redis_available_url_but_no_package(): - with mock.patch.object(lock_store, "_REDIS_URL", "redis://localhost:6379"): - with mock.patch("builtins.__import__", side_effect=ImportError): - assert lock_store._redis_available() is False +def test_redis_not_available_when_package_missing(monkeypatch): + monkeypatch.setattr(lock_store, "_REDIS_URL", "redis://localhost:6379") + # Setting a key to None in sys.modules causes ImportError on import + monkeypatch.setitem(sys.modules, "redis", None) + assert lock_store._redis_available() is False -def test_redis_available_url_and_package(): - with mock.patch.object(lock_store, "_REDIS_URL", "redis://localhost:6379"): - with mock.patch.dict("sys.modules", {"redis": mock.MagicMock()}): - assert lock_store._redis_available() is True +def test_redis_available_with_url_and_package(monkeypatch): + monkeypatch.setattr(lock_store, "_REDIS_URL", "redis://localhost:6379") + monkeypatch.setitem(sys.modules, "redis", mock.MagicMock()) + assert lock_store._redis_available() is True # --------------------------------------------------------------------------- -# file-based lock (no Redis) +# file-based lock # --------------------------------------------------------------------------- -def test_lock_acquires_and_releases(): - with mock.patch.object(lock_store, "_redis_available", return_value=False): - entered = False - with lock("test_basic"): - entered = True - assert entered +def test_lock_yields(): + with lock("basic"): + pass def test_lock_releases_on_exception(): - with mock.patch.object(lock_store, "_redis_available", return_value=False): - with pytest.raises(ValueError): - with lock("test_exception"): - raise ValueError("boom") + with pytest.raises(ValueError): + with lock("on_exception"): + raise ValueError("boom") - # lock should be released — acquiring again must succeed - with lock("test_exception"): + # would hang or raise LockException if the lock was not released + with lock("on_exception"): + pass + + +def test_lock_is_mutually_exclusive_across_threads(): + concurrent_holders = 0 + max_concurrent = 0 + counter_lock = threading.Lock() + barrier = threading.Barrier(5) + + def worker(): + nonlocal concurrent_holders, max_concurrent + barrier.wait() # all threads compete at the same time + with lock("mutex", timeout=10): + with counter_lock: + concurrent_holders += 1 + max_concurrent = max(max_concurrent, concurrent_holders) + time.sleep(0.01) # hold briefly so overlap is detectable if locking fails + with counter_lock: + concurrent_holders -= 1 + + threads = [threading.Thread(target=worker) for _ in range(5)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=10) + assert all(not t.is_alive() for t in threads), "threads did not finish in time" + + assert max_concurrent == 1 + + +def test_lock_timeout_raises_when_held(): + acquired = threading.Event() + release = threading.Event() + + def holder(): + with lock("timeout_test", timeout=10): + acquired.set() + release.wait() + + t = threading.Thread(target=holder) + t.start() + acquired.wait() + + try: + with pytest.raises(portalocker.exceptions.LockException): + with lock("timeout_test", timeout=0.1): + pass + finally: + release.set() + t.join(timeout=10) + assert not t.is_alive(), "holder thread did not finish in time" + + +def test_different_names_are_independent(): + with lock("alpha"): + with lock("beta"): + pass # would deadlock if names mapped to the same lock + + +# --------------------------------------------------------------------------- +# Redis path +# --------------------------------------------------------------------------- + + +def test_redis_lock_used_when_available(monkeypatch): + fake_conn = mock.MagicMock() + monkeypatch.setattr(lock_store, "_REDIS_URL", "redis://localhost:6379") + monkeypatch.setattr(lock_store, "_redis_available", mock.Mock(return_value=True)) + monkeypatch.setattr(lock_store, "_redis_connection", mock.Mock(return_value=fake_conn)) + + with mock.patch("portalocker.RedisLock") as mock_redis_lock: + with lock("redis_test"): pass - -def test_lock_mutual_exclusion_across_threads(): - with mock.patch.object(lock_store, "_redis_available", return_value=False): - results: list[int] = [] - barrier = threading.Barrier(2) - - def worker(val: int) -> None: - barrier.wait() - with lock("test_mutex", timeout=5): - results.append(val) - - threads = [threading.Thread(target=worker, args=(i,)) for i in range(2)] - for t in threads: - t.start() - for t in threads: - t.join() - - assert sorted(results) == [0, 1] - - -def test_lock_timeout_raises(): - with mock.patch.object(lock_store, "_redis_available", return_value=False): - acquired = threading.Event() - hold = threading.Event() - - def holder() -> None: - with lock("test_timeout", timeout=5): - acquired.set() - hold.wait() - - t = threading.Thread(target=holder) - t.start() - acquired.wait() - - try: - with pytest.raises(portalocker.exceptions.LockException): - with lock("test_timeout", timeout=0.1): - pass - finally: - hold.set() - t.join() - - -def test_lock_namespaced_separately(): - """Two different lock names must not block each other.""" - with mock.patch.object(lock_store, "_redis_available", return_value=False): - with lock("lock_a"): - with lock("lock_b"): - pass - - -# --------------------------------------------------------------------------- -# Redis path (mocked) -# --------------------------------------------------------------------------- - - -def test_lock_uses_redis_when_available(): - fake_redis_lock = mock.MagicMock() - fake_redis_lock.__enter__ = mock.Mock(return_value=None) - fake_redis_lock.__exit__ = mock.Mock(return_value=False) - - with mock.patch.object(lock_store, "_redis_available", return_value=True): - with mock.patch.object(lock_store, "_redis_connection", return_value=mock.MagicMock()): - with mock.patch("portalocker.RedisLock", return_value=fake_redis_lock) as mock_rl: - with lock("test_redis"): - pass - - mock_rl.assert_called_once() - _, kwargs = mock_rl.call_args - assert kwargs["channel"].startswith("crewai:") + mock_redis_lock.assert_called_once() + kwargs = mock_redis_lock.call_args.kwargs + assert kwargs["channel"].startswith("crewai:") + assert kwargs["connection"] is fake_conn