diff --git a/lib/crewai-tools/src/crewai_tools/adapters/lancedb_adapter.py b/lib/crewai-tools/src/crewai_tools/adapters/lancedb_adapter.py index 0e92ac85a..af5d3a786 100644 --- a/lib/crewai-tools/src/crewai_tools/adapters/lancedb_adapter.py +++ b/lib/crewai-tools/src/crewai_tools/adapters/lancedb_adapter.py @@ -46,13 +46,12 @@ class LanceDBAdapter(Adapter): def query(self, question: str) -> str: # type: ignore[override] query = self.embedding_function([question])[0] - with store_lock(self._lock_name): - results = ( - self._table.search(query, vector_column_name=self.vector_column_name) - .limit(self.top_k) - .select([self.text_column_name]) - .to_list() - ) + results = ( + self._table.search(query, vector_column_name=self.vector_column_name) + .limit(self.top_k) + .select([self.text_column_name]) + .to_list() + ) values = [result[self.text_column_name] for result in results] return "\n".join(values) diff --git a/lib/crewai-tools/src/crewai_tools/rag/core.py b/lib/crewai-tools/src/crewai_tools/rag/core.py index d8bc51e15..b418cc92f 100644 --- a/lib/crewai-tools/src/crewai_tools/rag/core.py +++ b/lib/crewai-tools/src/crewai_tools/rag/core.py @@ -173,13 +173,12 @@ class RAG(Adapter): try: question_embedding = self._embedding_service.embed_text(question) - with store_lock(self._lock_name): - results = self._collection.query( - query_embeddings=[question_embedding], - n_results=self.top_k, - where=where, - include=["documents", "metadatas", "distances"], - ) + results = self._collection.query( + query_embeddings=[question_embedding], + n_results=self.top_k, + where=where, + include=["documents", "metadatas", "distances"], + ) if ( not results diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index bd24d610e..674f551eb 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -2716,7 +2716,9 @@ class Flow(Generic[T], metaclass=FlowMeta): from crewai.flow.async_feedback.types import HumanFeedbackPending 
if not isinstance(e, HumanFeedbackPending): - logger.error(f"Error executing listener {listener_name}: {e}") + if not getattr(e, "_flow_listener_logged", False): + logger.error(f"Error executing listener {listener_name}: {e}") + e._flow_listener_logged = True # type: ignore[attr-defined] raise # ── User Input (self.ask) ──────────────────────────────────────── diff --git a/lib/crewai/src/crewai/memory/encoding_flow.py b/lib/crewai/src/crewai/memory/encoding_flow.py index 6387c45e6..cd1babb2d 100644 --- a/lib/crewai/src/crewai/memory/encoding_flow.py +++ b/lib/crewai/src/crewai/memory/encoding_flow.py @@ -13,6 +13,7 @@ from __future__ import annotations from concurrent.futures import Future, ThreadPoolExecutor import contextvars from datetime import datetime +import logging import math from typing import Any from uuid import uuid4 @@ -29,6 +30,8 @@ from crewai.memory.analyze import ( from crewai.memory.types import MemoryConfig, MemoryRecord, embed_texts +logger = logging.getLogger(__name__) + # --------------------------------------------------------------------------- # State models # --------------------------------------------------------------------------- @@ -188,7 +191,15 @@ class EncodingFlow(Flow[EncodingState]): if len(active) == 1: _, item = active[0] - raw = _search_one(item) + try: + raw = _search_one(item) + except Exception: + logger.warning( + "Storage search failed in parallel_find_similar, " + "treating item as new", + exc_info=True, + ) + raw = [] item.similar_records = [r for r, _ in raw] item.top_similarity = float(raw[0][1]) if raw else 0.0 else: @@ -202,7 +213,15 @@ class EncodingFlow(Flow[EncodingState]): for i, item in active ] for _, item, future in futures: - raw = future.result() + try: + raw = future.result() + except Exception: + logger.warning( + "Storage search failed in parallel_find_similar, " + "treating item as new", + exc_info=True, + ) + raw = [] item.similar_records = [r for r, _ in raw] item.top_similarity = float(raw[0][1]) 
if raw else 0.0 diff --git a/lib/crewai/src/crewai/memory/recall_flow.py b/lib/crewai/src/crewai/memory/recall_flow.py index e257d7f2c..f056c9a1d 100644 --- a/lib/crewai/src/crewai/memory/recall_flow.py +++ b/lib/crewai/src/crewai/memory/recall_flow.py @@ -13,6 +13,7 @@ from __future__ import annotations from concurrent.futures import ThreadPoolExecutor, as_completed import contextvars from datetime import datetime +import logging from typing import Any from uuid import uuid4 @@ -30,6 +31,9 @@ from crewai.memory.types import ( ) +logger = logging.getLogger(__name__) + + class RecallState(BaseModel): """State for the recall flow.""" @@ -125,7 +129,14 @@ class RecallFlow(Flow[RecallState]): if len(tasks) <= 1: for emb, sc in tasks: - scope, results = _search_one(emb, sc) + try: + scope, results = _search_one(emb, sc) + except Exception: + logger.warning( + "Storage search failed in recall flow, skipping scope", + exc_info=True, + ) + continue if results: top_composite, _ = compute_composite_score( results[0][0], results[0][1], self._config @@ -147,7 +158,14 @@ class RecallFlow(Flow[RecallState]): for emb, sc in tasks } for future in as_completed(futures): - scope, results = future.result() + try: + scope, results = future.result() + except Exception: + logger.warning( + "Storage search failed in recall flow, skipping scope", + exc_info=True, + ) + continue if results: top_composite, _ = compute_composite_score( results[0][0], results[0][1], self._config @@ -246,13 +264,17 @@ class RecallFlow(Flow[RecallState]): if analysis and analysis.suggested_scopes: candidates = [s for s in analysis.suggested_scopes if s] else: - candidates = self._storage.list_scopes(scope_prefix) + try: + candidates = self._storage.list_scopes(scope_prefix) + except Exception: + logger.warning( + "Storage list_scopes failed in filter_and_chunk, " + "falling back to scope prefix", + exc_info=True, + ) + candidates = [] if not candidates: - info = self._storage.get_scope_info(scope_prefix) - if 
info.record_count > 0: - candidates = [scope_prefix] - else: - candidates = [scope_prefix] + candidates = [scope_prefix] self.state.candidate_scopes = candidates[:20] return self.state.candidate_scopes diff --git a/lib/crewai/src/crewai/memory/storage/lancedb_storage.py b/lib/crewai/src/crewai/memory/storage/lancedb_storage.py index 014ac32fd..a7a2d3956 100644 --- a/lib/crewai/src/crewai/memory/storage/lancedb_storage.py +++ b/lib/crewai/src/crewai/memory/storage/lancedb_storage.py @@ -350,12 +350,11 @@ class LanceDBStorage: """Return a single record by ID, or None if not found.""" if self._table is None: return None - with store_lock(self._lock_name): - safe_id = str(record_id).replace("'", "''") - rows = self._table.search().where(f"id = '{safe_id}'").limit(1).to_list() - if not rows: - return None - return self._row_to_record(rows[0]) + safe_id = str(record_id).replace("'", "''") + rows = self._table.search().where(f"id = '{safe_id}'").limit(1).to_list() + if not rows: + return None + return self._row_to_record(rows[0]) def search( self, @@ -368,15 +367,14 @@ class LanceDBStorage: ) -> list[tuple[MemoryRecord, float]]: if self._table is None: return [] - with store_lock(self._lock_name): - query = self._table.search(query_embedding) - if scope_prefix is not None and scope_prefix.strip("/"): - prefix = scope_prefix.rstrip("/") - like_val = prefix + "%" - query = query.where(f"scope LIKE '{like_val}'") - results = query.limit( - limit * 3 if (categories or metadata_filter) else limit - ).to_list() + query = self._table.search(query_embedding) + if scope_prefix is not None and scope_prefix.strip("/"): + prefix = scope_prefix.rstrip("/") + like_val = prefix + "%" + query = query.where(f"scope LIKE '{like_val}'") + results = query.limit( + limit * 3 if (categories or metadata_filter) else limit + ).to_list() out: list[tuple[MemoryRecord, float]] = [] for row in results: record = self._row_to_record(row) @@ -460,8 +458,6 @@ class LanceDBStorage: Uses a full table scan 
(no vector query) so the limit is applied after the scope filter, not to ANN candidates before filtering. - Caller must hold ``store_lock(self._lock_name)``. - Args: scope_prefix: Optional scope path prefix to filter by. limit: Maximum number of rows to return (applied after filtering). @@ -492,8 +488,7 @@ class LanceDBStorage: Returns: List of MemoryRecord, ordered by created_at descending. """ - with store_lock(self._lock_name): - rows = self._scan_rows(scope_prefix, limit=limit + offset) + rows = self._scan_rows(scope_prefix, limit=limit + offset) records = [self._row_to_record(r) for r in rows] records.sort(key=lambda r: r.created_at, reverse=True) return records[offset : offset + limit] @@ -503,11 +498,10 @@ class LanceDBStorage: prefix = scope if scope != "/" else "" if prefix and not prefix.startswith("/"): prefix = "/" + prefix - with store_lock(self._lock_name): - rows = self._scan_rows( - prefix or None, - columns=["scope", "categories_str", "created_at"], - ) + rows = self._scan_rows( + prefix or None, + columns=["scope", "categories_str", "created_at"], + ) if not rows: return ScopeInfo( path=scope or "/", @@ -558,8 +552,7 @@ class LanceDBStorage: def list_scopes(self, parent: str = "/") -> list[str]: parent = parent.rstrip("/") or "" prefix = (parent + "/") if parent else "/" - with store_lock(self._lock_name): - rows = self._scan_rows(prefix if prefix != "/" else None, columns=["scope"]) + rows = self._scan_rows(prefix if prefix != "/" else None, columns=["scope"]) children: set[str] = set() for row in rows: sc = str(row.get("scope", "")) @@ -571,8 +564,7 @@ class LanceDBStorage: return sorted(children) def list_categories(self, scope_prefix: str | None = None) -> dict[str, int]: - with store_lock(self._lock_name): - rows = self._scan_rows(scope_prefix, columns=["categories_str"]) + rows = self._scan_rows(scope_prefix, columns=["categories_str"]) counts: dict[str, int] = {} for row in rows: cat_str = row.get("categories_str") or "[]" @@ -588,8 +580,7 
@@ class LanceDBStorage: if self._table is None: return 0 if scope_prefix is None or scope_prefix.strip("/") == "": - with store_lock(self._lock_name): - return int(self._table.count_rows()) + return int(self._table.count_rows()) info = self.get_scope_info(scope_prefix) return info.record_count diff --git a/lib/crewai/src/crewai/rag/chromadb/client.py b/lib/crewai/src/crewai/rag/chromadb/client.py index b95a37385..153230b8b 100644 --- a/lib/crewai/src/crewai/rag/chromadb/client.py +++ b/lib/crewai/src/crewai/rag/chromadb/client.py @@ -446,30 +446,29 @@ class ChromaDBClient(BaseClient): params = _extract_search_params(kwargs) - with self._locked(): - collection = self.client.get_or_create_collection( - name=_sanitize_collection_name(params.collection_name), - embedding_function=self.embedding_function, + collection = self.client.get_or_create_collection( + name=_sanitize_collection_name(params.collection_name), + embedding_function=self.embedding_function, + ) + + where = params.where if params.where is not None else params.metadata_filter + + with suppress_logging( + "chromadb.segment.impl.vector.local_persistent_hnsw", logging.ERROR + ): + results: QueryResult = collection.query( + query_texts=[params.query], + n_results=params.limit, + where=where, + where_document=params.where_document, + include=params.include, ) - where = params.where if params.where is not None else params.metadata_filter - - with suppress_logging( - "chromadb.segment.impl.vector.local_persistent_hnsw", logging.ERROR - ): - results: QueryResult = collection.query( - query_texts=[params.query], - n_results=params.limit, - where=where, - where_document=params.where_document, - include=params.include, - ) - - return _process_query_results( - collection=collection, - results=results, - params=params, - ) + return _process_query_results( + collection=collection, + results=results, + params=params, + ) async def asearch( self, **kwargs: Unpack[ChromaDBCollectionSearchParams] @@ -510,30 +509,29 @@ 
class ChromaDBClient(BaseClient): params = _extract_search_params(kwargs) - async with self._alocked(): - collection = await self.client.get_or_create_collection( - name=_sanitize_collection_name(params.collection_name), - embedding_function=self.embedding_function, + collection = await self.client.get_or_create_collection( + name=_sanitize_collection_name(params.collection_name), + embedding_function=self.embedding_function, + ) + + where = params.where if params.where is not None else params.metadata_filter + + with suppress_logging( + "chromadb.segment.impl.vector.local_persistent_hnsw", logging.ERROR + ): + results: QueryResult = await collection.query( + query_texts=[params.query], + n_results=params.limit, + where=where, + where_document=params.where_document, + include=params.include, ) - where = params.where if params.where is not None else params.metadata_filter - - with suppress_logging( - "chromadb.segment.impl.vector.local_persistent_hnsw", logging.ERROR - ): - results: QueryResult = await collection.query( - query_texts=[params.query], - n_results=params.limit, - where=where, - where_document=params.where_document, - include=params.include, - ) - - return _process_query_results( - collection=collection, - results=results, - params=params, - ) + return _process_query_results( + collection=collection, + results=results, + params=params, + ) def delete_collection(self, **kwargs: Unpack[BaseCollectionParams]) -> None: """Delete a collection and all its data. 
diff --git a/lib/crewai/src/crewai/utilities/lock_store.py b/lib/crewai/src/crewai/utilities/lock_store.py index 91b3d742a..b2ac4d81c 100644 --- a/lib/crewai/src/crewai/utilities/lock_store.py +++ b/lib/crewai/src/crewai/utilities/lock_store.py @@ -10,17 +10,21 @@ from collections.abc import Iterator from contextlib import contextmanager from functools import lru_cache from hashlib import md5 +import logging import os import tempfile from typing import TYPE_CHECKING, Final import portalocker +import portalocker.exceptions if TYPE_CHECKING: import redis +logger = logging.getLogger(__name__) + _REDIS_URL: str | None = os.environ.get("REDIS_URL") _DEFAULT_TIMEOUT: Final[int] = 120 @@ -57,5 +61,16 @@ def lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[None]: else: lock_dir = tempfile.gettempdir() lock_path = os.path.join(lock_dir, f"{channel}.lock") - with portalocker.Lock(lock_path, timeout=timeout): + try: + pl = portalocker.Lock(lock_path, timeout=timeout) + pl.acquire() + except portalocker.exceptions.BaseLockException as exc: + raise portalocker.exceptions.LockException( + f"Failed to acquire lock '{name}' at {lock_path} " + f"(timeout={timeout}s). This commonly occurs in " + "multi-process environments." + ) from exc + try: yield + finally: + pl.release() # type: ignore[no-untyped-call]