Compare commits

...

3 Commits

Author SHA1 Message Date
Greyson LaLonde
e1d7de0dba docs: update changelog and version for v1.10.2rc2
Some checks are pending
CodeQL Advanced / Analyze (actions) (push) Waiting to run
CodeQL Advanced / Analyze (python) (push) Waiting to run
Check Documentation Broken Links / Check broken links (push) Waiting to run
2026-03-14 00:49:48 -04:00
Greyson LaLonde
96b07bfc84 feat: bump versions to 1.10.2rc2 2026-03-14 00:34:12 -04:00
Greyson LaLonde
b8d7942675 fix: remove exclusive locks from read-only storage operations
* fix: remove exclusive locks from read-only storage operations to eliminate lock contention

read operations like search, list_scopes, get_scope_info, count across
LanceDB, ChromaDB, and RAG adapters were holding exclusive locks unnecessarily.
under multi-process prefork workers this caused RedisLock contention triggering
a portalocker bug where AlreadyLocked is raised with the exceptions module as its arg.

- remove store_lock from 7 LanceDB read methods since MVCC handles concurrent reads
- remove store_lock from ChromaDB search/asearch which are thread-safe since v0.4
- remove store_lock from RAG core query and LanceDB adapter query
- wrap lock_store BaseLockException with actionable error message
- add exception handling in encoding_flow/recall_flow ThreadPoolExecutor calls
- fix flow.py double-logging of ancestor listener errors

* fix: remove dead conditional in filter_and_chunk fallback

both branches of the if/else and the except all produced the same
candidates = [scope_prefix] result, making the get_scope_info call
and conditional pointless

* fix: separate lock acquisition from caller body in lock_store

the try/except wrapped the yield inside the contextmanager, which meant
any BaseLockException raised by the caller's code inside the with block
would be caught and re-raised with a misleading "Failed to acquire lock"
message. split into acquire-then-yield so only actual acquisition
failures get the actionable error message.
2026-03-14 00:21:14 -04:00
20 changed files with 211 additions and 109 deletions

View File

@@ -4,6 +4,25 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="Mar 14, 2026">
## v1.10.2rc2
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.10.2rc2)
## What's Changed
### Bug Fixes
- Remove exclusive locks from read-only storage operations
### Documentation
- Update changelog and version for v1.10.2rc1
## Contributors
@greysonlalonde
</Update>
<Update label="Mar 13, 2026">
## v1.10.2rc1

View File

@@ -4,6 +4,25 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 3월 14일">
## v1.10.2rc2
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.10.2rc2)
## 변경 사항
### 버그 수정
- 읽기 전용 스토리지 작업에서 독점 잠금 제거
### 문서
- v1.10.2rc1에 대한 변경 로그 및 버전 업데이트
## 기여자
@greysonlalonde
</Update>
<Update label="2026년 3월 13일">
## v1.10.2rc1

View File

@@ -4,6 +4,25 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="14 mar 2026">
## v1.10.2rc2
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.10.2rc2)
## O que Mudou
### Correções de Bugs
- Remover bloqueios exclusivos de operações de armazenamento somente leitura
### Documentação
- Atualizar changelog e versão para v1.10.2rc1
## Contribuidores
@greysonlalonde
</Update>
<Update label="13 mar 2026">
## v1.10.2rc1

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.10.2rc1"
__version__ = "1.10.2rc2"

View File

