Compare commits


1 Commit

Author: Cursor Agent
SHA1: 537186d4d5
Message: fix: group bedrock tool results
Co-authored-by: João Moura <joaomdmoura@gmail.com>
Date: 2026-03-10 18:11:50 +00:00
20 changed files with 151 additions and 433 deletions

View File

@@ -4,39 +4,6 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="Mar 11, 2026">
## v1.10.2a1
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.10.2a1)
## What's Changed
### Features
- Add support for tool search for Anthropic, saving tokens by dynamically injecting the appropriate tools during execution.
- Introduce more Brave Search tools.
- Create action for nightly releases.
### Bug Fixes
- Fix LockException under concurrent multi-process execution.
- Resolve issues with grouping parallel tool results into a single user message.
- Fix MCP tool resolution and eliminate all shared mutable connections.
- Update LLM parameter handling in the human_feedback function.
- Add missing list/dict methods to LockedListProxy and LockedDictProxy.
- Propagate contextvars context to parallel tool call threads.
- Bump the gitpython dependency to >=3.1.41 to resolve a CVE path traversal vulnerability.
### Refactoring
- Refactor memory classes to be serializable.
### Documentation
- Update changelog and version for v1.10.1.
## Contributors
@akaKuruma, @github-actions[bot], @giulio-leone, @greysonlalonde, @joaomdmoura, @jonathansampson, @lorenzejay, @lucasgomide, @mattatcha
</Update>
<Update label="Mar 04, 2026">
## v1.10.1

View File

@@ -4,39 +4,6 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 3월 11일">
## v1.10.2a1
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.10.2a1)
## 변경 사항
### 기능
- Anthropics에 대한 도구 검색 지원 추가, 토큰 저장, 실행 중 적절한 도구를 동적으로 주입하는 기능 추가.
- 더 많은 Brave Search 도구 도입.
- 야간 릴리스를 위한 액션 생성.
### 버그 수정
- 동시 다중 프로세스 실행 중 LockException 수정.
- 단일 사용자 메시지에서 병렬 도구 결과 그룹화 문제 해결.
- MCP 도구 해상도 문제 해결 및 모든 공유 가변 연결 제거.
- human_feedback 함수에서 LLM 매개변수 처리 업데이트.
- LockedListProxy 및 LockedDictProxy에 누락된 list/dict 메서드 추가.
- 병렬 도구 호출 스레드에 contextvars 컨텍스트 전파.
- CVE 경로 탐색 취약점을 해결하기 위해 gitpython 의존성을 >=3.1.41로 업데이트.
### 리팩토링
- 메모리 클래스를 직렬화 가능하도록 리팩토링.
### 문서
- v1.10.1에 대한 변경 로그 및 버전 업데이트.
## 기여자
@akaKuruma, @github-actions[bot], @giulio-leone, @greysonlalonde, @joaomdmoura, @jonathansampson, @lorenzejay, @lucasgomide, @mattatcha
</Update>
<Update label="2026년 3월 4일">
## v1.10.1

View File

@@ -4,39 +4,6 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="11 mar 2026">
## v1.10.2a1
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.10.2a1)
## O que mudou
### Recursos
- Adicionar suporte para busca de ferramentas, salvamento de tokens e injeção dinâmica de ferramentas apropriadas durante a execução para Anthropics.
- Introduzir mais ferramentas de Busca Brave.
- Criar ação para lançamentos noturnos.
### Correções de Bugs
- Corrigir LockException durante a execução concorrente de múltiplos processos.
- Resolver problemas com a agrupação de resultados de ferramentas paralelas em uma única mensagem de usuário.
- Abordar resoluções de ferramentas MCP e eliminar todas as conexões mutáveis compartilhadas.
- Atualizar o manuseio de parâmetros LLM na função human_feedback.
- Adicionar métodos de lista/dicionário ausentes a LockedListProxy e LockedDictProxy.
- Propagar o contexto de contextvars para as threads de chamada de ferramentas paralelas.
- Atualizar a dependência gitpython para >=3.1.41 para resolver a vulnerabilidade de travessia de diretórios CVE.
### Refatoração
- Refatorar classes de memória para serem serializáveis.
### Documentação
- Atualizar o changelog e a versão para v1.10.1.
## Contribuidores
@akaKuruma, @github-actions[bot], @giulio-leone, @greysonlalonde, @joaomdmoura, @jonathansampson, @lorenzejay, @lucasgomide, @mattatcha
</Update>
<Update label="04 mar 2026">
## v1.10.1

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.10.2a1"
__version__ = "1.10.1"

View File

@@ -11,7 +11,7 @@ dependencies = [
"pytube~=15.0.0",
"requests~=2.32.5",
"docker~=7.1.0",
"crewai==1.10.2a1",
"crewai==1.10.1",
"tiktoken~=0.8.0",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",

View File

@@ -309,4 +309,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.10.2a1"
__version__ = "1.10.1"

View File

@@ -5664,6 +5664,10 @@
"title": "Bucket Name",
"type": "string"
},
"cluster": {
"description": "An instance of the Couchbase Cluster connected to the desired Couchbase server.",
"title": "Cluster"
},
"collection_name": {
"description": "The name of the Couchbase collection to search",
"title": "Collection Name",
@@ -5712,6 +5716,7 @@
}
},
"required": [
"cluster",
"collection_name",
"scope_name",
"bucket_name",
@@ -14455,9 +14460,13 @@
"properties": {
"config": {
"$ref": "#/$defs/OxylabsAmazonProductScraperConfig"
},
"oxylabs_api": {
"title": "Oxylabs Api"
}
},
"required": [
"oxylabs_api",
"config"
],
"title": "OxylabsAmazonProductScraperTool",
@@ -14680,9 +14689,13 @@
"properties": {
"config": {
"$ref": "#/$defs/OxylabsAmazonSearchScraperConfig"
},
"oxylabs_api": {
"title": "Oxylabs Api"
}
},
"required": [
"oxylabs_api",
"config"
],
"title": "OxylabsAmazonSearchScraperTool",
@@ -14918,9 +14931,13 @@
"properties": {
"config": {
"$ref": "#/$defs/OxylabsGoogleSearchScraperConfig"
},
"oxylabs_api": {
"title": "Oxylabs Api"
}
},
"required": [
"oxylabs_api",
"config"
],
"title": "OxylabsGoogleSearchScraperTool",
@@ -15104,9 +15121,13 @@
"properties": {
"config": {
"$ref": "#/$defs/OxylabsUniversalScraperConfig"
},
"oxylabs_api": {
"title": "Oxylabs Api"
}
},
"required": [
"oxylabs_api",
"config"
],
"title": "OxylabsUniversalScraperTool",
@@ -23208,6 +23229,26 @@
"description": "The Tavily API key. If not provided, it will be loaded from the environment variable TAVILY_API_KEY.",
"title": "Api Key"
},
"async_client": {
"anyOf": [
{},
{
"type": "null"
}
],
"default": null,
"title": "Async Client"
},
"client": {
"anyOf": [
{},
{
"type": "null"
}
],
"default": null,
"title": "Client"
},
"extract_depth": {
"default": "basic",
"description": "The depth of extraction. 'basic' for basic extraction, 'advanced' for advanced extraction.",
@@ -23343,6 +23384,26 @@
"description": "The Tavily API key. If not provided, it will be loaded from the environment variable TAVILY_API_KEY.",
"title": "Api Key"
},
"async_client": {
"anyOf": [
{},
{
"type": "null"
}
],
"default": null,
"title": "Async Client"
},
"client": {
"anyOf": [
{},
{
"type": "null"
}
],
"default": null,
"title": "Client"
},
"days": {
"default": 7,
"description": "The number of days to search back.",

View File

@@ -53,7 +53,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.10.2a1",
"crewai-tools==1.10.1",
]
embeddings = [
"tiktoken~=0.8.0"

View File

@@ -40,7 +40,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.10.2a1"
__version__ = "1.10.1"
_telemetry_submitted = False

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.10.2a1"
"crewai[tools]==1.10.1"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.10.2a1"
"crewai[tools]==1.10.1"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.10.2a1"
"crewai[tools]==1.10.1"
]
[tool.crewai]

View File

@@ -72,8 +72,7 @@ class SQLiteFlowPersistence(FlowPersistence):
def init_db(self) -> None:
"""Create the necessary tables if they don't exist."""
with sqlite3.connect(self.db_path, timeout=30) as conn:
conn.execute("PRAGMA journal_mode=WAL")
with sqlite3.connect(self.db_path) as conn:
# Main state table
conn.execute(
"""
@@ -137,7 +136,7 @@ class SQLiteFlowPersistence(FlowPersistence):
f"state_data must be either a Pydantic BaseModel or dict, got {type(state_data)}"
)
with sqlite3.connect(self.db_path, timeout=30) as conn:
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
INSERT INTO flow_states (
@@ -164,7 +163,7 @@ class SQLiteFlowPersistence(FlowPersistence):
Returns:
The most recent state as a dictionary, or None if no state exists
"""
with sqlite3.connect(self.db_path, timeout=30) as conn:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"""
SELECT state_json
@@ -214,7 +213,7 @@ class SQLiteFlowPersistence(FlowPersistence):
self.save_state(flow_uuid, context.method_name, state_data)
# Save pending feedback context
with sqlite3.connect(self.db_path, timeout=30) as conn:
with sqlite3.connect(self.db_path) as conn:
# Use INSERT OR REPLACE to handle re-triggering feedback on same flow
conn.execute(
"""
@@ -249,7 +248,7 @@ class SQLiteFlowPersistence(FlowPersistence):
# Import here to avoid circular imports
from crewai.flow.async_feedback.types import PendingFeedbackContext
with sqlite3.connect(self.db_path, timeout=30) as conn:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"""
SELECT state_json, context_json
@@ -273,7 +272,7 @@ class SQLiteFlowPersistence(FlowPersistence):
Args:
flow_uuid: Unique identifier for the flow instance
"""
with sqlite3.connect(self.db_path, timeout=30) as conn:
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
DELETE FROM pending_feedback
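For readers comparing the hunks above: every connection in this file drops the explicit 30-second busy timeout and the `PRAGMA journal_mode=WAL` call. A minimal stand-alone sketch of the two connection styles (the path and table here are illustrative, not CrewAI's):

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "flow_states.db")

# Removed pattern: explicit busy timeout plus write-ahead logging, which
# lets concurrent readers proceed while another process writes.
with sqlite3.connect(db_path, timeout=30) as conn:
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS flow_states (id INTEGER PRIMARY KEY, state_json TEXT)"
    )
    conn.execute("INSERT INTO flow_states (state_json) VALUES (?)", ('{"step": 1}',))

# Restored pattern: library defaults (5-second busy timeout; the journal
# mode stays whatever is persisted in the database file).
with sqlite3.connect(db_path) as conn:
    count = conn.execute("SELECT COUNT(*) FROM flow_states").fetchone()[0]
print(count)
```

Note that `journal_mode=WAL` is persistent: once set, later default connections to the same file keep using WAL.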

View File

@@ -38,8 +38,7 @@ class KickoffTaskOutputsSQLiteStorage:
DatabaseOperationError: If database initialization fails due to SQLite errors.
"""
try:
with sqlite3.connect(self.db_path, timeout=30) as conn:
conn.execute("PRAGMA journal_mode=WAL")
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(
"""
@@ -83,7 +82,7 @@ class KickoffTaskOutputsSQLiteStorage:
"""
inputs = inputs or {}
try:
with sqlite3.connect(self.db_path, timeout=30) as conn:
with sqlite3.connect(self.db_path) as conn:
conn.execute("BEGIN TRANSACTION")
cursor = conn.cursor()
cursor.execute(
@@ -126,7 +125,7 @@ class KickoffTaskOutputsSQLiteStorage:
DatabaseOperationError: If updating the task output fails due to SQLite errors.
"""
try:
with sqlite3.connect(self.db_path, timeout=30) as conn:
with sqlite3.connect(self.db_path) as conn:
conn.execute("BEGIN TRANSACTION")
cursor = conn.cursor()
@@ -167,7 +166,7 @@ class KickoffTaskOutputsSQLiteStorage:
DatabaseOperationError: If loading task outputs fails due to SQLite errors.
"""
try:
with sqlite3.connect(self.db_path, timeout=30) as conn:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT *
@@ -206,7 +205,7 @@ class KickoffTaskOutputsSQLiteStorage:
DatabaseOperationError: If deleting task outputs fails due to SQLite errors.
"""
try:
with sqlite3.connect(self.db_path, timeout=30) as conn:
with sqlite3.connect(self.db_path) as conn:
conn.execute("BEGIN TRANSACTION")
cursor = conn.cursor()
cursor.execute("DELETE FROM latest_kickoff_task_outputs")

View File

@@ -2,7 +2,6 @@
from __future__ import annotations
from contextlib import AbstractContextManager
from datetime import datetime
import json
import logging
@@ -15,7 +14,6 @@ from typing import Any, ClassVar
import lancedb
from crewai.memory.types import MemoryRecord, ScopeInfo
from crewai.utilities.lock_store import lock as store_lock
_logger = logging.getLogger(__name__)
@@ -92,7 +90,6 @@ class LanceDBStorage:
# Raise it proactively so scans on large tables never hit OS error 24.
try:
import resource
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
if soft < 4096:
resource.setrlimit(resource.RLIMIT_NOFILE, (min(hard, 4096), hard))
@@ -102,8 +99,7 @@ class LanceDBStorage:
self._compact_every = compact_every
self._save_count = 0
self._lock_name = f"lancedb:{self._path.resolve()}"
# Get or create a shared write lock for this database path.
resolved = str(self._path.resolve())
with LanceDBStorage._path_locks_guard:
if resolved not in LanceDBStorage._path_locks:
@@ -114,13 +110,10 @@ class LanceDBStorage:
# If no table exists yet, defer creation until the first save so the
# dimension can be auto-detected from the embedder's actual output.
try:
self._table: lancedb.table.Table | None = self._db.open_table(
self._table_name
)
self._table: lancedb.table.Table | None = self._db.open_table(self._table_name)
self._vector_dim: int = self._infer_dim_from_table(self._table)
# Best-effort: create the scope index if it doesn't exist yet.
with self._file_lock():
self._ensure_scope_index()
self._ensure_scope_index()
# Compact in the background if the table has accumulated many
# fragments from previous runs (each save() creates one).
self._compact_if_needed()
@@ -131,8 +124,7 @@ class LanceDBStorage:
# Explicit dim provided: create the table immediately if it doesn't exist.
if self._table is None and vector_dim is not None:
self._vector_dim = vector_dim
with self._file_lock():
self._table = self._create_table(vector_dim)
self._table = self._create_table(vector_dim)
@property
def write_lock(self) -> threading.RLock:
@@ -157,14 +149,18 @@ class LanceDBStorage:
break
return DEFAULT_VECTOR_DIM
def _file_lock(self) -> AbstractContextManager[None]:
"""Return a cross-process lock for serialising writes."""
return store_lock(self._lock_name)
def _retry_write(self, op: str, *args: Any, **kwargs: Any) -> Any:
"""Execute a table operation with retry on LanceDB commit conflicts.
def _do_write(self, op: str, *args: Any, **kwargs: Any) -> Any:
"""Execute a single table write with retry on commit conflicts.
Args:
op: Method name on the table object (e.g. "add", "delete").
*args, **kwargs: Passed to the table method.
Caller must already hold the cross-process file lock.
LanceDB uses optimistic concurrency: if two transactions overlap,
the second to commit fails with an ``OSError`` containing
"Commit conflict". This helper retries with exponential backoff,
refreshing the table reference before each retry so the retried
call uses the latest committed version (not a stale reference).
"""
delay = _RETRY_BASE_DELAY
for attempt in range(_MAX_RETRIES + 1):
@@ -175,24 +171,20 @@ class LanceDBStorage:
raise
_logger.debug(
"LanceDB commit conflict on %s (attempt %d/%d), retrying in %.1fs",
op,
attempt + 1,
_MAX_RETRIES,
delay,
op, attempt + 1, _MAX_RETRIES, delay,
)
# Refresh table to pick up the latest version before retrying.
# The next getattr(self._table, op) will use the fresh table.
try:
self._table = self._db.open_table(self._table_name)
except Exception: # noqa: S110
pass
pass # table refresh is best-effort
time.sleep(delay)
delay *= 2
return None # unreachable, but satisfies type checker
def _create_table(self, vector_dim: int) -> lancedb.table.Table:
"""Create a new table with the given vector dimension.
Caller must already hold the cross-process file lock.
"""
"""Create a new table with the given vector dimension."""
placeholder = [
{
"id": "__schema_placeholder__",
@@ -208,12 +200,8 @@ class LanceDBStorage:
"vector": [0.0] * vector_dim,
}
]
try:
table = self._db.create_table(self._table_name, placeholder)
except ValueError:
table = self._db.open_table(self._table_name)
else:
table.delete("id = '__schema_placeholder__'")
table = self._db.create_table(self._table_name, placeholder)
table.delete("id = '__schema_placeholder__'")
return table
def _ensure_scope_index(self) -> None:
@@ -260,9 +248,9 @@ class LanceDBStorage:
"""Run ``table.optimize()`` in a background thread, absorbing errors."""
try:
if self._table is not None:
with self._file_lock():
self._table.optimize()
self._ensure_scope_index()
self._table.optimize()
# Refresh the scope index so new fragments are covered.
self._ensure_scope_index()
except Exception:
_logger.debug("LanceDB background compaction failed", exc_info=True)
@@ -292,9 +280,7 @@ class LanceDBStorage:
"last_accessed": record.last_accessed.isoformat(),
"source": record.source or "",
"private": record.private,
"vector": record.embedding
if record.embedding
else [0.0] * self._vector_dim,
"vector": record.embedding if record.embedding else [0.0] * self._vector_dim,
}
def _row_to_record(self, row: dict[str, Any]) -> MemoryRecord:
@@ -310,9 +296,7 @@ class LanceDBStorage:
id=str(row["id"]),
content=str(row["content"]),
scope=str(row["scope"]),
categories=json.loads(row["categories_str"])
if row.get("categories_str")
else [],
categories=json.loads(row["categories_str"]) if row.get("categories_str") else [],
metadata=json.loads(row["metadata_str"]) if row.get("metadata_str") else {},
importance=float(row.get("importance", 0.5)),
created_at=_parse_dt(row.get("created_at")),
@@ -332,15 +316,16 @@ class LanceDBStorage:
dim = len(r.embedding)
break
is_new_table = self._table is None
with self._write_lock, self._file_lock():
with self._write_lock:
self._ensure_table(vector_dim=dim)
rows = [self._record_to_row(r) for r in records]
for r in rows:
if r["vector"] is None or len(r["vector"]) != self._vector_dim:
r["vector"] = [0.0] * self._vector_dim
self._do_write("add", rows)
if is_new_table:
self._ensure_scope_index()
self._retry_write("add", rows)
# Create the scope index on the first save so it covers the initial dataset.
if is_new_table:
self._ensure_scope_index()
# Auto-compact every N saves so fragment files don't pile up.
self._save_count += 1
if self._compact_every > 0 and self._save_count % self._compact_every == 0:
@@ -348,14 +333,14 @@ class LanceDBStorage:
def update(self, record: MemoryRecord) -> None:
"""Update a record by ID. Preserves created_at, updates last_accessed."""
with self._write_lock, self._file_lock():
with self._write_lock:
self._ensure_table()
safe_id = str(record.id).replace("'", "''")
self._do_write("delete", f"id = '{safe_id}'")
self._retry_write("delete", f"id = '{safe_id}'")
row = self._record_to_row(record)
if row["vector"] is None or len(row["vector"]) != self._vector_dim:
row["vector"] = [0.0] * self._vector_dim
self._do_write("add", [row])
self._retry_write("add", [row])
def touch_records(self, record_ids: list[str]) -> None:
"""Update last_accessed to now for the given record IDs.
@@ -369,11 +354,11 @@ class LanceDBStorage:
"""
if not record_ids or self._table is None:
return
with self._write_lock, self._file_lock():
with self._write_lock:
now = datetime.utcnow().isoformat()
safe_ids = [str(rid).replace("'", "''") for rid in record_ids]
ids_expr = ", ".join(f"'{rid}'" for rid in safe_ids)
self._do_write(
self._retry_write(
"update",
where=f"id IN ({ids_expr})",
values={"last_accessed": now},
@@ -405,17 +390,13 @@ class LanceDBStorage:
prefix = scope_prefix.rstrip("/")
like_val = prefix + "%"
query = query.where(f"scope LIKE '{like_val}'")
results = query.limit(
limit * 3 if (categories or metadata_filter) else limit
).to_list()
results = query.limit(limit * 3 if (categories or metadata_filter) else limit).to_list()
out: list[tuple[MemoryRecord, float]] = []
for row in results:
record = self._row_to_record(row)
if categories and not any(c in record.categories for c in categories):
continue
if metadata_filter and not all(
record.metadata.get(k) == v for k, v in metadata_filter.items()
):
if metadata_filter and not all(record.metadata.get(k) == v for k, v in metadata_filter.items()):
continue
distance = row.get("_distance", 0.0)
score = 1.0 / (1.0 + float(distance)) if distance is not None else 1.0
@@ -435,24 +416,20 @@ class LanceDBStorage:
) -> int:
if self._table is None:
return 0
with self._write_lock, self._file_lock():
with self._write_lock:
if record_ids and not (categories or metadata_filter):
before = self._table.count_rows()
ids_expr = ", ".join(f"'{rid}'" for rid in record_ids)
self._do_write("delete", f"id IN ({ids_expr})")
self._retry_write("delete", f"id IN ({ids_expr})")
return before - self._table.count_rows()
if categories or metadata_filter:
rows = self._scan_rows(scope_prefix)
to_delete: list[str] = []
for row in rows:
record = self._row_to_record(row)
if categories and not any(
c in record.categories for c in categories
):
if categories and not any(c in record.categories for c in categories):
continue
if metadata_filter and not all(
record.metadata.get(k) == v for k, v in metadata_filter.items()
):
if metadata_filter and not all(record.metadata.get(k) == v for k, v in metadata_filter.items()):
continue
if older_than and record.created_at >= older_than:
continue
@@ -461,7 +438,7 @@ class LanceDBStorage:
return 0
before = self._table.count_rows()
ids_expr = ", ".join(f"'{rid}'" for rid in to_delete)
self._do_write("delete", f"id IN ({ids_expr})")
self._retry_write("delete", f"id IN ({ids_expr})")
return before - self._table.count_rows()
conditions = []
if scope_prefix is not None and scope_prefix.strip("/"):
@@ -473,11 +450,11 @@ class LanceDBStorage:
conditions.append(f"created_at < '{older_than.isoformat()}'")
if not conditions:
before = self._table.count_rows()
self._do_write("delete", "id != ''")
self._retry_write("delete", "id != ''")
return before - self._table.count_rows()
where_expr = " AND ".join(conditions)
before = self._table.count_rows()
self._do_write("delete", where_expr)
self._retry_write("delete", where_expr)
return before - self._table.count_rows()
def _scan_rows(
@@ -551,7 +528,7 @@ class LanceDBStorage:
for row in rows:
sc = str(row.get("scope", ""))
if child_prefix and sc.startswith(child_prefix):
rest = sc[len(child_prefix) :]
rest = sc[len(child_prefix):]
first_component = rest.split("/", 1)[0]
if first_component:
children.add(child_prefix + first_component)
@@ -562,11 +539,7 @@ class LanceDBStorage:
pass
created = row.get("created_at")
if created:
dt = (
datetime.fromisoformat(str(created).replace("Z", "+00:00"))
if isinstance(created, str)
else created
)
dt = datetime.fromisoformat(str(created).replace("Z", "+00:00")) if isinstance(created, str) else created
if isinstance(dt, datetime):
if oldest is None or dt < oldest:
oldest = dt
@@ -589,7 +562,7 @@ class LanceDBStorage:
for row in rows:
sc = str(row.get("scope", ""))
if sc.startswith(prefix) and sc != (prefix.rstrip("/") or "/"):
rest = sc[len(prefix) :]
rest = sc[len(prefix):]
first_component = rest.split("/", 1)[0]
if first_component:
children.add(prefix + first_component)
@@ -617,19 +590,17 @@ class LanceDBStorage:
return info.record_count
def reset(self, scope_prefix: str | None = None) -> None:
with self._write_lock, self._file_lock():
if scope_prefix is None or scope_prefix.strip("/") == "":
if self._table is not None:
self._db.drop_table(self._table_name)
self._table = None
return
if self._table is None:
return
prefix = scope_prefix.rstrip("/")
if prefix:
self._do_write(
"delete", f"scope >= '{prefix}' AND scope < '{prefix}/\uffff'"
)
if scope_prefix is None or scope_prefix.strip("/") == "":
if self._table is not None:
self._db.drop_table(self._table_name)
self._table = None
# Dimension is preserved; table will be recreated on next save.
return
if self._table is None:
return
prefix = scope_prefix.rstrip("/")
if prefix:
self._table.delete(f"scope >= '{prefix}' AND scope < '{prefix}/\uFFFF'")
def optimize(self) -> None:
"""Compact the table synchronously and refresh the scope index.
@@ -643,9 +614,8 @@ class LanceDBStorage:
"""
if self._table is None:
return
with self._write_lock, self._file_lock():
self._table.optimize()
self._ensure_scope_index()
self._table.optimize()
self._ensure_scope_index()
async def asave(self, records: list[MemoryRecord]) -> None:
self.save(records)
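The `_retry_write`/`_do_write` docstrings in the hunks above describe retrying on LanceDB commit conflicts with exponential backoff. A self-contained sketch of that pattern (the constant values are illustrative, not CrewAI's actual settings):

```python
import time

_MAX_RETRIES = 3
_RETRY_BASE_DELAY = 0.01  # seconds; illustrative value

def retry_on_conflict(op, *args, **kwargs):
    """Call `op`, retrying with exponential backoff when it raises an
    OSError containing "Commit conflict" (how LanceDB reports an
    optimistic-concurrency failure)."""
    delay = _RETRY_BASE_DELAY
    for attempt in range(_MAX_RETRIES + 1):
        try:
            return op(*args, **kwargs)
        except OSError as exc:
            if "Commit conflict" not in str(exc) or attempt == _MAX_RETRIES:
                raise
            time.sleep(delay)
            delay *= 2  # back off: 0.01s, 0.02s, 0.04s, ...
```

The real helper additionally refreshes `self._table` between attempts so the retried write commits against the latest table version rather than a stale reference.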

View File

@@ -1,12 +1,13 @@
"""Factory functions for creating ChromaDB clients."""
from hashlib import md5
import os
from chromadb import PersistentClient
import portalocker
from crewai.rag.chromadb.client import ChromaDBClient
from crewai.rag.chromadb.config import ChromaDBConfig
from crewai.utilities.lock_store import lock
def create_client(config: ChromaDBConfig) -> ChromaDBClient:
@@ -24,8 +25,10 @@ def create_client(config: ChromaDBConfig) -> ChromaDBClient:
persist_dir = config.settings.persist_directory
os.makedirs(persist_dir, exist_ok=True)
lock_id = md5(persist_dir.encode(), usedforsecurity=False).hexdigest()
lockfile = os.path.join(persist_dir, f"chromadb-{lock_id}.lock")
with lock(f"chromadb:{persist_dir}"):
with portalocker.Lock(lockfile):
client = PersistentClient(
path=persist_dir,
settings=config.settings,
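This hunk swaps the centralised `lock()` helper for an inline `portalocker.Lock` keyed by an md5 of the persist directory. The naming scheme can be sketched as follows (the helper name is ours, not CrewAI's):

```python
from hashlib import md5
import os

def chromadb_lockfile(persist_dir: str) -> str:
    # One lock file per persist directory, stored inside that directory so
    # every process touching the same ChromaDB data contends on the same file.
    lock_id = md5(persist_dir.encode(), usedforsecurity=False).hexdigest()
    return os.path.join(persist_dir, f"chromadb-{lock_id}.lock")

path = chromadb_lockfile("/tmp/chroma")
print(os.path.basename(path))
```

Keeping the lock file inside the data directory (rather than in a shared temp dir) ties the lock's lifetime to the data it protects.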

View File

@@ -1,61 +0,0 @@
"""Centralised lock factory.
If ``REDIS_URL`` is set, locks are distributed via ``portalocker.RedisLock``. Otherwise, falls
back to the standard ``portalocker.Lock``.
"""
from __future__ import annotations
from collections.abc import Iterator
from contextlib import contextmanager
from functools import lru_cache
from hashlib import md5
import os
import tempfile
from typing import TYPE_CHECKING, Final
import portalocker
if TYPE_CHECKING:
import redis
_REDIS_URL: str | None = os.environ.get("REDIS_URL")
_DEFAULT_TIMEOUT: Final[int] = 120
@lru_cache(maxsize=1)
def _redis_connection() -> redis.Redis:
"""Return a cached Redis connection, creating one on first call."""
from redis import Redis
if _REDIS_URL is None:
raise ValueError("REDIS_URL environment variable is not set")
return Redis.from_url(_REDIS_URL)
@contextmanager
def lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[None]:
"""Acquire a named lock, yielding while it is held.
Args:
name: A human-readable lock name (e.g. ``"chromadb_init"``).
Automatically namespaced to avoid collisions.
timeout: Maximum seconds to wait for the lock before raising.
"""
channel = f"crewai:{md5(name.encode(), usedforsecurity=False).hexdigest()}"
if _REDIS_URL:
with portalocker.RedisLock(
channel=channel,
connection=_redis_connection(),
timeout=timeout,
):
yield
else:
lock_dir = tempfile.gettempdir()
lock_path = os.path.join(lock_dir, f"{channel}.lock")
with portalocker.Lock(lock_path, timeout=timeout):
yield
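The deleted module above picked a lock backend at import time: `portalocker.RedisLock` when `REDIS_URL` is set, a temp-dir file lock otherwise. Its namespacing is easy to reproduce (a sketch of the scheme, not the removed code verbatim):

```python
from hashlib import md5
import os
import tempfile

def lock_channel(name: str) -> str:
    # "crewai:" prefix plus an md5 digest keeps lock names collision-free
    # and filesystem-safe regardless of what characters `name` contains.
    return f"crewai:{md5(name.encode(), usedforsecurity=False).hexdigest()}"

def fallback_lock_path(name: str) -> str:
    # File-lock fallback path used when REDIS_URL is unset.
    return os.path.join(tempfile.gettempdir(), f"{lock_channel(name)}.lock")

print(lock_channel("chromadb_init")[:7])
```

Hashing the human-readable name means callers can pass arbitrary strings (paths, URLs) without worrying about what is legal in a Redis channel or a filename.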

View File

@@ -973,7 +973,7 @@ def test_bedrock_groups_three_tool_results():
"""Consecutive tool results should be grouped into one Bedrock user message."""
llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")
messages = [
test_messages = [
{"role": "user", "content": "Use all three tools, then continue."},
{
"role": "assistant",
@@ -1010,7 +1010,9 @@ def test_bedrock_groups_three_tool_results():
{"role": "tool", "tool_call_id": "tool-3", "content": "AMZN up 1.2%"},
]
formatted_messages, system_message = llm._format_messages_for_converse(messages)
formatted_messages, system_message = llm._format_messages_for_converse(
test_messages
)
assert system_message is None
assert [message["role"] for message in formatted_messages] == [
@@ -1032,146 +1034,3 @@ def test_bedrock_groups_three_tool_results():
"AI news summary",
"AMZN up 1.2%",
]
def test_bedrock_parallel_tool_results_grouped():
"""Regression test for issue #4749.
When an assistant message contains multiple parallel tool calls,
Bedrock requires all corresponding tool results to be grouped
in a single user message. Previously each tool result was emitted
as a separate user message, causing:
ValidationException: Expected toolResult blocks at messages.2.content
"""
llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")
messages = [
{"role": "user", "content": "Calculate 25 + 17 AND 10 * 5"},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_add",
"type": "function",
"function": {"name": "add_tool", "arguments": '{"a": 25, "b": 17}'},
},
{
"id": "call_mul",
"type": "function",
"function": {"name": "multiply_tool", "arguments": '{"a": 10, "b": 5}'},
},
],
},
{"role": "tool", "tool_call_id": "call_add", "content": "42"},
{"role": "tool", "tool_call_id": "call_mul", "content": "50"},
]
converse_msgs, system_msg = llm._format_messages_for_converse(messages)
# Find the user message that contains toolResult blocks
tool_result_messages = [
m for m in converse_msgs
if m.get("role") == "user"
and any("toolResult" in b for b in m.get("content", []))
]
# There must be exactly ONE user message with tool results (not two)
assert len(tool_result_messages) == 1, (
f"Expected 1 grouped tool-result message, got {len(tool_result_messages)}. "
"Bedrock requires all parallel tool results in a single user message."
)
# That single message must contain both tool results
tool_results = tool_result_messages[0]["content"]
assert len(tool_results) == 2, (
f"Expected 2 toolResult blocks in grouped message, got {len(tool_results)}"
)
# Verify the tool use IDs match
tool_use_ids = {
block["toolResult"]["toolUseId"] for block in tool_results
}
assert tool_use_ids == {"call_add", "call_mul"}
def test_bedrock_single_tool_result_still_works():
"""Ensure single tool call still produces a single-block user message."""
llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")
messages = [
{"role": "user", "content": "Add 1 + 2"},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_single",
"type": "function",
"function": {"name": "add_tool", "arguments": '{"a": 1, "b": 2}'},
},
],
},
{"role": "tool", "tool_call_id": "call_single", "content": "3"},
]
converse_msgs, _ = llm._format_messages_for_converse(messages)
tool_result_messages = [
m for m in converse_msgs
if m.get("role") == "user"
and any("toolResult" in b for b in m.get("content", []))
]
assert len(tool_result_messages) == 1
assert len(tool_result_messages[0]["content"]) == 1
assert tool_result_messages[0]["content"][0]["toolResult"]["toolUseId"] == "call_single"
def test_bedrock_tool_results_not_merged_across_assistant_messages():
"""Tool results from different assistant turns must NOT be merged."""
llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")
messages = [
{"role": "user", "content": "First task"},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_a",
"type": "function",
"function": {"name": "tool_a", "arguments": "{}"},
},
],
},
{"role": "tool", "tool_call_id": "call_a", "content": "result_a"},
{"role": "assistant", "content": "Now doing second task"},
{"role": "user", "content": "Second task"},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_b",
"type": "function",
"function": {"name": "tool_b", "arguments": "{}"},
},
],
},
{"role": "tool", "tool_call_id": "call_b", "content": "result_b"},
]
converse_msgs, _ = llm._format_messages_for_converse(messages)
tool_result_messages = [
m for m in converse_msgs
if m.get("role") == "user"
and any("toolResult" in b for b in m.get("content", []))
]
# Two separate tool-result messages (one per assistant turn)
assert len(tool_result_messages) == 2, (
"Tool results from different assistant turns must remain separate"
)
assert tool_result_messages[0]["content"][0]["toolResult"]["toolUseId"] == "call_a"
assert tool_result_messages[1]["content"][0]["toolResult"]["toolUseId"] == "call_b"
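The behaviour these three tests pin down is a grouping pass over the OpenAI-style message list: consecutive `tool` messages answering the same assistant turn become one Bedrock user message holding multiple `toolResult` blocks, while a non-tool message in between starts a fresh group. A minimal sketch of that rule (ours, not the actual `_format_messages_for_converse`):

```python
def group_tool_results(messages):
    """Collapse consecutive role="tool" messages into a single user
    message of Converse-style toolResult blocks. Any intervening
    non-tool message starts a new group, so results from different
    assistant turns stay separate."""
    out = []
    for msg in messages:
        if msg.get("role") != "tool":
            out.append(dict(msg))
            continue
        block = {
            "toolResult": {
                "toolUseId": msg["tool_call_id"],
                "content": [{"text": msg["content"]}],
            }
        }
        last = out[-1] if out else None
        if (
            last is not None
            and last.get("role") == "user"
            and isinstance(last.get("content"), list)
            and all("toolResult" in b for b in last["content"])
        ):
            last["content"].append(block)  # same turn: extend the group
        else:
            out.append({"role": "user", "content": [block]})  # new group
    return out
```

Emitting one user message per group is what avoids Bedrock's `ValidationException: Expected toolResult blocks at messages.2.content` described in the regression test.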

View File

@@ -1,13 +0,0 @@
"""Stress tests for concurrent multi-process storage access.
Simulates the Airflow pattern: N worker processes each writing to the
same storage directory simultaneously. Verifies no LockException and
data integrity after all writes complete.
Uses temp files for IPC instead of multiprocessing.Manager (which uses
sockets blocked by pytest_recording).
"""
import pytest
pytestmark = pytest.mark.skip(reason="Multiprocessing tests incompatible with xdist --import-mode=importlib")

View File

@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.10.2a1"
__version__ = "1.10.1"