mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-05-01 07:13:00 +00:00
* fix: remove exclusive locks from read-only storage operations to eliminate lock contention. Read operations like search, list_scopes, get_scope_info, and count across the LanceDB, ChromaDB, and RAG adapters were holding exclusive locks unnecessarily. Under multi-process prefork workers this caused RedisLock contention, triggering a portalocker bug where AlreadyLocked is raised with the exceptions module as its arg. - Remove store_lock from 7 LanceDB read methods, since MVCC handles concurrent reads. - Remove store_lock from ChromaDB search/asearch, which are thread-safe since v0.4. - Remove store_lock from RAG core query and LanceDB adapter query. - Wrap lock_store BaseLockException with an actionable error message. - Add exception handling in encoding_flow/recall_flow ThreadPoolExecutor calls. - Fix flow.py double-logging of ancestor listener errors. * fix: remove dead conditional in filter_and_chunk fallback. Both branches of the if/else and the except all produced the same candidates = [scope_prefix] result, making the get_scope_info call and conditional pointless. * fix: separate lock acquisition from caller body in lock_store. The try/except wrapped the yield inside the contextmanager, which meant any BaseLockException raised by the caller's code inside the with block would be caught and re-raised with a misleading "Failed to acquire lock" message. Split into acquire-then-yield so only actual acquisition failures get the actionable error message.
65 lines
2.0 KiB
Python
65 lines
2.0 KiB
Python
from collections.abc import Callable
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from crewai.utilities.lock_store import lock as store_lock
|
|
from lancedb import ( # type: ignore[import-untyped]
|
|
DBConnection as LanceDBConnection,
|
|
connect as lancedb_connect,
|
|
)
|
|
from lancedb.table import Table as LanceDBTable # type: ignore[import-untyped]
|
|
from openai import Client as OpenAIClient
|
|
from pydantic import Field, PrivateAttr
|
|
|
|
from crewai_tools.tools.rag.rag_tool import Adapter
|
|
|
|
|
|
def _default_embedding_function() -> Callable[..., list[Any]]:
    """Build the default embedding callable backed by the OpenAI client.

    One ``OpenAIClient`` is created when this factory runs and is captured
    by the returned closure, so every call to the closure reuses it.

    Returns:
        A callable that forwards its argument to
        ``client.embeddings.create`` using the ``text-embedding-ada-002``
        model and returns a list holding the ``embedding`` of each record
        in the response, in response order.
    """
    # NOTE(review): the client is built with no arguments, so credentials
    # presumably come from the environment -- confirm.
    openai_client = OpenAIClient()

    # The parameter keeps the name ``input`` (despite shadowing the builtin)
    # so that callers passing it by keyword keep working.
    def _embedding_function(input):
        response = openai_client.embeddings.create(
            input=input,
            model="text-embedding-ada-002",
        )
        embeddings = []
        for record in response.data:
            embeddings.append(record.embedding)
        return embeddings

    return _embedding_function
|
|
|
|
|
|
class LanceDBAdapter(Adapter):
    """RAG adapter that searches and appends to a single LanceDB table.

    The connection and table handle are opened once in ``model_post_init``.
    ``add`` runs under ``store_lock`` keyed on the resolved database path,
    while ``query`` does not take that lock.
    """

    # Location handed to ``lancedb_connect``.
    uri: str | Path
    # Name of the table opened on that connection.
    # NOTE(review): the table is opened, not created, so it presumably has
    # to exist already -- confirm.
    table_name: str
    # Called in ``query`` with a one-element list; its first result is used
    # as the search vector.
    embedding_function: Callable = Field(default_factory=_default_embedding_function)
    # Maximum number of rows requested per query.
    top_k: int = 3
    # Column searched against the query vector.
    vector_column_name: str = "vector"
    # Column whose values are returned to the caller.
    text_column_name: str = "text"

    # Runtime handles and lock key; all three are filled in by
    # ``model_post_init``.
    _db: LanceDBConnection = PrivateAttr()
    _table: LanceDBTable = PrivateAttr()
    _lock_name: str = PrivateAttr(default="")

    def model_post_init(self, __context: Any) -> None:
        """Open the database and table, then derive the write-lock name.

        Args:
            __context: Passed through unchanged to the parent hook.
        """
        connection = lancedb_connect(self.uri)
        self._db = connection
        self._table = connection.open_table(self.table_name)
        # The lock key is built from the real path, so two spellings of the
        # same on-disk location produce the same key.
        self._lock_name = f"lancedb:{os.path.realpath(str(self.uri))}"

        super().model_post_init(__context)

    def query(self, question: str) -> str:  # type: ignore[override]
        """Return the text of the rows closest to ``question``.

        Args:
            question: Text to embed and search for.

        Returns:
            The ``text_column_name`` value of up to ``top_k`` matching rows,
            joined with newlines.
        """
        question_vector = self.embedding_function([question])[0]
        search = self._table.search(
            question_vector,
            vector_column_name=self.vector_column_name,
        )
        rows = search.limit(self.top_k).select([self.text_column_name]).to_list()
        return "\n".join(row[self.text_column_name] for row in rows)

    def add(self, *args: Any, **kwargs: Any) -> None:
        """Forward all arguments to the table's ``add`` while holding the store lock."""
        with store_lock(self._lock_name):
            self._table.add(*args, **kwargs)
|