@@ -11,7 +11,7 @@ dependencies = [
"pytube~=15.0.0",
"requests~=2.32.5",
"docker~=7.1.0",
"crewai==1.10.2rc1",
"crewai==1.10.2rc2",
"tiktoken~=0.8.0",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",

View File

@@ -309,4 +309,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.10.2rc1"
__version__ = "1.10.2rc2"

View File

@@ -46,13 +46,12 @@ class LanceDBAdapter(Adapter):
def query(self, question: str) -> str: # type: ignore[override]
query = self.embedding_function([question])[0]
with store_lock(self._lock_name):
results = (
self._table.search(query, vector_column_name=self.vector_column_name)
.limit(self.top_k)
.select([self.text_column_name])
.to_list()
)
results = (
self._table.search(query, vector_column_name=self.vector_column_name)
.limit(self.top_k)
.select([self.text_column_name])
.to_list()
)
values = [result[self.text_column_name] for result in results]
return "\n".join(values)

View File

@@ -173,13 +173,12 @@ class RAG(Adapter):
try:
question_embedding = self._embedding_service.embed_text(question)
with store_lock(self._lock_name):
results = self._collection.query(
query_embeddings=[question_embedding],
n_results=self.top_k,
where=where,
include=["documents", "metadatas", "distances"],
)
results = self._collection.query(
query_embeddings=[question_embedding],
n_results=self.top_k,
where=where,
include=["documents", "metadatas", "distances"],
)
if (
not results

View File

@@ -53,7 +53,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.10.2rc1",
"crewai-tools==1.10.2rc2",
]
embeddings = [
"tiktoken~=0.8.0"

View File

@@ -41,7 +41,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.10.2rc1"
__version__ = "1.10.2rc2"
_telemetry_submitted = False

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.10.2rc1"
"crewai[tools]==1.10.2rc2"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.10.2rc1"
"crewai[tools]==1.10.2rc2"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.10.2rc1"
"crewai[tools]==1.10.2rc2"
]
[tool.crewai]

View File

@@ -2716,7 +2716,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
from crewai.flow.async_feedback.types import HumanFeedbackPending
if not isinstance(e, HumanFeedbackPending):
logger.error(f"Error executing listener {listener_name}: {e}")
if not getattr(e, "_flow_listener_logged", False):
logger.error(f"Error executing listener {listener_name}: {e}")
e._flow_listener_logged = True # type: ignore[attr-defined]
raise
# ── User Input (self.ask) ────────────────────────────────────────

View File

@@ -13,6 +13,7 @@ from __future__ import annotations
from concurrent.futures import Future, ThreadPoolExecutor
import contextvars
from datetime import datetime
import logging
import math
from typing import Any
from uuid import uuid4
@@ -29,6 +30,8 @@ from crewai.memory.analyze import (
from crewai.memory.types import MemoryConfig, MemoryRecord, embed_texts
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# State models
# ---------------------------------------------------------------------------
@@ -188,7 +191,15 @@ class EncodingFlow(Flow[EncodingState]):
if len(active) == 1:
_, item = active[0]
raw = _search_one(item)
try:
raw = _search_one(item)
except Exception:
logger.warning(
"Storage search failed in parallel_find_similar, "
"treating item as new",
exc_info=True,
)
raw = []
item.similar_records = [r for r, _ in raw]
item.top_similarity = float(raw[0][1]) if raw else 0.0
else:
@@ -202,7 +213,15 @@ class EncodingFlow(Flow[EncodingState]):
for i, item in active
]
for _, item, future in futures:
raw = future.result()
try:
raw = future.result()
except Exception:
logger.warning(
"Storage search failed in parallel_find_similar, "
"treating item as new",
exc_info=True,
)
raw = []
item.similar_records = [r for r, _ in raw]
item.top_similarity = float(raw[0][1]) if raw else 0.0

View File

@@ -13,6 +13,7 @@ from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor, as_completed
import contextvars
from datetime import datetime
import logging
from typing import Any
from uuid import uuid4
@@ -30,6 +31,9 @@ from crewai.memory.types import (
)
logger = logging.getLogger(__name__)
class RecallState(BaseModel):
"""State for the recall flow."""
@@ -125,7 +129,14 @@ class RecallFlow(Flow[RecallState]):
if len(tasks) <= 1:
for emb, sc in tasks:
scope, results = _search_one(emb, sc)
try:
scope, results = _search_one(emb, sc)
except Exception:
logger.warning(
"Storage search failed in recall flow, skipping scope",
exc_info=True,
)
continue
if results:
top_composite, _ = compute_composite_score(
results[0][0], results[0][1], self._config
@@ -147,7 +158,14 @@ class RecallFlow(Flow[RecallState]):
for emb, sc in tasks
}
for future in as_completed(futures):
scope, results = future.result()
try:
scope, results = future.result()
except Exception:
logger.warning(
"Storage search failed in recall flow, skipping scope",
exc_info=True,
)
continue
if results:
top_composite, _ = compute_composite_score(
results[0][0], results[0][1], self._config
@@ -246,13 +264,17 @@ class RecallFlow(Flow[RecallState]):
if analysis and analysis.suggested_scopes:
candidates = [s for s in analysis.suggested_scopes if s]
else:
candidates = self._storage.list_scopes(scope_prefix)
try:
candidates = self._storage.list_scopes(scope_prefix)
except Exception:
logger.warning(
"Storage list_scopes failed in filter_and_chunk, "
"falling back to scope prefix",
exc_info=True,
)
candidates = []
if not candidates:
info = self._storage.get_scope_info(scope_prefix)
if info.record_count > 0:
candidates = [scope_prefix]
else:
candidates = [scope_prefix]
candidates = [scope_prefix]
self.state.candidate_scopes = candidates[:20]
return self.state.candidate_scopes

View File

@@ -350,12 +350,11 @@ class LanceDBStorage:
"""Return a single record by ID, or None if not found."""
if self._table is None:
return None
with store_lock(self._lock_name):
safe_id = str(record_id).replace("'", "''")
rows = self._table.search().where(f"id = '{safe_id}'").limit(1).to_list()
if not rows:
return None
return self._row_to_record(rows[0])
safe_id = str(record_id).replace("'", "''")
rows = self._table.search().where(f"id = '{safe_id}'").limit(1).to_list()
if not rows:
return None
return self._row_to_record(rows[0])
def search(
self,
@@ -368,15 +367,14 @@ class LanceDBStorage:
) -> list[tuple[MemoryRecord, float]]:
if self._table is None:
return []
with store_lock(self._lock_name):
query = self._table.search(query_embedding)
if scope_prefix is not None and scope_prefix.strip("/"):
prefix = scope_prefix.rstrip("/")
like_val = prefix + "%"
query = query.where(f"scope LIKE '{like_val}'")
results = query.limit(
limit * 3 if (categories or metadata_filter) else limit
).to_list()
query = self._table.search(query_embedding)
if scope_prefix is not None and scope_prefix.strip("/"):
prefix = scope_prefix.rstrip("/")
like_val = prefix + "%"
query = query.where(f"scope LIKE '{like_val}'")
results = query.limit(
limit * 3 if (categories or metadata_filter) else limit
).to_list()
out: list[tuple[MemoryRecord, float]] = []
for row in results:
record = self._row_to_record(row)
@@ -460,8 +458,6 @@ class LanceDBStorage:
Uses a full table scan (no vector query) so the limit is applied after
the scope filter, not to ANN candidates before filtering.
Caller must hold ``store_lock(self._lock_name)``.
Args:
scope_prefix: Optional scope path prefix to filter by.
limit: Maximum number of rows to return (applied after filtering).
@@ -492,8 +488,7 @@ class LanceDBStorage:
Returns:
List of MemoryRecord, ordered by created_at descending.
"""
with store_lock(self._lock_name):
rows = self._scan_rows(scope_prefix, limit=limit + offset)
rows = self._scan_rows(scope_prefix, limit=limit + offset)
records = [self._row_to_record(r) for r in rows]
records.sort(key=lambda r: r.created_at, reverse=True)
return records[offset : offset + limit]
@@ -503,11 +498,10 @@ class LanceDBStorage:
prefix = scope if scope != "/" else ""
if prefix and not prefix.startswith("/"):
prefix = "/" + prefix
with store_lock(self._lock_name):
rows = self._scan_rows(
prefix or None,
columns=["scope", "categories_str", "created_at"],
)
rows = self._scan_rows(
prefix or None,
columns=["scope", "categories_str", "created_at"],
)
if not rows:
return ScopeInfo(
path=scope or "/",
@@ -558,8 +552,7 @@ class LanceDBStorage:
def list_scopes(self, parent: str = "/") -> list[str]:
parent = parent.rstrip("/") or ""
prefix = (parent + "/") if parent else "/"
with store_lock(self._lock_name):
rows = self._scan_rows(prefix if prefix != "/" else None, columns=["scope"])
rows = self._scan_rows(prefix if prefix != "/" else None, columns=["scope"])
children: set[str] = set()
for row in rows:
sc = str(row.get("scope", ""))
@@ -571,8 +564,7 @@ class LanceDBStorage:
return sorted(children)
def list_categories(self, scope_prefix: str | None = None) -> dict[str, int]:
with store_lock(self._lock_name):
rows = self._scan_rows(scope_prefix, columns=["categories_str"])
rows = self._scan_rows(scope_prefix, columns=["categories_str"])
counts: dict[str, int] = {}
for row in rows:
cat_str = row.get("categories_str") or "[]"
@@ -588,8 +580,7 @@ class LanceDBStorage:
if self._table is None:
return 0
if scope_prefix is None or scope_prefix.strip("/") == "":
with store_lock(self._lock_name):
return int(self._table.count_rows())
return int(self._table.count_rows())
info = self.get_scope_info(scope_prefix)
return info.record_count

View File

@@ -446,30 +446,29 @@ class ChromaDBClient(BaseClient):
params = _extract_search_params(kwargs)
with self._locked():
collection = self.client.get_or_create_collection(
name=_sanitize_collection_name(params.collection_name),
embedding_function=self.embedding_function,
collection = self.client.get_or_create_collection(
name=_sanitize_collection_name(params.collection_name),
embedding_function=self.embedding_function,
)
where = params.where if params.where is not None else params.metadata_filter
with suppress_logging(
"chromadb.segment.impl.vector.local_persistent_hnsw", logging.ERROR
):
results: QueryResult = collection.query(
query_texts=[params.query],
n_results=params.limit,
where=where,
where_document=params.where_document,
include=params.include,
)
where = params.where if params.where is not None else params.metadata_filter
with suppress_logging(
"chromadb.segment.impl.vector.local_persistent_hnsw", logging.ERROR
):
results: QueryResult = collection.query(
query_texts=[params.query],
n_results=params.limit,
where=where,
where_document=params.where_document,
include=params.include,
)
return _process_query_results(
collection=collection,
results=results,
params=params,
)
return _process_query_results(
collection=collection,
results=results,
params=params,
)
async def asearch(
self, **kwargs: Unpack[ChromaDBCollectionSearchParams]
@@ -510,30 +509,29 @@ class ChromaDBClient(BaseClient):
params = _extract_search_params(kwargs)
async with self._alocked():
collection = await self.client.get_or_create_collection(
name=_sanitize_collection_name(params.collection_name),
embedding_function=self.embedding_function,
collection = await self.client.get_or_create_collection(
name=_sanitize_collection_name(params.collection_name),
embedding_function=self.embedding_function,
)
where = params.where if params.where is not None else params.metadata_filter
with suppress_logging(
"chromadb.segment.impl.vector.local_persistent_hnsw", logging.ERROR
):
results: QueryResult = await collection.query(
query_texts=[params.query],
n_results=params.limit,
where=where,
where_document=params.where_document,
include=params.include,
)
where = params.where if params.where is not None else params.metadata_filter
with suppress_logging(
"chromadb.segment.impl.vector.local_persistent_hnsw", logging.ERROR
):
results: QueryResult = await collection.query(
query_texts=[params.query],
n_results=params.limit,
where=where,
where_document=params.where_document,
include=params.include,
)
return _process_query_results(
collection=collection,
results=results,
params=params,
)
return _process_query_results(
collection=collection,
results=results,
params=params,
)
def delete_collection(self, **kwargs: Unpack[BaseCollectionParams]) -> None:
"""Delete a collection and all its data.

View File

@@ -10,17 +10,21 @@ from collections.abc import Iterator
from contextlib import contextmanager
from functools import lru_cache
from hashlib import md5
import logging
import os
import tempfile
from typing import TYPE_CHECKING, Final
import portalocker
import portalocker.exceptions
if TYPE_CHECKING:
import redis
logger = logging.getLogger(__name__)
_REDIS_URL: str | None = os.environ.get("REDIS_URL")
_DEFAULT_TIMEOUT: Final[int] = 120
@@ -57,5 +61,16 @@ def lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[None]:
else:
lock_dir = tempfile.gettempdir()
lock_path = os.path.join(lock_dir, f"{channel}.lock")
with portalocker.Lock(lock_path, timeout=timeout):
try:
pl = portalocker.Lock(lock_path, timeout=timeout)
pl.acquire()
except portalocker.exceptions.BaseLockException as exc:
raise portalocker.exceptions.LockException(
f"Failed to acquire lock '{name}' at {lock_path} "
f"(timeout={timeout}s). This commonly occurs in "
f"multi-process environments. "
) from exc
try:
yield
finally:
pl.release() # type: ignore[no-untyped-call]

View File

@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.10.2rc1"
__version__ = "1.10.2rc2"