fix: resolve LockException under concurrent multi-process execution

Greyson LaLonde
2026-03-10 22:43:20 -04:00
parent 0046f9a96f
commit 1bc92ebb5f
4 changed files with 85 additions and 60 deletions

View File

@@ -73,6 +73,7 @@ class SQLiteFlowPersistence(FlowPersistence):
    def init_db(self) -> None:
        """Create the necessary tables if they don't exist."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("PRAGMA journal_mode=WAL")
            # Main state table
            conn.execute(
                """

View File

@@ -39,6 +39,7 @@ class KickoffTaskOutputsSQLiteStorage:
"""
try:
with sqlite3.connect(self.db_path) as conn:
conn.execute("PRAGMA journal_mode=WAL")
cursor = conn.cursor()
cursor.execute(
"""

View File

@@ -12,6 +12,7 @@ import time
from typing import Any, ClassVar
import lancedb
import portalocker
from crewai.memory.types import MemoryRecord, ScopeInfo
@@ -90,6 +91,7 @@ class LanceDBStorage:
        # Raise it proactively so scans on large tables never hit OS error 24.
        try:
            import resource

            soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
            if soft < 4096:
                resource.setrlimit(resource.RLIMIT_NOFILE, (min(hard, 4096), hard))
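
A quick way to inspect the descriptor budget this guard protects (Unix-only sketch; LanceDB opens one handle per fragment file during scans):

    import resource

    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    print(f"RLIMIT_NOFILE: soft={soft} hard={hard}")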
@@ -99,7 +101,8 @@ class LanceDBStorage:
        self._compact_every = compact_every
        self._save_count = 0
        # Get or create a shared write lock for this database path.
        self._lockfile = str(self._path / ".lance_write.lock")
        resolved = str(self._path.resolve())
        with LanceDBStorage._path_locks_guard:
            if resolved not in LanceDBStorage._path_locks:
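
The registry pattern in play here, as a standalone sketch with illustrative names: instances whose paths resolve to the same directory share one RLock, so threads within a process serialise before ever touching the cross-process file lock.

    import threading

    _path_locks: dict[str, threading.RLock] = {}
    _path_locks_guard = threading.Lock()

    def lock_for(resolved_path: str) -> threading.RLock:
        with _path_locks_guard:  # the guard protects the dict itself
            return _path_locks.setdefault(resolved_path, threading.RLock())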
@@ -110,10 +113,13 @@ class LanceDBStorage:
        # If no table exists yet, defer creation until the first save so the
        # dimension can be auto-detected from the embedder's actual output.
        try:
            self._table: lancedb.table.Table | None = self._db.open_table(self._table_name)
            self._table: lancedb.table.Table | None = self._db.open_table(
                self._table_name
            )
            self._vector_dim: int = self._infer_dim_from_table(self._table)
            # Best-effort: create the scope index if it doesn't exist yet.
            self._ensure_scope_index()
            with self._file_lock():
                self._ensure_scope_index()
            # Compact in the background if the table has accumulated many
            # fragments from previous runs (each save() creates one).
            self._compact_if_needed()
@@ -124,7 +130,8 @@ class LanceDBStorage:
        # Explicit dim provided: create the table immediately if it doesn't exist.
        if self._table is None and vector_dim is not None:
            self._vector_dim = vector_dim
            self._table = self._create_table(vector_dim)
            with self._file_lock():
                self._table = self._create_table(vector_dim)

    @property
    def write_lock(self) -> threading.RLock:
@@ -149,18 +156,14 @@ class LanceDBStorage:
                break
        return DEFAULT_VECTOR_DIM

    def _retry_write(self, op: str, *args: Any, **kwargs: Any) -> Any:
        """Execute a table operation with retry on LanceDB commit conflicts.

    def _file_lock(self) -> portalocker.Lock:
        """Return a cross-process file lock for serialising writes."""
        return portalocker.Lock(self._lockfile, timeout=120)

        Args:
            op: Method name on the table object (e.g. "add", "delete").
            *args, **kwargs: Passed to the table method.

    def _do_write(self, op: str, *args: Any, **kwargs: Any) -> Any:
        """Execute a single table write with retry on commit conflicts.

        LanceDB uses optimistic concurrency: if two transactions overlap,
        the second to commit fails with an ``OSError`` containing
        "Commit conflict". This helper retries with exponential backoff,
        refreshing the table reference before each retry so the retried
        call uses the latest committed version (not a stale reference).

        Caller must already hold the cross-process file lock.
        """
        delay = _RETRY_BASE_DELAY
        for attempt in range(_MAX_RETRIES + 1):
@@ -171,20 +174,24 @@ class LanceDBStorage:
                    raise
                _logger.debug(
                    "LanceDB commit conflict on %s (attempt %d/%d), retrying in %.1fs",
                    op, attempt + 1, _MAX_RETRIES, delay,
                    op,
                    attempt + 1,
                    _MAX_RETRIES,
                    delay,
                )
                # Refresh table to pick up the latest version before retrying.
                # The next getattr(self._table, op) will use the fresh table.
                try:
                    self._table = self._db.open_table(self._table_name)
                except Exception:  # noqa: S110
                    pass  # table refresh is best-effort
                    pass
                time.sleep(delay)
                delay *= 2
        return None  # unreachable, but satisfies type checker
    def _create_table(self, vector_dim: int) -> lancedb.table.Table:
        """Create a new table with the given vector dimension."""
        """Create a new table with the given vector dimension.

        Caller must already hold the cross-process file lock.
        """
        placeholder = [
            {
                "id": "__schema_placeholder__",
@@ -248,9 +255,9 @@ class LanceDBStorage:
"""Run ``table.optimize()`` in a background thread, absorbing errors."""
try:
if self._table is not None:
self._table.optimize()
# Refresh the scope index so new fragments are covered.
self._ensure_scope_index()
with self._file_lock():
self._table.optimize()
self._ensure_scope_index()
except Exception:
_logger.debug("LanceDB background compaction failed", exc_info=True)
@@ -280,7 +287,9 @@ class LanceDBStorage:
"last_accessed": record.last_accessed.isoformat(),
"source": record.source or "",
"private": record.private,
"vector": record.embedding if record.embedding else [0.0] * self._vector_dim,
"vector": record.embedding
if record.embedding
else [0.0] * self._vector_dim,
}
def _row_to_record(self, row: dict[str, Any]) -> MemoryRecord:
@@ -296,7 +305,9 @@ class LanceDBStorage:
            id=str(row["id"]),
            content=str(row["content"]),
            scope=str(row["scope"]),
            categories=json.loads(row["categories_str"]) if row.get("categories_str") else [],
            categories=json.loads(row["categories_str"])
            if row.get("categories_str")
            else [],
            metadata=json.loads(row["metadata_str"]) if row.get("metadata_str") else {},
            importance=float(row.get("importance", 0.5)),
            created_at=_parse_dt(row.get("created_at")),
@@ -316,16 +327,15 @@ class LanceDBStorage:
                dim = len(r.embedding)
                break
        is_new_table = self._table is None
        with self._write_lock:
        with self._write_lock, self._file_lock():
            self._ensure_table(vector_dim=dim)
            rows = [self._record_to_row(r) for r in records]
            for r in rows:
                if r["vector"] is None or len(r["vector"]) != self._vector_dim:
                    r["vector"] = [0.0] * self._vector_dim
            self._retry_write("add", rows)
            # Create the scope index on the first save so it covers the initial dataset.
            if is_new_table:
                self._ensure_scope_index()
            self._do_write("add", rows)
            if is_new_table:
                self._ensure_scope_index()
            # Auto-compact every N saves so fragment files don't pile up.
            self._save_count += 1
            if self._compact_every > 0 and self._save_count % self._compact_every == 0:
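
On the lock ordering in `with self._write_lock, self._file_lock():` (the lock path below is illustrative): `with a, b` acquires left to right and releases right to left, so the in-process RLock is taken first and only one thread per process ever blocks on the cross-process file lock.

    import threading

    import portalocker

    write_lock = threading.RLock()

    with write_lock, portalocker.Lock("/tmp/db/.lance_write.lock", timeout=120):
        ...  # table.add / table.delete happens here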
@@ -333,14 +343,14 @@ class LanceDBStorage:
    def update(self, record: MemoryRecord) -> None:
        """Update a record by ID. Preserves created_at, updates last_accessed."""
        with self._write_lock:
        with self._write_lock, self._file_lock():
            self._ensure_table()
            safe_id = str(record.id).replace("'", "''")
            self._retry_write("delete", f"id = '{safe_id}'")
            self._do_write("delete", f"id = '{safe_id}'")
            row = self._record_to_row(record)
            if row["vector"] is None or len(row["vector"]) != self._vector_dim:
                row["vector"] = [0.0] * self._vector_dim
            self._retry_write("add", [row])
            self._do_write("add", [row])
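
The quote doubling in replace("'", "''") is standard SQL string escaping, which LanceDB's SQL-like filter expressions follow; without it an ID containing a single quote would break the delete predicate. A toy value:

    record_id = "it's-42"
    safe_id = record_id.replace("'", "''")
    print(f"id = '{safe_id}'")  # id = 'it''s-42'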
    def touch_records(self, record_ids: list[str]) -> None:
        """Update last_accessed to now for the given record IDs.
@@ -354,11 +364,11 @@ class LanceDBStorage:
"""
if not record_ids or self._table is None:
return
with self._write_lock:
with self._write_lock, self._file_lock():
now = datetime.utcnow().isoformat()
safe_ids = [str(rid).replace("'", "''") for rid in record_ids]
ids_expr = ", ".join(f"'{rid}'" for rid in safe_ids)
self._retry_write(
self._do_write(
"update",
where=f"id IN ({ids_expr})",
values={"last_accessed": now},
@@ -390,13 +400,17 @@ class LanceDBStorage:
            prefix = scope_prefix.rstrip("/")
            like_val = prefix + "%"
            query = query.where(f"scope LIKE '{like_val}'")
        results = query.limit(limit * 3 if (categories or metadata_filter) else limit).to_list()
        results = query.limit(
            limit * 3 if (categories or metadata_filter) else limit
        ).to_list()
        out: list[tuple[MemoryRecord, float]] = []
        for row in results:
            record = self._row_to_record(row)
            if categories and not any(c in record.categories for c in categories):
                continue
            if metadata_filter and not all(record.metadata.get(k) == v for k, v in metadata_filter.items()):
            if metadata_filter and not all(
                record.metadata.get(k) == v for k, v in metadata_filter.items()
            ):
                continue
            distance = row.get("_distance", 0.0)
            score = 1.0 / (1.0 + float(distance)) if distance is not None else 1.0
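
Illustrative numbers for the search path above: with post-filters active the query over-fetches three times the requested limit so filtering still leaves enough candidates, and each hit's distance folds into a score that sorts descending.

    limit, has_filters = 10, True
    print(limit * 3 if has_filters else limit)  # 30
    for distance in (0.0, 0.5, 4.0):
        print(round(1.0 / (1.0 + distance), 3))  # 1.0, 0.667, 0.2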
@@ -416,20 +430,24 @@ class LanceDBStorage:
    ) -> int:
        if self._table is None:
            return 0
        with self._write_lock:
        with self._write_lock, self._file_lock():
            if record_ids and not (categories or metadata_filter):
                before = self._table.count_rows()
                ids_expr = ", ".join(f"'{rid}'" for rid in record_ids)
                self._retry_write("delete", f"id IN ({ids_expr})")
                self._do_write("delete", f"id IN ({ids_expr})")
                return before - self._table.count_rows()
            if categories or metadata_filter:
                rows = self._scan_rows(scope_prefix)
                to_delete: list[str] = []
                for row in rows:
                    record = self._row_to_record(row)
                    if categories and not any(c in record.categories for c in categories):
                    if categories and not any(
                        c in record.categories for c in categories
                    ):
                        continue
                    if metadata_filter and not all(record.metadata.get(k) == v for k, v in metadata_filter.items()):
                    if metadata_filter and not all(
                        record.metadata.get(k) == v for k, v in metadata_filter.items()
                    ):
                        continue
                    if older_than and record.created_at >= older_than:
                        continue
@@ -438,7 +456,7 @@ class LanceDBStorage:
                    return 0
                before = self._table.count_rows()
                ids_expr = ", ".join(f"'{rid}'" for rid in to_delete)
                self._retry_write("delete", f"id IN ({ids_expr})")
                self._do_write("delete", f"id IN ({ids_expr})")
                return before - self._table.count_rows()
            conditions = []
            if scope_prefix is not None and scope_prefix.strip("/"):
@@ -450,11 +468,11 @@ class LanceDBStorage:
                conditions.append(f"created_at < '{older_than.isoformat()}'")
            if not conditions:
                before = self._table.count_rows()
                self._retry_write("delete", "id != ''")
                self._do_write("delete", "id != ''")
                return before - self._table.count_rows()
            where_expr = " AND ".join(conditions)
            before = self._table.count_rows()
            self._retry_write("delete", where_expr)
            self._do_write("delete", where_expr)
            return before - self._table.count_rows()

    def _scan_rows(
@@ -528,7 +546,7 @@ class LanceDBStorage:
        for row in rows:
            sc = str(row.get("scope", ""))
            if child_prefix and sc.startswith(child_prefix):
                rest = sc[len(child_prefix):]
                rest = sc[len(child_prefix) :]
                first_component = rest.split("/", 1)[0]
                if first_component:
                    children.add(child_prefix + first_component)
@@ -539,7 +557,11 @@ class LanceDBStorage:
                pass
            created = row.get("created_at")
            if created:
                dt = datetime.fromisoformat(str(created).replace("Z", "+00:00")) if isinstance(created, str) else created
                dt = (
                    datetime.fromisoformat(str(created).replace("Z", "+00:00"))
                    if isinstance(created, str)
                    else created
                )
                if isinstance(dt, datetime):
                    if oldest is None or dt < oldest:
                        oldest = dt
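
Why the "Z" replacement is needed: datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 onward, so rewriting it as "+00:00" keeps the parse working on older interpreters while preserving the UTC offset.

    from datetime import datetime

    stamp = "2026-03-10T22:43:20Z"
    dt = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
    print(dt.tzinfo)  # UTC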
@@ -562,7 +584,7 @@ class LanceDBStorage:
        for row in rows:
            sc = str(row.get("scope", ""))
            if sc.startswith(prefix) and sc != (prefix.rstrip("/") or "/"):
                rest = sc[len(prefix):]
                rest = sc[len(prefix) :]
                first_component = rest.split("/", 1)[0]
                if first_component:
                    children.add(prefix + first_component)
@@ -590,17 +612,17 @@ class LanceDBStorage:
        return info.record_count

    def reset(self, scope_prefix: str | None = None) -> None:
        if scope_prefix is None or scope_prefix.strip("/") == "":
            if self._table is not None:
                self._db.drop_table(self._table_name)
                self._table = None
                # Dimension is preserved; table will be recreated on next save.
            return
        if self._table is None:
            return
        prefix = scope_prefix.rstrip("/")
        if prefix:
            self._table.delete(f"scope >= '{prefix}' AND scope < '{prefix}/\uFFFF'")
        with self._file_lock():
            if scope_prefix is None or scope_prefix.strip("/") == "":
                if self._table is not None:
                    self._db.drop_table(self._table_name)
                    self._table = None
                return
            if self._table is None:
                return
            prefix = scope_prefix.rstrip("/")
            if prefix:
                self._table.delete(f"scope >= '{prefix}' AND scope < '{prefix}/\uffff'")
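
Why the '\uffff' sentinel selects exactly the scope subtree (toy data): string comparison is lexicographic, so "team/a/..." sorts between "team/a" and "team/a/\uffff", while the sibling scope "team/ab" falls outside the range.

    prefix = "team/a"
    scopes = ["team/a", "team/a/x", "team/ab", "other"]
    print([s for s in scopes if prefix <= s < prefix + "/\uffff"])
    # ['team/a', 'team/a/x']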
    def optimize(self) -> None:
        """Compact the table synchronously and refresh the scope index.
@@ -614,8 +636,9 @@ class LanceDBStorage:
"""
if self._table is None:
return
self._table.optimize()
self._ensure_scope_index()
with self._file_lock():
self._table.optimize()
self._ensure_scope_index()
async def asave(self, records: list[MemoryRecord]) -> None:
self.save(records)

View File

@@ -28,7 +28,7 @@ def create_client(config: ChromaDBConfig) -> ChromaDBClient:
    lock_id = md5(persist_dir.encode(), usedforsecurity=False).hexdigest()
    lockfile = os.path.join(persist_dir, f"chromadb-{lock_id}.lock")
    with portalocker.Lock(lockfile):
    with portalocker.Lock(lockfile, timeout=120):
        client = PersistentClient(
            path=persist_dir,
            settings=config.settings,
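
An illustrative run of the lock-path derivation: hashing the persist directory means every client of the same directory contends on one lock file, while unrelated databases never block each other (usedforsecurity=False marks the md5 use as non-cryptographic, Python 3.9+).

    import os
    from hashlib import md5

    persist_dir = "/tmp/chroma"  # illustrative directory
    lock_id = md5(persist_dir.encode(), usedforsecurity=False).hexdigest()
    print(os.path.join(persist_dir, f"chromadb-{lock_id}.lock"))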