Compare commits

...

18 Commits

Author SHA1 Message Date
Thiago Moretto
190430da08 fix: address bot review feedback on tool metadata
- Use `is not None` instead of truthiness check so empty tools list
  is sent to the API rather than being silently dropped as None
- Strip __init__ suffix from module path for tools in __init__.py files
- Extend _unwrap_schema to handle function-before, function-wrap, and
  definitions wrapper types
2026-03-26 11:27:38 -03:00
Thiago Moretto
c9e8e05cd7 fix: resolve mypy type errors in utils.py 2026-03-26 09:30:50 -03:00
Thiago Moretto
9b536340c7 style: fix ruff formatting 2026-03-25 20:25:24 -03:00
Thiago Moretto
3002211a07 fix: Use sha256 instead of md5 for module name hashing (lint S324)
- Add missing mocks to metadata extraction failure test
2026-03-25 17:27:21 -03:00
Thiago Moretto
c54258d40b fix: address PR review feedback for tool metadata extraction
- Use sha256 instead of md5 for module name hashing (lint S324)
- Filter required list to match filtered properties in JSON schema
2026-03-25 17:27:21 -03:00
Thiago Moretto
874db834f0 Extract module name 2026-03-25 17:21:47 -03:00
Thiago Moretto
77549cfcdf Printing out detected tools 2026-03-25 17:21:47 -03:00
Thiago Moretto
8138ba491f Remove debug message + code simplification 2026-03-25 17:21:47 -03:00
Thiago Moretto
d4400e19f0 Fix payload (nest under tools key) 2026-03-25 17:21:47 -03:00
Thiago Moretto
a9c272c687 Exporting tool's metadata to AMP - initial work 2026-03-25 17:21:47 -03:00
Greyson LaLonde
b5a0d6e709 docs: update changelog and version for v1.12.0a3 2026-03-26 04:17:37 +08:00
Greyson LaLonde
454156cff9 feat: bump versions to 1.12.0a3 2026-03-26 04:12:49 +08:00
Tiago Freire
d86707da3d Fix: bad credentials for traces batch push (404) (#4947)
## Summary

### Core fixes

<details>
<summary><b>Fix silent 404 cascade on trace event send</b></summary>

When `_initialize_backend_batch` failed, `trace_batch_id` was left populated with a client-generated UUID never registered server-side. All subsequent event sends hit a non-existent batch endpoint and returned 404. Now all three failure paths (None response, non-2xx status, exception) clear `trace_batch_id`.
</details>
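
The three failure paths described above can be sketched as follows. This is a toy model, not the project's code: `BatchState`, `FakeResponse`, and the `create_batch` callable are hypothetical names introduced for illustration, and only the `trace_batch_id` attribute mirrors the PR.

```python
from dataclasses import dataclass
from typing import Callable, Optional
import uuid


@dataclass
class FakeResponse:
    """Hypothetical stand-in for an HTTP response object."""

    status_code: int


class BatchState:
    """Toy model of the batch manager; only `trace_batch_id` mirrors the PR."""

    def __init__(self) -> None:
        # Client-generated ID that the server does not know about yet.
        self.trace_batch_id: Optional[str] = str(uuid.uuid4())

    def initialize(self, create_batch: Callable[[], Optional[FakeResponse]]) -> None:
        try:
            response = create_batch()
        except Exception:
            self.trace_batch_id = None  # failure path: exception
            return
        if response is None:
            self.trace_batch_id = None  # failure path: no response
            return
        if not 200 <= response.status_code < 300:
            self.trace_batch_id = None  # failure path: non-2xx status
```

With the ID cleared, later event sends can check `trace_batch_id` and skip the request instead of hitting an endpoint for a batch that was never registered.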

<details>
<summary><b>Fix first-time deferred batch init silently skipped</b></summary>

First-time users have `is_tracing_enabled_in_context() = False` by design. This caused `_initialize_backend_batch` to return early without creating the batch, and `finalize_batch` to skip finalization (same guard). The first-time handler now:

- passes `skip_context_check=True` to bypass both guards;
- calls `_finalize_backend_batch` directly;
- gates `backend_initialized` on actual success;
- checks the `_send_events_to_backend` return status, marking the batch as failed on 500;
- captures event count, duration, and batch ID before they are consumed by send/finalize;
- cleans up all singleton state via `_reset_batch_state()` on every exit path.
</details>

<details>
<summary><b>Sync <code>is_current_batch_ephemeral</code> on batch creation success</b></summary>

When the batch is successfully created on the server, `is_current_batch_ephemeral` is now synced with the actual `use_ephemeral` value used. This prevents endpoint mismatches where the batch was created on one endpoint but events and finalization were sent to a different one, resulting in 404.
</details>

<details>
<summary><b>Route <code>mark_trace_batch_as_failed</code> to correct endpoint for ephemeral batches</b></summary>

`mark_trace_batch_as_failed` always routed to the non-ephemeral endpoint (`/tracing/batches/{id}`), causing 404s when called on ephemeral batches — the same class of endpoint mismatch this PR aims to fix. Added `mark_ephemeral_trace_batch_as_failed` to `PlusAPI` and a `_mark_batch_as_failed` helper on `TraceBatchManager` that routes based on `is_current_batch_ephemeral`.
</details>

<details>
<summary><b>Gate <code>backend_initialized</code> on actual init success (non-first-time path)</b></summary>

On the non-first-time path, `backend_initialized` was set to `True` unconditionally after `_initialize_backend_batch` returned. With the new failure-path cleanup that clears `trace_batch_id`, this created an inconsistent state: `backend_initialized=True` + `trace_batch_id=None`. Now set via `self.trace_batch_id is not None`.
</details>

### Resilience improvements

<details>
<summary><b>Retry transient failures on batch creation</b></summary>

`_initialize_backend_batch` now makes up to 2 attempts (one retry, matching `max_retries = 1` in the diff) with a 200ms pause between them on transient failures (None response, 5xx, network errors). Non-transient 4xx errors are not retried. The short backoff minimizes lock hold time on the non-first-time path where `_batch_ready_cv` is held.
</details>
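
A minimal sketch of this retry policy, assuming the request is wrapped in a callable that returns an HTTP status code or `None`. The function name `create_with_retry` and its `send`/`sleep` parameters are hypothetical; the loop shape follows the `max_retries = 1` loop in the diff. Exceptions are not handled here.

```python
import time
from typing import Callable, Optional


def create_with_retry(
    send: Callable[[], Optional[int]],
    max_retries: int = 1,
    backoff_s: float = 0.2,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[int]:
    """Call `send` until it yields a non-transient status or retries run out.

    `send` returns an HTTP status code, or None when no response came back.
    """
    status: Optional[int] = None
    for attempt in range(max_retries + 1):
        status = send()
        # Anything below 500 (a success or a 4xx) is final, so stop retrying.
        if status is not None and status < 500:
            break
        if attempt < max_retries:
            sleep(backoff_s)
    return status
```

Passing `sleep` as a parameter is only a testing convenience; it lets a caller verify the backoff without actually waiting.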

<details>
<summary><b>Fall back to ephemeral on server auth rejection</b></summary>

When the non-ephemeral endpoint returns 401/403 (expired token, revoked credentials, key rotation), the client automatically switches to ephemeral tracing instead of losing traces. The fallback forwards `skip_context_check` and is guarded against infinite recursion — if ephemeral also fails, `trace_batch_id` is cleared normally.
</details>
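
The recursion guard can be illustrated with a small sketch. `init_batch` and the `post` callable are hypothetical names; the status checks follow the diff, but the return values are simplified to a label for whichever endpoint accepted the batch.

```python
from typing import Callable, Optional


def init_batch(
    post: Callable[[bool], int],
    use_ephemeral: bool = False,
) -> Optional[str]:
    """Return which endpoint accepted the batch, or None if creation failed.

    `post(ephemeral)` is a hypothetical callable returning an HTTP status.
    """
    status = post(use_ephemeral)
    # Only the non-ephemeral path may fall back, so the recursion depth is at most one.
    if status in (401, 403) and not use_ephemeral:
        return init_batch(post, use_ephemeral=True)
    if status in (200, 201):
        return "ephemeral" if use_ephemeral else "authenticated"
    return None
```

Because the fallback call always passes `use_ephemeral=True`, a second 401/403 falls through to the normal failure branch rather than recursing again.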

<details>
<summary><b>Fix action-event race initializing batch as non-ephemeral</b></summary>

`_handle_action_event` called `batch_manager.initialize_batch()` directly, defaulting `use_ephemeral=False`. When a `DefaultEnvEvent` or `LLMCallStartedEvent` fired before `CrewKickoffStartedEvent` in the thread pool, the batch was locked in as non-ephemeral. Now routes through `_initialize_batch()` which computes `use_ephemeral` from `_check_authenticated()`.
</details>

<details>
<summary><b>Guard <code>_mark_batch_as_failed</code> against cascading network errors</b></summary>

When `_finalize_backend_batch` failed with a network error (e.g. `[Errno 54] Connection reset by peer`), the exception handler called `_mark_batch_as_failed` — which also makes an HTTP request on the same dead connection. That second failure was unhandled. Now wrapped in a try/except so it logs at debug level instead of propagating.
</details>
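
Combined with the endpoint routing described earlier, the guarded helper might look roughly like this. It is a sketch under assumptions: `mark_batch_as_failed` is written as a free function, the two `mark_*` callables are hypothetical stand-ins for the API client methods, and the boolean return value is added only to make the behavior observable.

```python
import logging
from typing import Callable

logger = logging.getLogger("trace_sketch")


def mark_batch_as_failed(
    batch_id: str,
    message: str,
    is_ephemeral: bool,
    mark_ephemeral: Callable[[str, str], None],
    mark_regular: Callable[[str, str], None],
) -> bool:
    """Route to the matching endpoint and never let a second failure propagate."""
    target = mark_ephemeral if is_ephemeral else mark_regular
    try:
        target(batch_id, message)
        return True
    except Exception as exc:
        # The connection may already be dead; log quietly and move on.
        logger.debug("Could not mark batch %s as failed: %s", batch_id, exc)
        return False
```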

<details>
<summary><b>Design decision: first-time users always use ephemeral</b></summary>

First-time trace collection **always creates ephemeral batches**, regardless of authentication status. This is intentional:

1. **The first-time handler UX is built around ephemeral traces** — it displays an access code, a 24-hour expiry link, and opens the browser to the ephemeral trace viewer. Non-ephemeral batches don't produce these artifacts, so the handler would fall through to the "Local Traces Collected" fallback even when traces were successfully sent.

2. **The server handles account linking automatically** — `LinkEphemeralTracesJob` runs on user signup and migrates ephemeral traces to permanent records. Logged-in users can access their traces via their dashboard regardless.

3. **Checking auth during batch setup broke event collection** — moving `_check_authenticated()` into `_initialize_batch` caused the batch initialization to fail silently during the flow/crew start event handler, preventing all event collection. Keeping the first-time path fast and side-effect-free preserves event collection.

The auth check is deferred to the non-first-time path (second run onwards), where `is_tracing_enabled_in_context()` is `True` and the normal tracing pipeline handles everything — including the 401/403 ephemeral fallback.
</details>


### Manual tests


<details>
<summary><b>Matrix</b></summary>

| Scenario | First run | Second run |
|----------|-----------|------------|
| Logged out, fresh `.crewai_user.json` | Ephemeral trace created, URL returned | Ephemeral trace created, URL returned |
| Logged in, fresh `.crewai_user.json` | Ephemeral trace created, URL returned | Trace batch finalized, URL returned |
| Flow execution | Tested with `poem_flow` | Tested with `poem_flow` |
| Crew execution | Tested with `hitl_crew` | Tested with `hitl_crew` |
</details>
2026-03-25 16:00:05 -04:00
Greyson LaLonde
1956471086 fix: resolve multiple bugs in HITL flow system 2026-03-26 03:33:03 +08:00
Greyson LaLonde
4d1c041cc1 docs: update changelog and version for v1.12.0a2 2026-03-25 23:54:52 +08:00
Greyson LaLonde
2267b96e89 feat: bump versions to 1.12.0a2 2026-03-25 23:49:12 +08:00
Greyson LaLonde
1cc251b4b8 feat: add Qdrant Edge storage backend for memory system 2026-03-25 23:42:09 +08:00
Greyson LaLonde
90caa62158 chore: run ruff check and format on all files in CI
2026-03-25 20:55:03 +08:00
34 changed files with 3153 additions and 142 deletions

View File

@@ -8,15 +8,8 @@ permissions:
jobs:
lint:
runs-on: ubuntu-latest
env:
TARGET_BRANCH: ${{ github.event.pull_request.base.ref }}
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Fetch Target Branch
run: git fetch origin $TARGET_BRANCH --depth=1
- name: Restore global uv cache
id: cache-restore
@@ -40,23 +33,11 @@ jobs:
- name: Install dependencies
run: uv sync --all-groups --all-extras --no-install-project
- name: Get Changed Python Files
id: changed-files
run: |
merge_base=$(git merge-base origin/"$TARGET_BRANCH" HEAD)
changed_files=$(git diff --name-only --diff-filter=ACMRTUB "$merge_base" | grep '\.py$' || true)
echo "files<<EOF" >> $GITHUB_OUTPUT
echo "$changed_files" >> $GITHUB_OUTPUT
echo "EOF" >> $GITHUB_OUTPUT
- name: Ruff check
run: uv run ruff check lib/
- name: Run Ruff on Changed Files
if: ${{ steps.changed-files.outputs.files != '' }}
run: |
echo "${{ steps.changed-files.outputs.files }}" \
| tr ' ' '\n' \
| grep -v 'src/crewai/cli/templates/' \
| grep -v '/tests/' \
| xargs -I{} uv run ruff check "{}"
- name: Ruff format
run: uv run ruff format --check lib/
- name: Save uv caches
if: steps.cache-restore.outputs.cache-hit != 'true'

View File

@@ -4,6 +4,45 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
icon: "clock"
mode: "wide"
---
<Update label="26 مارس 2026">
## v1.12.0a3
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0a3)
## ما الذي تغير
### إصلاحات الأخطاء
- إصلاح بيانات الاعتماد الخاطئة لدفع دفعات التتبع (404)
- حل العديد من الأخطاء في نظام تدفق HITL
### الوثائق
- تحديث سجل التغييرات والإصدار لـ v1.12.0a2
## المساهمون
@akaKuruma, @greysonlalonde
</Update>
<Update label="25 مارس 2026">
## v1.12.0a2
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0a2)
## ما الذي تغير
### الميزات
- إضافة واجهة تخزين Qdrant Edge لنظام الذاكرة
### الوثائق
- تحديث سجل التغييرات والإصدار لـ v1.12.0a1
## المساهمون
@greysonlalonde
</Update>
<Update label="25 مارس 2026">
## v1.12.0a1

View File

@@ -4,6 +4,45 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="Mar 26, 2026">
## v1.12.0a3
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0a3)
## What's Changed
### Bug Fixes
- Fix bad credentials for traces batch push (404)
- Resolve multiple bugs in HITL flow system
### Documentation
- Update changelog and version for v1.12.0a2
## Contributors
@akaKuruma, @greysonlalonde
</Update>
<Update label="Mar 25, 2026">
## v1.12.0a2
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0a2)
## What's Changed
### Features
- Add Qdrant Edge storage backend for memory system
### Documentation
- Update changelog and version for v1.12.0a1
## Contributors
@greysonlalonde
</Update>
<Update label="Mar 25, 2026">
## v1.12.0a1

View File

@@ -4,6 +4,45 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 3월 26일">
## v1.12.0a3
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0a3)
## 변경 사항
### 버그 수정
- 트레이스 배치 푸시에 대한 잘못된 자격 증명 수정 (404)
- HITL 흐름 시스템의 여러 버그 해결
### 문서
- v1.12.0a2에 대한 변경 로그 및 버전 업데이트
## 기여자
@akaKuruma, @greysonlalonde
</Update>
<Update label="2026년 3월 25일">
## v1.12.0a2
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0a2)
## 변경 사항
### 기능
- 메모리 시스템을 위한 Qdrant Edge 스토리지 백엔드 추가
### 문서
- v1.12.0a1에 대한 변경 로그 및 버전 업데이트
## 기여자
@greysonlalonde
</Update>
<Update label="2026년 3월 25일">
## v1.12.0a1

View File

@@ -4,6 +4,45 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="26 mar 2026">
## v1.12.0a3
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0a3)
## O que Mudou
### Correções de Bugs
- Corrigir credenciais inválidas para envio em lote de rastros (404)
- Resolver múltiplos bugs no sistema de fluxo HITL
### Documentação
- Atualizar changelog e versão para v1.12.0a2
## Contributors
@akaKuruma, @greysonlalonde
</Update>
<Update label="25 mar 2026">
## v1.12.0a2
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0a2)
## O que Mudou
### Recursos
- Adicionar backend de armazenamento Qdrant Edge para sistema de memória
### Documentação
- Atualizar changelog e versão para v1.12.0a1
## Contribuidores
@greysonlalonde
</Update>
<Update label="25 mar 2026">
## v1.12.0a1

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.12.0a1"
__version__ = "1.12.0a3"

View File

@@ -11,7 +11,7 @@ dependencies = [
"pytube~=15.0.0",
"requests~=2.32.5",
"docker~=7.1.0",
"crewai==1.12.0a1",
"crewai==1.12.0a3",
"tiktoken~=0.8.0",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",

View File

@@ -309,4 +309,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.12.0a1"
__version__ = "1.12.0a3"

View File

@@ -54,7 +54,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.12.0a1",
"crewai-tools==1.12.0a3",
]
embeddings = [
"tiktoken~=0.8.0"
@@ -106,6 +106,9 @@ a2a = [
file-processing = [
"crewai-files",
]
qdrant-edge = [
"qdrant-edge-py>=0.6.0",
]
[project.scripts]

View File

@@ -42,7 +42,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.12.0a1"
__version__ = "1.12.0a3"
_telemetry_submitted = False

View File

@@ -73,6 +73,7 @@ class PlusAPI:
description: str | None,
encoded_file: str,
available_exports: list[dict[str, Any]] | None = None,
tools_metadata: list[dict[str, Any]] | None = None,
) -> httpx.Response:
params = {
"handle": handle,
@@ -81,6 +82,9 @@ class PlusAPI:
"file": encoded_file,
"description": description,
"available_exports": available_exports,
"tools_metadata": {"package": handle, "tools": tools_metadata}
if tools_metadata is not None
else None,
}
return self._make_request("POST", f"{self.TOOLS_RESOURCE}", json=params)
@@ -196,6 +200,16 @@ class PlusAPI:
timeout=30,
)
def mark_ephemeral_trace_batch_as_failed(
self, trace_batch_id: str, error_message: str
) -> httpx.Response:
return self._make_request(
"PATCH",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}",
json={"status": "failed", "failure_reason": error_message},
timeout=30,
)
def get_mcp_configs(self, slugs: list[str]) -> httpx.Response:
"""Get MCP server configurations for the given slugs."""
return self._make_request(
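
The `tools_metadata is not None` check in the publish payload above matters for empty lists. The following sketch isolates that expression; `build_tools_metadata` is a hypothetical helper name, not part of `PlusAPI`.

```python
from typing import Any, Optional


def build_tools_metadata(
    handle: str, tools: Optional[list[dict[str, Any]]]
) -> Optional[dict[str, Any]]:
    # `is not None` keeps an explicit empty list; a truthiness test would drop it.
    return {"package": handle, "tools": tools} if tools is not None else None
```

An empty list therefore reaches the API as `{"package": ..., "tools": []}`, while `None` still means that no metadata was supplied.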

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.12.0a1"
"crewai[tools]==1.12.0a3"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.12.0a1"
"crewai[tools]==1.12.0a3"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.12.0a1"
"crewai[tools]==1.12.0a3"
]
[tool.crewai]

View File

@@ -17,6 +17,7 @@ from crewai.cli.constants import DEFAULT_CREWAI_ENTERPRISE_URL
from crewai.cli.utils import (
build_env_with_tool_repository_credentials,
extract_available_exports,
extract_tools_metadata,
get_project_description,
get_project_name,
get_project_version,
@@ -101,6 +102,18 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
console.print(
f"[green]Found these tools to publish: {', '.join([e['name'] for e in available_exports])}[/green]"
)
console.print("[bold blue]Extracting tool metadata...[/bold blue]")
try:
tools_metadata = extract_tools_metadata()
except Exception as e:
console.print(
f"[yellow]Warning: Could not extract tool metadata: {e}[/yellow]\n"
f"Publishing will continue without detailed metadata."
)
tools_metadata = []
self._print_tools_preview(tools_metadata)
self._print_current_organization()
with tempfile.TemporaryDirectory() as temp_build_dir:
@@ -118,7 +131,7 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
"Project build failed. Please ensure that the command `uv build --sdist` completes successfully.",
style="bold red",
)
raise SystemExit
raise SystemExit(1)
tarball_path = os.path.join(temp_build_dir, tarball_filename)
with open(tarball_path, "rb") as file:
@@ -134,6 +147,7 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
description=project_description,
encoded_file=f"data:application/x-gzip;base64,{encoded_tarball}",
available_exports=available_exports,
tools_metadata=tools_metadata,
)
self._validate_response(publish_response)
@@ -246,6 +260,55 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
)
raise SystemExit
def _print_tools_preview(self, tools_metadata: list[dict[str, Any]]) -> None:
if not tools_metadata:
console.print("[yellow]No tool metadata extracted.[/yellow]")
return
console.print(
f"\n[bold]Tools to be published ({len(tools_metadata)}):[/bold]\n"
)
for tool in tools_metadata:
console.print(f" [bold cyan]{tool.get('name', 'Unknown')}[/bold cyan]")
if tool.get("module"):
console.print(f" Module: {tool.get('module')}")
console.print(f" Name: {tool.get('humanized_name', 'N/A')}")
console.print(
f" Description: {tool.get('description', 'N/A')[:80]}{'...' if len(tool.get('description', '')) > 80 else ''}"
)
init_params = tool.get("init_params_schema", {}).get("properties", {})
if init_params:
required = tool.get("init_params_schema", {}).get("required", [])
console.print(" Init parameters:")
for param_name, param_info in init_params.items():
param_type = param_info.get("type", "any")
is_required = param_name in required
req_marker = "[red]*[/red]" if is_required else ""
default = (
f" = {param_info['default']}" if "default" in param_info else ""
)
console.print(
f" - {param_name}: {param_type}{default} {req_marker}"
)
env_vars = tool.get("env_vars", [])
if env_vars:
console.print(" Environment variables:")
for env_var in env_vars:
req_marker = "[red]*[/red]" if env_var.get("required") else ""
default = (
f" (default: {env_var['default']})"
if env_var.get("default")
else ""
)
console.print(
f" - {env_var['name']}: {env_var.get('description', 'N/A')}{default} {req_marker}"
)
console.print()
def _print_current_organization(self) -> None:
settings = Settings()
if settings.org_uuid:

View File

@@ -1,10 +1,15 @@
from functools import reduce
from collections.abc import Generator, Mapping
from contextlib import contextmanager
from functools import lru_cache, reduce
import hashlib
import importlib.util
import inspect
from inspect import getmro, isclass, isfunction, ismethod
import os
from pathlib import Path
import shutil
import sys
import types
from typing import Any, cast, get_type_hints
import click
@@ -544,43 +549,62 @@ def build_env_with_tool_repository_credentials(
return env
@contextmanager
def _load_module_from_file(
init_file: Path, module_name: str | None = None
) -> Generator[types.ModuleType | None, None, None]:
"""
Context manager for loading a module from file with automatic cleanup.
Yields the loaded module or None if loading fails.
"""
if module_name is None:
module_name = (
f"temp_module_{hashlib.sha256(str(init_file).encode()).hexdigest()[:8]}"
)
spec = importlib.util.spec_from_file_location(module_name, init_file)
if not spec or not spec.loader:
yield None
return
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
try:
spec.loader.exec_module(module)
yield module
finally:
sys.modules.pop(module_name, None)
def _load_tools_from_init(init_file: Path) -> list[dict[str, Any]]:
"""
Load and validate tools from a given __init__.py file.
"""
spec = importlib.util.spec_from_file_location("temp_module", init_file)
if not spec or not spec.loader:
return []
module = importlib.util.module_from_spec(spec)
sys.modules["temp_module"] = module
try:
spec.loader.exec_module(module)
with _load_module_from_file(init_file) as module:
if module is None:
return []
if not hasattr(module, "__all__"):
console.print(
f"Warning: No __all__ defined in {init_file}",
style="bold yellow",
)
raise SystemExit(1)
return [
{
"name": name,
}
for name in module.__all__
if hasattr(module, name) and is_valid_tool(getattr(module, name))
]
if not hasattr(module, "__all__"):
console.print(
f"Warning: No __all__ defined in {init_file}",
style="bold yellow",
)
raise SystemExit(1)
return [
{"name": name}
for name in module.__all__
if hasattr(module, name) and is_valid_tool(getattr(module, name))
]
except SystemExit:
raise
except Exception as e:
console.print(f"[red]Warning: Could not load {init_file}: {e!s}[/red]")
raise SystemExit(1) from e
finally:
sys.modules.pop("temp_module", None)
def _print_no_tools_warning() -> None:
"""
@@ -610,3 +634,229 @@ def _print_no_tools_warning() -> None:
" # ... implementation\n"
" return result\n"
)
def extract_tools_metadata(dir_path: str = "src") -> list[dict[str, Any]]:
"""
Extract rich metadata from tool classes in the project.
Returns a list of tool metadata dictionaries containing:
- name: Class name
- humanized_name: From name field default
- description: From description field default
- run_params_schema: JSON Schema for _run() params (from args_schema)
- init_params_schema: JSON Schema for __init__ params (filtered)
- env_vars: List of environment variable dicts
"""
tools_metadata: list[dict[str, Any]] = []
for init_file in Path(dir_path).glob("**/__init__.py"):
tools = _extract_tool_metadata_from_init(init_file)
tools_metadata.extend(tools)
return tools_metadata
def _extract_tool_metadata_from_init(init_file: Path) -> list[dict[str, Any]]:
"""
Load module from init file and extract metadata from valid tool classes.
"""
from crewai.tools.base_tool import BaseTool
try:
with _load_module_from_file(init_file) as module:
if module is None:
return []
exported_names = getattr(module, "__all__", None)
if not exported_names:
return []
tools_metadata = []
for name in exported_names:
obj = getattr(module, name, None)
if obj is None or not (
inspect.isclass(obj) and issubclass(obj, BaseTool)
):
continue
if tool_info := _extract_single_tool_metadata(obj):
tools_metadata.append(tool_info)
return tools_metadata
except Exception as e:
console.print(
f"[yellow]Warning: Could not extract metadata from {init_file}: {e}[/yellow]"
)
return []
def _extract_single_tool_metadata(tool_class: type) -> dict[str, Any] | None:
"""
Extract metadata from a single tool class.
"""
try:
core_schema = cast(Any, tool_class).__pydantic_core_schema__
if not core_schema:
return None
schema = _unwrap_schema(core_schema)
fields = schema.get("schema", {}).get("fields", {})
try:
file_path = inspect.getfile(tool_class)
relative_path = Path(file_path).relative_to(Path.cwd())
module_path = relative_path.with_suffix("")
if module_path.parts[0] == "src":
module_path = Path(*module_path.parts[1:])
if module_path.name == "__init__":
module_path = module_path.parent
module = ".".join(module_path.parts)
except (TypeError, ValueError):
module = tool_class.__module__
return {
"name": tool_class.__name__,
"module": module,
"humanized_name": _extract_field_default(
fields.get("name"), fallback=tool_class.__name__
),
"description": str(
_extract_field_default(fields.get("description"))
).strip(),
"run_params_schema": _extract_run_params_schema(fields.get("args_schema")),
"init_params_schema": _extract_init_params_schema(tool_class),
"env_vars": _extract_env_vars(fields.get("env_vars")),
}
except Exception:
return None
def _unwrap_schema(schema: Mapping[str, Any] | dict[str, Any]) -> dict[str, Any]:
"""
Unwrap nested schema structures to get to the actual schema definition.
"""
result: dict[str, Any] = dict(schema)
while (
result.get("type")
in {"function-after", "function-before", "function-wrap", "default"}
and "schema" in result
):
result = dict(result["schema"])
if result.get("type") == "definitions" and "schema" in result:
result = dict(result["schema"])
return result
def _extract_field_default(
field: dict[str, Any] | None, fallback: str | list[Any] = ""
) -> str | list[Any] | int:
"""
Extract the default value from a field schema.
"""
if not field:
return fallback
schema = field.get("schema", {})
default = schema.get("default")
return default if isinstance(default, (list, str, int)) else fallback
@lru_cache(maxsize=1)
def _get_schema_generator() -> type:
"""Get a SchemaGenerator that omits non-serializable defaults."""
from pydantic.json_schema import GenerateJsonSchema
from pydantic_core import PydanticOmit
class SchemaGenerator(GenerateJsonSchema):
def handle_invalid_for_json_schema(
self, schema: Any, error_info: Any
) -> dict[str, Any]:
raise PydanticOmit
return SchemaGenerator
def _extract_run_params_schema(
args_schema_field: dict[str, Any] | None,
) -> dict[str, Any]:
"""
Extract JSON Schema for the tool's run parameters from args_schema field.
"""
from pydantic import BaseModel
if not args_schema_field:
return {}
args_schema_class = args_schema_field.get("schema", {}).get("default")
if not (
inspect.isclass(args_schema_class) and issubclass(args_schema_class, BaseModel)
):
return {}
try:
return args_schema_class.model_json_schema(
schema_generator=_get_schema_generator()
)
except Exception:
return {}
_IGNORED_INIT_PARAMS = frozenset(
{
"name",
"description",
"env_vars",
"args_schema",
"description_updated",
"cache_function",
"result_as_answer",
"max_usage_count",
"current_usage_count",
"package_dependencies",
}
)
def _extract_init_params_schema(tool_class: type) -> dict[str, Any]:
"""
Extract JSON Schema for the tool's __init__ parameters, filtering out base fields.
"""
try:
json_schema: dict[str, Any] = cast(Any, tool_class).model_json_schema(
schema_generator=_get_schema_generator(), mode="serialization"
)
filtered_properties = {
key: value
for key, value in json_schema.get("properties", {}).items()
if key not in _IGNORED_INIT_PARAMS
}
json_schema["properties"] = filtered_properties
if "required" in json_schema:
json_schema["required"] = [
key for key in json_schema["required"] if key in filtered_properties
]
return json_schema
except Exception:
return {}
def _extract_env_vars(env_vars_field: dict[str, Any] | None) -> list[dict[str, Any]]:
"""
Extract environment variable definitions from env_vars field.
"""
from crewai.tools.base_tool import EnvVar
if not env_vars_field:
return []
return [
{
"name": env_var.name,
"description": env_var.description,
"required": env_var.required,
"default": env_var.default,
}
for env_var in env_vars_field.get("schema", {}).get("default", [])
if isinstance(env_var, EnvVar)
]
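
To see why `_extract_init_params_schema` also filters `required`, consider this reduced sketch. `filter_schema` and the shortened `IGNORED` set are illustrative assumptions, and unlike the function in the diff this version returns a copy instead of mutating its input.

```python
from typing import Any

IGNORED = frozenset({"name", "description"})  # shortened, illustrative subset


def filter_schema(json_schema: dict[str, Any]) -> dict[str, Any]:
    props = {
        key: value
        for key, value in json_schema.get("properties", {}).items()
        if key not in IGNORED
    }
    out = {**json_schema, "properties": props}
    if "required" in out:
        # Keep `required` consistent with the properties that survived.
        out["required"] = [key for key in out["required"] if key in props]
    return out
```

Without the second step, `required` could name a property that no longer exists in `properties`, leaving the schema internally inconsistent.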

View File

@@ -1,3 +1,4 @@
from datetime import datetime, timezone
import logging
import uuid
import webbrowser
@@ -100,20 +101,50 @@ class FirstTimeTraceHandler:
user_context=user_context,
execution_metadata=execution_metadata,
use_ephemeral=True,
skip_context_check=True,
)
if not self.batch_manager.trace_batch_id:
self._gracefully_fail(
"Backend batch creation failed, cannot send events."
)
self._reset_batch_state()
return
self.batch_manager.backend_initialized = True
if self.batch_manager.event_buffer:
self.batch_manager._send_events_to_backend()
# Capture values before send/finalize consume them
events_count = len(self.batch_manager.event_buffer)
batch_id = self.batch_manager.trace_batch_id
# Read duration non-destructively — _finalize_backend_batch will consume it
start_time = self.batch_manager.execution_start_times.get("execution")
duration_ms = (
int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)
if start_time
else 0
)
self.batch_manager.finalize_batch()
if self.batch_manager.event_buffer:
send_status = self.batch_manager._send_events_to_backend()
if send_status == 500 and self.batch_manager.trace_batch_id:
self.batch_manager._mark_batch_as_failed(
self.batch_manager.trace_batch_id,
"Error sending events to backend",
)
self._reset_batch_state()
return
self.batch_manager._finalize_backend_batch(events_count)
self.ephemeral_url = self.batch_manager.ephemeral_trace_url
if not self.ephemeral_url:
self._show_local_trace_message()
self._show_local_trace_message(events_count, duration_ms, batch_id)
self._reset_batch_state()
except Exception as e:
self._gracefully_fail(f"Backend initialization failed: {e}")
self._reset_batch_state()
def _display_ephemeral_trace_link(self) -> None:
"""Display the ephemeral trace link to the user and automatically open browser."""
@@ -185,6 +216,19 @@ To enable tracing later, do any one of these:
console.print(panel)
console.print()
def _reset_batch_state(self) -> None:
"""Reset batch manager state to allow future executions to re-initialize."""
if not self.batch_manager:
return
self.batch_manager.batch_owner_type = None
self.batch_manager.batch_owner_id = None
self.batch_manager.current_batch = None
self.batch_manager.event_buffer.clear()
self.batch_manager.trace_batch_id = None
self.batch_manager.is_current_batch_ephemeral = False
self.batch_manager.backend_initialized = False
self.batch_manager._cleanup_batch_data()
def _gracefully_fail(self, error_message: str) -> None:
"""Handle errors gracefully without disrupting user experience."""
console = Console()
@@ -192,7 +236,9 @@ To enable tracing later, do any one of these:
logger.debug(f"First-time trace error: {error_message}")
def _show_local_trace_message(self) -> None:
def _show_local_trace_message(
self, events_count: int = 0, duration_ms: int = 0, batch_id: str | None = None
) -> None:
"""Show message when traces were collected locally but couldn't be uploaded."""
if self.batch_manager is None:
return
@@ -203,9 +249,9 @@ To enable tracing later, do any one of these:
📊 Your execution traces were collected locally!
Unfortunately, we couldn't upload them to the server right now, but here's what we captured:
{len(self.batch_manager.event_buffer)} trace events
• Execution duration: {self.batch_manager.calculate_duration("execution")}ms
• Batch ID: {self.batch_manager.trace_batch_id}
{events_count} trace events
• Execution duration: {duration_ms}ms
• Batch ID: {batch_id}
✅ Tracing has been enabled for future runs!
Your preference has been saved. Future Crew/Flow executions will automatically collect traces.
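
The non-destructive duration read used by the first-time handler above can be isolated as a small helper. `elapsed_ms` is a hypothetical name, and the `now` parameter is added only so the calculation can be checked deterministically.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def elapsed_ms(start: Optional[datetime], now: Optional[datetime] = None) -> int:
    """Milliseconds since `start`, or 0 when no start time was recorded."""
    if start is None:
        return 0
    now = now or datetime.now(timezone.utc)
    return int((now - start).total_seconds() * 1000)


start = datetime(2026, 1, 1, tzinfo=timezone.utc)
print(elapsed_ms(start, start + timedelta(milliseconds=1500)))  # 1500
```

Reading the start time with `.get()` and computing the difference leaves the stored value in place, so a later finalize step can still consume it.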

View File

@@ -2,6 +2,7 @@ from dataclasses import dataclass, field
from datetime import datetime, timezone
from logging import getLogger
from threading import Condition, Lock
import time
from typing import Any
import uuid
@@ -98,7 +99,7 @@ class TraceBatchManager:
self._initialize_backend_batch(
user_context, execution_metadata, use_ephemeral
)
self.backend_initialized = True
self.backend_initialized = self.trace_batch_id is not None
self._batch_ready_cv.notify_all()
return self.current_batch
@@ -108,14 +109,15 @@ class TraceBatchManager:
user_context: dict[str, str],
execution_metadata: dict[str, Any],
use_ephemeral: bool = False,
skip_context_check: bool = False,
) -> None:
"""Send batch initialization to backend"""
if not is_tracing_enabled_in_context():
return
if not skip_context_check and not is_tracing_enabled_in_context():
return None
if not self.plus_api or not self.current_batch:
return
return None
try:
payload = {
@@ -142,19 +144,53 @@ class TraceBatchManager:
payload["ephemeral_trace_id"] = self.current_batch.batch_id
payload["user_identifier"] = get_user_id()
response = (
self.plus_api.initialize_ephemeral_trace_batch(payload)
if use_ephemeral
else self.plus_api.initialize_trace_batch(payload)
)
max_retries = 1
response = None
try:
for attempt in range(max_retries + 1):
response = (
self.plus_api.initialize_ephemeral_trace_batch(payload)
if use_ephemeral
else self.plus_api.initialize_trace_batch(payload)
)
if response is not None and response.status_code < 500:
break
if attempt < max_retries:
logger.debug(
f"Trace batch init attempt {attempt + 1} failed "
f"(status={response.status_code if response is not None else 'None'}), retrying..."
)
time.sleep(0.2)
except Exception as e:
logger.warning(
f"Error initializing trace batch: {e}. Continuing without tracing."
)
self.trace_batch_id = None
return None
if response is None:
logger.warning(
"Trace batch initialization failed gracefully. Continuing without tracing."
)
return
self.trace_batch_id = None
return None
# Fall back to ephemeral on auth failure (expired/revoked token)
if response.status_code in [401, 403] and not use_ephemeral:
logger.warning(
"Auth rejected by server, falling back to ephemeral tracing."
)
self.is_current_batch_ephemeral = True
return self._initialize_backend_batch(
user_context,
execution_metadata,
use_ephemeral=True,
skip_context_check=skip_context_check,
)
if response.status_code in [201, 200]:
self.is_current_batch_ephemeral = use_ephemeral
response_data = response.json()
self.trace_batch_id = (
response_data["trace_id"]
@@ -165,11 +201,22 @@ class TraceBatchManager:
logger.warning(
f"Trace batch initialization returned status {response.status_code}. Continuing without tracing."
)
self.trace_batch_id = None
except Exception as e:
logger.warning(
f"Error initializing trace batch: {e}. Continuing without tracing."
)
self.trace_batch_id = None
def _mark_batch_as_failed(self, trace_batch_id: str, error_message: str) -> None:
"""Mark a trace batch as failed, routing to the correct endpoint."""
if self.is_current_batch_ephemeral:
self.plus_api.mark_ephemeral_trace_batch_as_failed(
trace_batch_id, error_message
)
else:
self.plus_api.mark_trace_batch_as_failed(trace_batch_id, error_message)
def begin_event_processing(self) -> None:
"""Mark that an event handler started processing (for synchronization)."""
@@ -260,7 +307,7 @@ class TraceBatchManager:
logger.error(
"Event handler timeout - marking batch as failed due to incomplete events"
)
self.plus_api.mark_trace_batch_as_failed(
self._mark_batch_as_failed(
self.trace_batch_id,
"Timeout waiting for event handlers - events incomplete",
)
@@ -284,7 +331,7 @@ class TraceBatchManager:
events_sent_to_backend_status = self._send_events_to_backend()
self.event_buffer = original_buffer
if events_sent_to_backend_status == 500 and self.trace_batch_id:
self.plus_api.mark_trace_batch_as_failed(
self._mark_batch_as_failed(
self.trace_batch_id, "Error sending events to backend"
)
return None
@@ -364,13 +411,16 @@ class TraceBatchManager:
logger.error(
f"❌ Failed to finalize trace batch: {response.status_code} - {response.text}"
)
self.plus_api.mark_trace_batch_as_failed(
self.trace_batch_id, response.text
)
self._mark_batch_as_failed(self.trace_batch_id, response.text)
except Exception as e:
logger.error(f"❌ Error finalizing trace batch: {e}")
self.plus_api.mark_trace_batch_as_failed(self.trace_batch_id, str(e))
try:
self._mark_batch_as_failed(self.trace_batch_id, str(e))
except Exception:
logger.debug(
"Could not mark trace batch as failed (network unavailable)"
)
def _cleanup_batch_data(self) -> None:
"""Clean up batch data after successful finalization to free memory"""
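
Review note: the retry loop added to `_initialize_backend_batch` retries only on a missing response or a 5xx status, and stops immediately on any other status so that the 401/403 fallback can run. The sketch below isolates that control flow under stated assumptions: `FakeResponse`, `make_sender`, and `call_with_retry` are hypothetical names used for illustration only and are not part of the `plus_api` client.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class FakeResponse:
    """Hypothetical stand-in for an HTTP response; only status_code is used."""

    status_code: int


def call_with_retry(
    send: Callable[[], Optional[FakeResponse]],
    max_retries: int = 1,
    delay_s: float = 0.2,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[FakeResponse]:
    """Call send() up to max_retries + 1 times, retrying on None or 5xx."""
    response = None
    for attempt in range(max_retries + 1):
        response = send()
        # Anything below 500 (success, auth failure, 404) is final.
        if response is not None and response.status_code < 500:
            break
        if attempt < max_retries:
            sleep(delay_s)
    return response


def make_sender(statuses):
    """Build a fake send() that yields the given statuses in order."""
    queue = list(statuses)

    def send() -> Optional[FakeResponse]:
        status = queue.pop(0)
        return None if status is None else FakeResponse(status)

    return send


if __name__ == "__main__":
    result = call_with_retry(make_sender([503, 201]), sleep=lambda _: None)
    print(result.status_code if result is not None else None)
```

Injecting `sleep` keeps the sketch testable without real delays; the diff itself calls `time.sleep(0.2)` directly.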

@@ -235,8 +235,11 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(FlowStartedEvent)
def on_flow_started(source: Any, event: FlowStartedEvent) -> None:
if not self.batch_manager.is_batch_initialized():
self._initialize_flow_batch(source, event)
# Always call _initialize_flow_batch to claim ownership.
# If batch was already initialized by a concurrent action event
# (race condition), initialize_batch() returns early but
# batch_owner_type is still correctly set to "flow".
self._initialize_flow_batch(source, event)
self._handle_trace_event("flow_started", source, event)
@event_bus.on(MethodExecutionStartedEvent)
@@ -266,7 +269,12 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source: Any, event: CrewKickoffStartedEvent) -> None:
if not self.batch_manager.is_batch_initialized():
if self.batch_manager.batch_owner_type != "flow":
# Always call _initialize_crew_batch to claim ownership.
# If batch was already initialized by a concurrent action event
# (race condition with DefaultEnvEvent), initialize_batch() returns
# early but batch_owner_type is still correctly set to "crew".
# Skip only when a parent flow already owns the batch.
self._initialize_crew_batch(source, event)
self._handle_trace_event("crew_kickoff_started", source, event)
@@ -772,7 +780,7 @@ class TraceCollectionListener(BaseEventListener):
"crew_name": getattr(source, "name", "Unknown Crew"),
"crewai_version": get_crewai_version(),
}
self.batch_manager.initialize_batch(user_context, execution_metadata)
self._initialize_batch(user_context, execution_metadata)
self.batch_manager.begin_event_processing()
try:

@@ -127,6 +127,9 @@ To update, run: uv sync --upgrade-package crewai"""
def _show_tracing_disabled_message_if_needed(self) -> None:
"""Show tracing disabled message if tracing is not enabled."""
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.events.listeners.tracing.utils import (
has_user_declined_tracing,
is_tracing_enabled_in_context,
@@ -136,6 +139,12 @@ To update, run: uv sync --upgrade-package crewai"""
if should_suppress_tracing_messages():
return
# Don't show "disabled" message when the first-time handler will show
# the trace prompt after execution completes (avoids confusing mid-flow messages)
listener = TraceCollectionListener._instance # type: ignore[misc]
if listener and listener.first_time_handler.is_first_time:
return
if not is_tracing_enabled_in_context():
if has_user_declined_tracing():
message = """Info: Tracing is disabled.

@@ -182,7 +182,7 @@ class ConsoleProvider:
console.print(message, style="yellow")
console.print()
response = input(">>> \n").strip()
response = input(">>> ").strip()
else:
response = input(f"{message} ").strip()

@@ -63,6 +63,32 @@ class PendingFeedbackContext:
llm: dict[str, Any] | str | None = None
requested_at: datetime = field(default_factory=datetime.now)
@staticmethod
def _make_json_safe(value: Any) -> Any:
"""Convert a value to a JSON-serializable form.
Handles Pydantic models, dataclasses, and arbitrary objects by
progressively falling back to string representation.
"""
if value is None or isinstance(value, (str, int, float, bool)):
return value
if isinstance(value, (list, tuple)):
return [PendingFeedbackContext._make_json_safe(v) for v in value]
if isinstance(value, dict):
return {
k: PendingFeedbackContext._make_json_safe(v) for k, v in value.items()
}
from pydantic import BaseModel
if isinstance(value, BaseModel):
return value.model_dump(mode="json")
import dataclasses
if dataclasses.is_dataclass(value) and not isinstance(value, type):
return PendingFeedbackContext._make_json_safe(dataclasses.asdict(value))
return str(value)
def to_dict(self) -> dict[str, Any]:
"""Serialize context to a dictionary for persistence.
@@ -73,11 +99,11 @@ class PendingFeedbackContext:
"flow_id": self.flow_id,
"flow_class": self.flow_class,
"method_name": self.method_name,
"method_output": self.method_output,
"method_output": self._make_json_safe(self.method_output),
"message": self.message,
"emit": self.emit,
"default_outcome": self.default_outcome,
"metadata": self.metadata,
"metadata": self._make_json_safe(self.metadata),
"llm": self.llm,
"requested_at": self.requested_at.isoformat(),
}
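
Review note: `_make_json_safe` walks containers recursively and falls back to `str()` for anything it does not recognise, so `to_dict()` output can always be passed to `json.dumps`. A minimal standalone sketch of the same fallback chain follows. It omits the Pydantic branch to stay stdlib-only, and `make_json_safe` and `Review` are hypothetical names, not the code in this PR.

```python
import dataclasses
import json
from pathlib import PurePosixPath
from typing import Any, Tuple


def make_json_safe(value: Any) -> Any:
    """Recursively convert a value into something json.dumps accepts."""
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    if isinstance(value, (list, tuple)):
        return [make_json_safe(v) for v in value]
    if isinstance(value, dict):
        return {k: make_json_safe(v) for k, v in value.items()}
    # Dataclass instances (not classes) become dicts, then get re-walked.
    if dataclasses.is_dataclass(value) and not isinstance(value, type):
        return make_json_safe(dataclasses.asdict(value))
    # Last resort: the string representation.
    return str(value)


@dataclasses.dataclass
class Review:
    """Hypothetical method output used only for this example."""

    title: str
    tags: Tuple[str, ...]


if __name__ == "__main__":
    raw = {"review": Review("draft", ("a", "b")), "path": PurePosixPath("a/b")}
    print(json.dumps(make_json_safe(raw)))
```

As in the diff, dictionary keys are passed through unchanged, so non-string keys would still need handling by the caller.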

@@ -1223,9 +1223,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Mark that we're resuming execution
instance._is_execution_resuming = True
# Mark the method as completed (it ran before pausing)
instance._completed_methods.add(FlowMethodName(pending_context.method_name))
return instance
@property
@@ -1380,7 +1377,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
self.human_feedback_history.append(result)
self.last_human_feedback = result
# Clear pending context after processing
self._completed_methods.add(FlowMethodName(context.method_name))
self._pending_feedback_context = None
# Clear pending feedback from persistence
@@ -1403,7 +1401,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
# This allows methods to re-execute in loops (e.g., implement_changes → suggest_changes → implement_changes)
self._is_execution_resuming = False
final_result: Any = result
if emit and collapsed_outcome is None:
collapsed_outcome = default_outcome or emit[0]
result.outcome = collapsed_outcome
try:
if emit and collapsed_outcome:
self._method_outputs.append(collapsed_outcome)
@@ -1421,7 +1422,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
# Auto-save pending feedback (create default persistence if needed)
self._pending_feedback_context = e.context
if self._persistence is None:
from crewai.flow.persistence import SQLiteFlowPersistence
@@ -1455,6 +1457,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
return e
raise
final_result = self._method_outputs[-1] if self._method_outputs else result
# Emit flow finished
crewai_event_bus.emit(
self,
@@ -2314,7 +2318,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
if isinstance(e, HumanFeedbackPending):
e.context.method_name = method_name
# Auto-save pending feedback (create default persistence if needed)
if self._persistence is None:
from crewai.flow.persistence import SQLiteFlowPersistence
@@ -3133,10 +3136,16 @@ class Flow(Generic[T], metaclass=FlowMeta):
if outcome.lower() == response_clean.lower():
return outcome
# Partial match
# Partial match (longest wins, first on length ties)
response_lower = response_clean.lower()
best_outcome: str | None = None
best_len = -1
for outcome in outcomes:
if outcome.lower() in response_clean.lower():
return outcome
if outcome.lower() in response_lower and len(outcome) > best_len:
best_outcome = outcome
best_len = len(outcome)
if best_outcome is not None:
return best_outcome
# Fallback to first outcome
logger.warning(
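
Review note: the partial-match change matters when one outcome is a substring of another. With outcomes `["approved", "not approved"]`, returning the first substring hit would route "this is not approved" to `approved`. The sketch below reproduces the longest-match rule in isolation; `match_outcome` is a hypothetical helper name, and unlike the diff it returns `None` instead of falling back to the first outcome.

```python
from typing import List, Optional


def match_outcome(response: str, outcomes: List[str]) -> Optional[str]:
    """Exact match first, then the longest outcome contained in the response."""
    response_lower = response.strip().lower()

    # Exact, case-insensitive match wins outright.
    for outcome in outcomes:
        if outcome.lower() == response_lower:
            return outcome

    # Partial match: longest wins, first wins on length ties.
    best: Optional[str] = None
    best_len = -1
    for outcome in outcomes:
        if outcome.lower() in response_lower and len(outcome) > best_len:
            best = outcome
            best_len = len(outcome)
    return best


if __name__ == "__main__":
    print(match_outcome("This is not approved", ["approved", "not approved"]))
```

The strict `>` comparison is what keeps the earlier outcome on ties.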

@@ -116,10 +116,11 @@ def _deserialize_llm_from_context(
return LLM(model=llm_data)
if isinstance(llm_data, dict):
model = llm_data.pop("model", None)
data = dict(llm_data)
model = data.pop("model", None)
if not model:
return None
return LLM(model=model, **llm_data)
return LLM(model=model, **data)
return None
@@ -450,12 +451,12 @@ def human_feedback(
# -- Core feedback helpers ------------------------------------
def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str:
"""Request feedback using provider or default console."""
def _build_feedback_context(
flow_instance: Flow[Any], method_output: Any
) -> tuple[Any, Any]:
"""Build the PendingFeedbackContext and resolve the effective provider."""
from crewai.flow.async_feedback.types import PendingFeedbackContext
# Build context for provider
# Use flow_id property which handles both dict and BaseModel states
context = PendingFeedbackContext(
flow_id=flow_instance.flow_id or "unknown",
flow_class=f"{flow_instance.__class__.__module__}.{flow_instance.__class__.__name__}",
@@ -468,15 +469,53 @@ def human_feedback(
llm=llm if isinstance(llm, str) else _serialize_llm_for_context(llm),
)
# Determine effective provider:
effective_provider = provider
if effective_provider is None:
from crewai.flow.flow_config import flow_config
effective_provider = flow_config.hitl_provider
return context, effective_provider
def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str:
"""Request feedback using provider or default console (sync)."""
context, effective_provider = _build_feedback_context(
flow_instance, method_output
)
if effective_provider is not None:
return effective_provider.request_feedback(context, flow_instance)
feedback_result = effective_provider.request_feedback(
context, flow_instance
)
if asyncio.iscoroutine(feedback_result):
raise TypeError(
f"Provider {type(effective_provider).__name__}.request_feedback() "
"returned a coroutine in a sync flow method. Use an async flow "
"method or a synchronous provider."
)
return str(feedback_result)
return flow_instance._request_human_feedback(
message=message,
output=method_output,
metadata=metadata,
emit=emit,
)
async def _request_feedback_async(
flow_instance: Flow[Any], method_output: Any
) -> str:
"""Request feedback, awaiting the provider if it returns a coroutine."""
context, effective_provider = _build_feedback_context(
flow_instance, method_output
)
if effective_provider is not None:
feedback_result = effective_provider.request_feedback(
context, flow_instance
)
if asyncio.iscoroutine(feedback_result):
return str(await feedback_result)
return str(feedback_result)
return flow_instance._request_human_feedback(
message=message,
output=method_output,
@@ -524,10 +563,11 @@ def human_feedback(
flow_instance.human_feedback_history.append(result)
flow_instance.last_human_feedback = result
# Return based on mode
if emit:
# Return outcome for routing
return collapsed_outcome # type: ignore[return-value]
if collapsed_outcome is None:
collapsed_outcome = default_outcome or emit[0]
result.outcome = collapsed_outcome
return collapsed_outcome
return result
if asyncio.iscoroutinefunction(func):
@@ -540,7 +580,7 @@ def human_feedback(
if learn and getattr(self, "memory", None) is not None:
method_output = _pre_review_with_lessons(self, method_output)
raw_feedback = _request_feedback(self, method_output)
raw_feedback = await _request_feedback_async(self, method_output)
result = _process_feedback(self, method_output, raw_feedback)
# Distill: extract lessons from output + feedback, store in memory
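
Review note: the split into `_request_feedback` and `_request_feedback_async` lets a provider return either a plain value or a coroutine. The sketch below shows the same dispatch under stated assumptions: `SyncProvider`, `AsyncProvider`, `request_sync`, and `request_async` are hypothetical, and the provider signature is simplified to a single `context` argument. The sync path here also closes the unawaited coroutine, which the diff does not do.

```python
import asyncio
from typing import Any


class SyncProvider:
    """Hypothetical provider that answers immediately."""

    def request_feedback(self, context: Any) -> str:
        return "approved"


class AsyncProvider:
    """Hypothetical provider whose request_feedback is a coroutine function."""

    async def request_feedback(self, context: Any) -> str:
        await asyncio.sleep(0)
        return "rejected"


def request_sync(provider: Any, context: Any = None) -> str:
    """Sync path: a coroutine result is a usage error, not something to run."""
    result = provider.request_feedback(context)
    if asyncio.iscoroutine(result):
        result.close()  # avoid a "coroutine was never awaited" warning
        raise TypeError(
            f"{type(provider).__name__}.request_feedback() returned a coroutine "
            "in a sync method"
        )
    return str(result)


async def request_async(provider: Any, context: Any = None) -> str:
    """Async path: await the result only if it is a coroutine."""
    result = provider.request_feedback(context)
    if asyncio.iscoroutine(result):
        return str(await result)
    return str(result)


if __name__ == "__main__":
    print(request_sync(SyncProvider()))
    print(asyncio.run(request_async(AsyncProvider())))
```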

@@ -80,28 +80,28 @@ class MemoryScope(BaseModel):
)
def remember_many(
self,
contents: list[str],
scope: str | None = "/",
categories: list[str] | None = None,
metadata: dict[str, Any] | None = None,
importance: float | None = None,
source: str | None = None,
private: bool = False,
agent_role: str | None = None,
self,
contents: list[str],
scope: str | None = "/",
categories: list[str] | None = None,
metadata: dict[str, Any] | None = None,
importance: float | None = None,
source: str | None = None,
private: bool = False,
agent_role: str | None = None,
) -> list[MemoryRecord]:
"""Remember multiple items; scope is relative to this scope's root."""
path = self._scope_path(scope)
return self._memory.remember_many(
contents,
scope=path,
categories=categories,
metadata=metadata,
importance=importance,
source=source,
private=private,
agent_role=agent_role,
)
"""Remember multiple items; scope is relative to this scope's root."""
path = self._scope_path(scope)
return self._memory.remember_many(
contents,
scope=path,
categories=categories,
metadata=metadata,
importance=importance,
source=source,
private=private,
agent_role=agent_role,
)
def recall(
self,

@@ -0,0 +1,872 @@
"""Qdrant Edge storage backend for the unified memory system.
Uses a write-local/sync-central pattern for safe multi-process access.
Each worker process writes to its own local shard (keyed by PID). Reads
fan out to both local and central shards, merging results. On close,
local records are flushed to the shared central shard.
"""
from __future__ import annotations
import asyncio
import atexit
from datetime import datetime, timezone
import logging
import os
from pathlib import Path
import shutil
from typing import Any, Final
import uuid
from qdrant_edge import (
CountRequest,
Distance,
EdgeConfig,
EdgeShard,
EdgeVectorParams,
FacetRequest,
FieldCondition,
Filter,
MatchValue,
PayloadSchemaType,
Point,
Query,
QueryRequest,
ScrollRequest,
UpdateOperation,
)
from crewai.memory.types import MemoryRecord, ScopeInfo
_logger = logging.getLogger(__name__)
VECTOR_NAME: Final[str] = "memory"
DEFAULT_VECTOR_DIM: Final[int] = 1536
_SCROLL_BATCH: Final[int] = 256
def _uuid_to_point_id(uuid_str: str) -> int:
"""Convert a UUID string to a stable Qdrant point ID.
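For non-UUID strings, falls back to packing the first 8 encoded bytes
into an integer. This is not a hash: strings that share an 8-byte
prefix map to the same ID.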
"""
try:
return uuid.UUID(uuid_str).int % (2**63 - 1)
except ValueError:
return int.from_bytes(uuid_str.encode()[:8].ljust(8, b"\x00"), "big") % (
2**63 - 1
)
def _build_scope_ancestors(scope: str) -> list[str]:
"""Build the list of all ancestor scopes for prefix filtering.
For scope ``/crew/sales/agent``, returns
``["/", "/crew", "/crew/sales", "/crew/sales/agent"]``.
"""
parts = scope.strip("/").split("/")
ancestors: list[str] = ["/"]
current = ""
for part in parts:
if part:
current = f"{current}/{part}"
ancestors.append(current)
return ancestors
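
Review note: storing every ancestor path in the payload turns a prefix query into an exact keyword match, which is what `_build_scope_filter` relies on. The sketch below illustrates why this is stricter than a plain string-prefix test. `build_scope_ancestors` mirrors the function above, while `in_scope` is a hypothetical helper that stands in for the Qdrant keyword filter.

```python
from typing import List


def build_scope_ancestors(scope: str) -> List[str]:
    """Return "/" plus every ancestor path of scope, including scope itself."""
    ancestors = ["/"]
    current = ""
    for part in scope.strip("/").split("/"):
        if part:
            current = f"{current}/{part}"
            ancestors.append(current)
    return ancestors


def in_scope(record_scope: str, prefix: str) -> bool:
    """Hypothetical stand-in for the keyword match on scope_ancestors."""
    normalized = "/" + prefix.strip("/")
    return normalized in build_scope_ancestors(record_scope)


if __name__ == "__main__":
    print(build_scope_ancestors("/crew/sales/agent"))
    # A naive startswith() check would wrongly accept this pair.
    print(in_scope("/crew/salesforce", "/crew/sales"))
```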
class QdrantEdgeStorage:
"""Qdrant Edge storage backend with write-local/sync-central pattern.
Each worker process gets its own local shard for writes.
Reads merge results from both local and central shards. On close,
local records are flushed to the shared central shard.
"""
def __init__(
self,
path: str | Path | None = None,
vector_dim: int | None = None,
) -> None:
"""Initialize Qdrant Edge storage.
Args:
path: Base directory for shard storage. Defaults to
``$CREWAI_STORAGE_DIR/memory/qdrant-edge`` or the
platform data directory.
vector_dim: Embedding vector dimensionality. Auto-detected
from the first saved embedding when ``None``.
"""
if path is None:
storage_dir = os.environ.get("CREWAI_STORAGE_DIR")
if storage_dir:
path = Path(storage_dir) / "memory" / "qdrant-edge"
else:
from crewai.utilities.paths import db_storage_path
path = Path(db_storage_path()) / "memory" / "qdrant-edge"
self._base_path = Path(path)
self._central_path = self._base_path / "central"
self._local_path = self._base_path / f"worker-{os.getpid()}"
self._vector_dim = vector_dim or 0
self._config: EdgeConfig | None = None
self._local_has_data = self._local_path.exists()
self._closed = False
self._indexes_created = False
if self._vector_dim > 0:
self._config = self._build_config(self._vector_dim)
if self._config is None and self._central_path.exists():
try:
shard = EdgeShard.load(str(self._central_path))
if shard.count(CountRequest()) > 0:
pts, _ = shard.scroll(
ScrollRequest(limit=1, with_payload=False, with_vector=True)
)
if pts and pts[0].vector:
vec = pts[0].vector
if isinstance(vec, dict) and VECTOR_NAME in vec:
vec_data = vec[VECTOR_NAME]
dim = len(vec_data) if isinstance(vec_data, list) else 0
if dim > 0:
self._vector_dim = dim
self._config = self._build_config(dim)
shard.close()
except Exception:
_logger.debug("Failed to detect dim from central shard", exc_info=True)
self._cleanup_orphaned_shards()
atexit.register(self.close)
@staticmethod
def _build_config(dim: int) -> EdgeConfig:
"""Build an EdgeConfig for the given vector dimensionality."""
return EdgeConfig(
vectors={VECTOR_NAME: EdgeVectorParams(size=dim, distance=Distance.Cosine)},
)
def _open_shard(self, path: Path) -> EdgeShard:
"""Open an existing shard or create a new one at *path*."""
path.mkdir(parents=True, exist_ok=True)
try:
return EdgeShard.load(str(path))
except Exception:
if self._config is None:
raise
return EdgeShard.create(str(path), self._config)
def _ensure_indexes(self, shard: EdgeShard) -> None:
"""Create payload indexes for efficient filtering."""
if self._indexes_created:
return
try:
shard.update(
UpdateOperation.create_field_index(
"scope_ancestors", PayloadSchemaType.Keyword
)
)
shard.update(
UpdateOperation.create_field_index(
"categories", PayloadSchemaType.Keyword
)
)
shard.update(
UpdateOperation.create_field_index(
"record_id", PayloadSchemaType.Keyword
)
)
self._indexes_created = True
except Exception:
_logger.debug("Index creation failed (may already exist)", exc_info=True)
def _record_to_point(self, record: MemoryRecord) -> Point:
"""Convert a MemoryRecord to a Qdrant Point."""
return Point(
id=_uuid_to_point_id(record.id),
vector={
VECTOR_NAME: record.embedding
if record.embedding
else [0.0] * self._vector_dim,
},
payload={
"record_id": record.id,
"content": record.content,
"scope": record.scope,
"scope_ancestors": _build_scope_ancestors(record.scope),
"categories": record.categories,
"metadata": record.metadata,
"importance": record.importance,
"created_at": record.created_at.isoformat(),
"last_accessed": record.last_accessed.isoformat(),
"source": record.source or "",
"private": record.private,
},
)
@staticmethod
def _payload_to_record(
payload: dict[str, Any],
vector: dict[str, list[float]] | None = None,
) -> MemoryRecord:
"""Reconstruct a MemoryRecord from a Qdrant payload."""
def _parse_dt(val: Any) -> datetime:
if val is None:
return datetime.now(timezone.utc).replace(tzinfo=None)
if isinstance(val, datetime):
return val
return datetime.fromisoformat(str(val).replace("Z", "+00:00"))
return MemoryRecord(
id=str(payload["record_id"]),
content=str(payload["content"]),
scope=str(payload["scope"]),
categories=payload.get("categories", []),
metadata=payload.get("metadata", {}),
importance=float(payload.get("importance", 0.5)),
created_at=_parse_dt(payload.get("created_at")),
last_accessed=_parse_dt(payload.get("last_accessed")),
embedding=vector.get(VECTOR_NAME) if vector else None,
source=payload.get("source") or None,
private=bool(payload.get("private", False)),
)
@staticmethod
def _build_scope_filter(scope_prefix: str | None) -> Filter | None:
"""Build a Qdrant Filter for scope prefix matching."""
if scope_prefix is None or not scope_prefix.strip("/"):
return None
prefix = scope_prefix.rstrip("/")
if not prefix.startswith("/"):
prefix = "/" + prefix
return Filter(
must=[FieldCondition(key="scope_ancestors", match=MatchValue(value=prefix))]
)
@staticmethod
def _scroll_all(
shard: EdgeShard,
filt: Filter | None = None,
with_vector: bool = False,
) -> list[Any]:
"""Scroll all points matching a filter from a shard."""
all_points: list[Any] = []
offset = None
while True:
batch, next_offset = shard.scroll(
ScrollRequest(
limit=_SCROLL_BATCH,
offset=offset,
with_payload=True,
with_vector=with_vector,
filter=filt,
)
)
all_points.extend(batch)
if next_offset is None or not batch:
break
offset = next_offset
return all_points
def save(self, records: list[MemoryRecord]) -> None:
"""Save records to the worker-local shard."""
if not records:
return
if self._vector_dim == 0:
for r in records:
if r.embedding and len(r.embedding) > 0:
self._vector_dim = len(r.embedding)
break
if self._config is None and self._vector_dim > 0:
self._config = self._build_config(self._vector_dim)
if self._config is None:
self._config = self._build_config(DEFAULT_VECTOR_DIM)
self._vector_dim = DEFAULT_VECTOR_DIM
points = [self._record_to_point(r) for r in records]
local = self._open_shard(self._local_path)
try:
self._ensure_indexes(local)
local.update(UpdateOperation.upsert_points(points))
local.flush()
self._local_has_data = True
finally:
local.close()
def search(
self,
query_embedding: list[float],
scope_prefix: str | None = None,
categories: list[str] | None = None,
metadata_filter: dict[str, Any] | None = None,
limit: int = 10,
min_score: float = 0.0,
) -> list[tuple[MemoryRecord, float]]:
"""Search both central and local shards, merge results."""
filt = self._build_scope_filter(scope_prefix)
fetch_limit = limit * 3 if (categories or metadata_filter) else limit
all_scored: list[tuple[dict[str, Any], float, bool]] = []
for shard_path in (self._central_path, self._local_path):
if not shard_path.exists():
continue
is_local = shard_path == self._local_path
try:
shard = EdgeShard.load(str(shard_path))
results = shard.query(
QueryRequest(
query=Query.Nearest(list(query_embedding), using=VECTOR_NAME),
filter=filt,
limit=fetch_limit,
with_payload=True,
with_vector=False,
)
)
all_scored.extend(
(sp.payload or {}, float(sp.score), is_local) for sp in results
)
shard.close()
except Exception:
_logger.debug("Search failed on %s", shard_path, exc_info=True)
seen: dict[str, tuple[dict[str, Any], float]] = {}
local_ids: set[str] = set()
for payload, score, is_local in all_scored:
rid = payload["record_id"]
if is_local:
local_ids.add(rid)
seen[rid] = (payload, score)
elif rid not in local_ids:
if rid not in seen or score > seen[rid][1]:
seen[rid] = (payload, score)
ranked = sorted(seen.values(), key=lambda x: x[1], reverse=True)
out: list[tuple[MemoryRecord, float]] = []
for payload, score in ranked:
record = self._payload_to_record(payload)
if categories and not any(c in record.categories for c in categories):
continue
if metadata_filter and not all(
record.metadata.get(k) == v for k, v in metadata_filter.items()
):
continue
if score < min_score:
continue
out.append((record, score))
if len(out) >= limit:
break
return out[:limit]
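
Review note: because the central shard is scanned before the local one, the merge in `search` keeps the best central score per record and then lets any local hit overwrite it, so the worker's own copy always wins. The sketch below reduces that to `(record_id, score)` tuples; `merge_hits` is a hypothetical name and the payload handling is left out.

```python
from typing import Dict, List, Tuple

Hit = Tuple[str, float]


def merge_hits(central_hits: List[Hit], local_hits: List[Hit]) -> List[Hit]:
    """Merge hits from both shards, preferring local copies, best score first."""
    merged: Dict[str, float] = {}

    # Central duplicates: keep the highest score per record.
    for rid, score in central_hits:
        if rid not in merged or score > merged[rid]:
            merged[rid] = score

    # Local hits overwrite central ones, even with a lower score.
    for rid, score in local_hits:
        merged[rid] = score

    return sorted(merged.items(), key=lambda item: item[1], reverse=True)


if __name__ == "__main__":
    print(merge_hits([("a", 0.9), ("b", 0.5)], [("a", 0.4)]))
```

One consequence worth noting: a record updated locally can rank lower than its stale central copy would have, which appears to be the intended trade-off.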
def delete(
self,
scope_prefix: str | None = None,
categories: list[str] | None = None,
record_ids: list[str] | None = None,
older_than: datetime | None = None,
metadata_filter: dict[str, Any] | None = None,
) -> int:
"""Delete matching records from the central and local shards."""
total_deleted = 0
for shard_path in (self._central_path, self._local_path):
if not shard_path.exists():
continue
try:
total_deleted += self._delete_from_shard_path(
shard_path,
scope_prefix,
categories,
record_ids,
older_than,
metadata_filter,
)
except Exception:
_logger.debug("Delete failed on %s", shard_path, exc_info=True)
return total_deleted
def _delete_from_shard_path(
self,
shard_path: Path,
scope_prefix: str | None,
categories: list[str] | None,
record_ids: list[str] | None,
older_than: datetime | None,
metadata_filter: dict[str, Any] | None,
) -> int:
"""Delete matching records from a shard at the given path."""
shard = EdgeShard.load(str(shard_path))
try:
deleted = self._delete_from_shard(
shard,
scope_prefix,
categories,
record_ids,
older_than,
metadata_filter,
)
shard.flush()
finally:
shard.close()
return deleted
def _delete_from_shard(
self,
shard: EdgeShard,
scope_prefix: str | None,
categories: list[str] | None,
record_ids: list[str] | None,
older_than: datetime | None,
metadata_filter: dict[str, Any] | None,
) -> int:
"""Delete matching records from a single shard, returning count deleted."""
before = shard.count(CountRequest())
if record_ids and not (categories or metadata_filter or older_than):
point_ids: list[int | uuid.UUID | str] = [
_uuid_to_point_id(rid) for rid in record_ids
]
shard.update(UpdateOperation.delete_points(point_ids))
return before - shard.count(CountRequest())
if categories or metadata_filter or older_than:
scope_filter = self._build_scope_filter(scope_prefix)
points = self._scroll_all(shard, filt=scope_filter)
allowed_ids: set[str] | None = set(record_ids) if record_ids else None
to_delete: list[int | uuid.UUID | str] = []
for pt in points:
record = self._payload_to_record(pt.payload or {})
if allowed_ids and record.id not in allowed_ids:
continue
if categories and not any(c in record.categories for c in categories):
continue
if metadata_filter and not all(
record.metadata.get(k) == v for k, v in metadata_filter.items()
):
continue
if older_than and record.created_at >= older_than:
continue
to_delete.append(pt.id)
if to_delete:
shard.update(UpdateOperation.delete_points(to_delete))
return before - shard.count(CountRequest())
scope_filter = self._build_scope_filter(scope_prefix)
if scope_filter:
shard.update(UpdateOperation.delete_points_by_filter(filter=scope_filter))
else:
points = self._scroll_all(shard)
if points:
all_ids: list[int | uuid.UUID | str] = [p.id for p in points]
shard.update(UpdateOperation.delete_points(all_ids))
return before - shard.count(CountRequest())
def update(self, record: MemoryRecord) -> None:
"""Update a record by upserting with the same point ID."""
if self._config is None:
if record.embedding and len(record.embedding) > 0:
self._vector_dim = len(record.embedding)
self._config = self._build_config(self._vector_dim)
else:
self._config = self._build_config(DEFAULT_VECTOR_DIM)
self._vector_dim = DEFAULT_VECTOR_DIM
point = self._record_to_point(record)
local = self._open_shard(self._local_path)
try:
self._ensure_indexes(local)
local.update(UpdateOperation.upsert_points([point]))
local.flush()
self._local_has_data = True
finally:
local.close()
def get_record(self, record_id: str) -> MemoryRecord | None:
"""Return a single record by ID, or None if not found."""
point_id = _uuid_to_point_id(record_id)
for shard_path in (self._local_path, self._central_path):
if not shard_path.exists():
continue
try:
shard = EdgeShard.load(str(shard_path))
records = shard.retrieve([point_id], True, True)
shard.close()
if records:
payload = records[0].payload or {}
vec = records[0].vector
vec_dict = vec if isinstance(vec, dict) else None
return self._payload_to_record(payload, vec_dict) # type: ignore[arg-type]
except Exception:
_logger.debug("get_record failed on %s", shard_path, exc_info=True)
return None
def list_records(
self,
scope_prefix: str | None = None,
limit: int = 200,
offset: int = 0,
) -> list[MemoryRecord]:
"""List records in a scope, newest first."""
filt = self._build_scope_filter(scope_prefix)
all_records: list[MemoryRecord] = []
seen_ids: set[str] = set()
for shard_path in (self._local_path, self._central_path):
if not shard_path.exists():
continue
try:
shard = EdgeShard.load(str(shard_path))
points = self._scroll_all(shard, filt=filt)
shard.close()
for pt in points:
rid = pt.payload["record_id"]
if rid not in seen_ids:
seen_ids.add(rid)
all_records.append(self._payload_to_record(pt.payload))
except Exception:
_logger.debug("list_records failed on %s", shard_path, exc_info=True)
all_records.sort(key=lambda r: r.created_at, reverse=True)
return all_records[offset : offset + limit]
def get_scope_info(self, scope: str) -> ScopeInfo:
"""Get information about a scope."""
scope = scope.rstrip("/") or "/"
prefix = scope if scope != "/" else None
filt = self._build_scope_filter(prefix)
all_points: list[Any] = []
for shard_path in (self._central_path, self._local_path):
if not shard_path.exists():
continue
try:
shard = EdgeShard.load(str(shard_path))
all_points.extend(self._scroll_all(shard, filt=filt))
shard.close()
except Exception:
_logger.debug("get_scope_info failed on %s", shard_path, exc_info=True)
if not all_points:
return ScopeInfo(
path=scope,
record_count=0,
categories=[],
oldest_record=None,
newest_record=None,
child_scopes=[],
)
seen: dict[str, Any] = {}
for pt in all_points:
rid = pt.payload["record_id"]
if rid not in seen:
seen[rid] = pt
categories_set: set[str] = set()
oldest: datetime | None = None
newest: datetime | None = None
child_prefix = (scope + "/") if scope != "/" else "/"
children: set[str] = set()
for pt in seen.values():
payload = pt.payload
sc = str(payload.get("scope", ""))
if child_prefix and sc.startswith(child_prefix):
rest = sc[len(child_prefix) :]
first_component = rest.split("/", 1)[0]
if first_component:
children.add(child_prefix + first_component)
for c in payload.get("categories", []):
categories_set.add(c)
created = payload.get("created_at")
if created:
dt = datetime.fromisoformat(str(created).replace("Z", "+00:00"))
if oldest is None or dt < oldest:
oldest = dt
if newest is None or dt > newest:
newest = dt
return ScopeInfo(
path=scope,
record_count=len(seen),
categories=sorted(categories_set),
oldest_record=oldest,
newest_record=newest,
child_scopes=sorted(children),
)
def list_scopes(self, parent: str = "/") -> list[str]:
"""List immediate child scopes under a parent path."""
parent = parent.rstrip("/") or ""
prefix = (parent + "/") if parent else "/"
all_scopes: set[str] = set()
filt = self._build_scope_filter(prefix if prefix != "/" else None)
for shard_path in (self._central_path, self._local_path):
if not shard_path.exists():
continue
try:
shard = EdgeShard.load(str(shard_path))
points = self._scroll_all(shard, filt=filt)
shard.close()
for pt in points:
sc = str(pt.payload.get("scope", ""))
if sc.startswith(prefix) and sc != (prefix.rstrip("/") or "/"):
rest = sc[len(prefix) :]
first_component = rest.split("/", 1)[0]
if first_component:
all_scopes.add(prefix + first_component)
except Exception:
_logger.debug("list_scopes failed on %s", shard_path, exc_info=True)
return sorted(all_scopes)
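The prefix handling in `list_scopes` can be expressed as a pure function over scope strings. This is a minimal sketch under that assumption: `immediate_children` is a hypothetical name, and it works on plain strings instead of shard points.

```python
def immediate_children(scopes: list[str], parent: str = "/") -> list[str]:
    """Return the first-level child scopes found under ``parent``.

    Hypothetical helper for illustration only.
    """
    parent = parent.rstrip("/")
    prefix = parent + "/"  # "/" for the root, "/crew/" for "/crew"
    children: set[str] = set()
    for sc in scopes:
        # Skip scopes outside the parent and the parent scope itself.
        if not sc.startswith(prefix) or sc == (parent or "/"):
            continue
        first_component = sc[len(prefix):].split("/", 1)[0]
        if first_component:
            children.add(prefix + first_component)
    return sorted(children)
```

Deeper scopes such as `/crew/sales` collapse to their first component (`/crew`) when listed from the root, so only immediate children are reported.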
def list_categories(self, scope_prefix: str | None = None) -> dict[str, int]:
"""List categories and their counts within a scope."""
if not self._local_has_data and self._central_path.exists():
try:
shard = EdgeShard.load(str(self._central_path))
try:
shard.update(
UpdateOperation.create_field_index(
"categories", PayloadSchemaType.Keyword
)
)
except Exception: # noqa: S110
pass
filt = self._build_scope_filter(scope_prefix)
facet_result = shard.facet(
FacetRequest(key="categories", limit=1000, filter=filt)
)
shard.close()
return {str(hit.value): hit.count for hit in facet_result.hits}
except Exception:
_logger.debug("list_categories failed on central", exc_info=True)
counts: dict[str, int] = {}
for record in self.list_records(scope_prefix=scope_prefix, limit=50_000):
for c in record.categories:
counts[c] = counts.get(c, 0) + 1
return counts
def count(self, scope_prefix: str | None = None) -> int:
"""Count records in scope (and subscopes)."""
filt = self._build_scope_filter(scope_prefix)
if not self._local_has_data:
if self._central_path.exists():
try:
shard = EdgeShard.load(str(self._central_path))
result = shard.count(CountRequest(filter=filt))
shard.close()
return result
except Exception:
_logger.debug("count failed on central", exc_info=True)
return 0
seen_ids: set[str] = set()
for shard_path in (self._local_path, self._central_path):
if not shard_path.exists():
continue
try:
shard = EdgeShard.load(str(shard_path))
for pt in self._scroll_all(shard, filt=filt):
seen_ids.add(pt.payload["record_id"])
shard.close()
except Exception:
_logger.debug("count failed on %s", shard_path, exc_info=True)
return len(seen_ids)
def reset(self, scope_prefix: str | None = None) -> None:
"""Reset (delete all) memories in scope."""
if scope_prefix is None or not scope_prefix.strip("/"):
for shard_path in (self._central_path, self._local_path):
if shard_path.exists():
shutil.rmtree(shard_path, ignore_errors=True)
self._local_has_data = False
self._indexes_created = False
return
self.delete(scope_prefix=scope_prefix)
def touch_records(self, record_ids: list[str]) -> None:
"""Update last_accessed to now for the given record IDs."""
if not record_ids:
return
now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
point_ids: list[int | uuid.UUID | str] = [
_uuid_to_point_id(rid) for rid in record_ids
]
for shard_path in (self._central_path, self._local_path):
if not shard_path.exists():
continue
try:
shard = EdgeShard.load(str(shard_path))
shard.update(
UpdateOperation.set_payload(point_ids, {"last_accessed": now})
)
shard.flush()
shard.close()
except Exception:
_logger.debug("touch_records failed on %s", shard_path, exc_info=True)
def optimize(self) -> None:
"""Compact the central shard synchronously."""
if not self._central_path.exists():
return
try:
shard = EdgeShard.load(str(self._central_path))
shard.optimize()
shard.close()
except Exception:
_logger.debug("optimize failed", exc_info=True)
def _upsert_to_central(self, points: list[Any]) -> None:
"""Convert scrolled points to Qdrant Points and upsert to central shard."""
qdrant_points = [
Point(
id=pt.id,
vector=pt.vector if pt.vector else {},
payload=pt.payload if pt.payload else {},
)
for pt in points
]
central = self._open_shard(self._central_path)
try:
self._ensure_indexes(central)
central.update(UpdateOperation.upsert_points(qdrant_points))
central.flush()
finally:
central.close()
def flush_to_central(self) -> None:
"""Sync local shard records to the central shard."""
if not self._local_has_data or not self._local_path.exists():
return
try:
local = EdgeShard.load(str(self._local_path))
except Exception:
_logger.debug("flush_to_central: failed to open local shard", exc_info=True)
return
points = self._scroll_all(local, with_vector=True)
local.close()
if not points:
shutil.rmtree(self._local_path, ignore_errors=True)
self._local_has_data = False
return
self._upsert_to_central(points)
shutil.rmtree(self._local_path, ignore_errors=True)
self._local_has_data = False
def close(self) -> None:
"""Flush local shard to central and clean up."""
if self._closed:
return
self._closed = True
atexit.unregister(self.close)
try:
self.flush_to_central()
except Exception:
_logger.debug("close: flush_to_central failed", exc_info=True)
def _cleanup_orphaned_shards(self) -> None:
"""Sync and remove local shards from dead worker processes."""
if not self._base_path.exists():
return
for entry in self._base_path.iterdir():
if not entry.is_dir() or not entry.name.startswith("worker-"):
continue
pid_str = entry.name.removeprefix("worker-")
try:
pid = int(pid_str)
except ValueError:
continue
if pid == os.getpid():
continue
try:
os.kill(pid, 0)
continue
except ProcessLookupError:
_logger.debug("Worker %d is dead, shard is orphaned", pid)
except PermissionError:
continue
_logger.info("Cleaning up orphaned shard for dead worker %d", pid)
try:
orphan = EdgeShard.load(str(entry))
points = self._scroll_all(orphan, with_vector=True)
orphan.close()
if not points:
shutil.rmtree(entry, ignore_errors=True)
continue
if self._config is None:
for pt in points:
vec = pt.vector
if isinstance(vec, dict) and VECTOR_NAME in vec:
vec_data = vec[VECTOR_NAME]
if isinstance(vec_data, list) and len(vec_data) > 0:
self._vector_dim = len(vec_data)
self._config = self._build_config(self._vector_dim)
break
if self._config is None:
_logger.warning(
"Cannot recover orphaned shard %s: vector dimension unknown",
entry,
)
continue
self._upsert_to_central(points)
shutil.rmtree(entry, ignore_errors=True)
except Exception:
_logger.warning(
"Failed to recover orphaned shard %s", entry, exc_info=True
)
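The liveness probe used by `_cleanup_orphaned_shards` relies on signal 0, which checks for a process without delivering a signal. The sketch below assumes a POSIX system (Windows interprets the signal argument differently), and `pid_is_alive` is a hypothetical helper name.

```python
import os


def pid_is_alive(pid: int) -> bool:
    """Best-effort check for a running process (POSIX only, illustrative)."""
    try:
        os.kill(pid, 0)  # signal 0: existence/permission check only
    except ProcessLookupError:
        return False  # no such process, so its shard would be orphaned
    except PermissionError:
        return True  # process exists but belongs to another user
    return True
```

Treating `PermissionError` as "alive" is the conservative choice: a shard is only reclaimed when the owning process is known to be gone.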
async def asave(self, records: list[MemoryRecord]) -> None:
"""Save memory records asynchronously."""
await asyncio.to_thread(self.save, records)
async def asearch(
self,
query_embedding: list[float],
scope_prefix: str | None = None,
categories: list[str] | None = None,
metadata_filter: dict[str, Any] | None = None,
limit: int = 10,
min_score: float = 0.0,
) -> list[tuple[MemoryRecord, float]]:
"""Search for memories asynchronously."""
return await asyncio.to_thread(
self.search,
query_embedding,
scope_prefix=scope_prefix,
categories=categories,
metadata_filter=metadata_filter,
limit=limit,
min_score=min_score,
)
async def adelete(
self,
scope_prefix: str | None = None,
categories: list[str] | None = None,
record_ids: list[str] | None = None,
older_than: datetime | None = None,
metadata_filter: dict[str, Any] | None = None,
) -> int:
"""Delete memories asynchronously."""
return await asyncio.to_thread(
self.delete,
scope_prefix=scope_prefix,
categories=categories,
record_ids=record_ids,
older_than=older_than,
metadata_filter=metadata_filter,
)
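The async methods above all follow one pattern: hand the blocking call to a worker thread with `asyncio.to_thread` (Python 3.9+). A self-contained sketch of that pattern, using a hypothetical `BlockingStore` class in place of the real storage:

```python
import asyncio
import time


class BlockingStore:
    """Hypothetical stand-in for a storage backend with blocking I/O."""

    def __init__(self) -> None:
        self._items: list[str] = []

    def save(self, items: list[str]) -> None:
        time.sleep(0.01)  # simulate blocking disk work
        self._items.extend(items)

    def count(self) -> int:
        return len(self._items)

    async def asave(self, items: list[str]) -> None:
        # Run the blocking method in a thread so the event loop stays free.
        await asyncio.to_thread(self.save, items)


async def main() -> int:
    store = BlockingStore()
    await asyncio.gather(store.asave(["a"]), store.asave(["b", "c"]))
    return store.count()
```

Keyword arguments pass through `asyncio.to_thread` unchanged, which is why the wrappers can forward `scope_prefix=...` and the other filters directly.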


@@ -173,13 +173,18 @@ class Memory(BaseModel):
             )
         if isinstance(self.storage, str):
-            from crewai.memory.storage.lancedb_storage import LanceDBStorage
-            self._storage = (
-                LanceDBStorage()
-                if self.storage == "lancedb"
-                else LanceDBStorage(path=self.storage)
-            )
+            if self.storage == "qdrant-edge":
+                from crewai.memory.storage.qdrant_edge_storage import QdrantEdgeStorage
+                self._storage = QdrantEdgeStorage()
+            elif self.storage == "lancedb":
+                from crewai.memory.storage.lancedb_storage import LanceDBStorage
+                self._storage = LanceDBStorage()
+            else:
+                from crewai.memory.storage.lancedb_storage import LanceDBStorage
+                self._storage = LanceDBStorage(path=self.storage)
         else:
             self._storage = self.storage
@@ -293,8 +298,10 @@ class Memory(BaseModel):
             future.result()  # blocks until done; re-raises exceptions
     def close(self) -> None:
-        """Drain pending saves and shut down the background thread pool."""
+        """Drain pending saves, flush storage, and shut down the background thread pool."""
         self.drain_writes()
+        if hasattr(self._storage, "close"):
+            self._storage.close()
         self._save_pool.shutdown(wait=True)
def _encode_batch(
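The ordering in the updated `close()` matters: pending writes are drained first, the storage is closed only if it offers a `close` method, and the pool is shut down last. The sketch below illustrates that ordering with hypothetical `InMemoryStorage` and `Owner` classes; it is not the `Memory` implementation.

```python
from concurrent.futures import Future, ThreadPoolExecutor


class InMemoryStorage:
    """Hypothetical backend that records whether close() was called."""

    def __init__(self) -> None:
        self.records: list[str] = []
        self.closed = False

    def save(self, record: str) -> None:
        self.records.append(record)

    def close(self) -> None:
        self.closed = True


class Owner:
    """Hypothetical owner of a storage object and a background save pool."""

    def __init__(self, storage: object) -> None:
        self._storage = storage
        self._pool = ThreadPoolExecutor(max_workers=1)
        self._pending: list[Future] = []

    def remember(self, record: str) -> None:
        self._pending.append(self._pool.submit(self._storage.save, record))

    def drain_writes(self) -> None:
        for future in self._pending:
            future.result()  # blocks until done; re-raises exceptions
        self._pending.clear()

    def close(self) -> None:
        self.drain_writes()
        # Duck-typed: backends without close() are simply skipped.
        if hasattr(self._storage, "close"):
            self._storage.close()
        self._pool.shutdown(wait=True)
```

Closing the storage before the writes are drained could flush an incomplete local shard, which is presumably why the drain comes first.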


@@ -136,6 +136,7 @@ class TestPlusAPI(unittest.TestCase):
"file": encoded_file,
"description": description,
"available_exports": None,
"tools_metadata": None,
}
mock_make_request.assert_called_once_with(
"POST", "/crewai_plus/api/v1/tools", json=params
@@ -173,6 +174,7 @@ class TestPlusAPI(unittest.TestCase):
"file": encoded_file,
"description": description,
"available_exports": None,
"tools_metadata": None,
}
self.assert_request_with_org_id(
@@ -201,6 +203,48 @@ class TestPlusAPI(unittest.TestCase):
"file": encoded_file,
"description": description,
"available_exports": None,
"tools_metadata": None,
}
mock_make_request.assert_called_once_with(
"POST", "/crewai_plus/api/v1/tools", json=params
)
self.assertEqual(response, mock_response)
@patch("crewai.cli.plus_api.PlusAPI._make_request")
def test_publish_tool_with_tools_metadata(self, mock_make_request):
mock_response = MagicMock()
mock_make_request.return_value = mock_response
handle = "test_tool_handle"
public = True
version = "1.0.0"
description = "Test tool description"
encoded_file = "encoded_test_file"
available_exports = [{"name": "MyTool"}]
tools_metadata = [
{
"name": "MyTool",
"humanized_name": "my_tool",
"description": "A test tool",
"run_params_schema": {"type": "object", "properties": {}},
"init_params_schema": {"type": "object", "properties": {}},
"env_vars": [{"name": "API_KEY", "description": "API key", "required": True, "default": None}],
}
]
response = self.api.publish_tool(
handle, public, version, description, encoded_file,
available_exports=available_exports,
tools_metadata=tools_metadata,
)
params = {
"handle": handle,
"public": public,
"version": version,
"file": encoded_file,
"description": description,
"available_exports": available_exports,
"tools_metadata": {"package": handle, "tools": tools_metadata},
}
mock_make_request.assert_called_once_with(
"POST", "/crewai_plus/api/v1/tools", json=params

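The expected payloads in these tests imply that the metadata is nested under a `tools` key together with the package handle, and that an omitted value is sent as `None`. A minimal sketch of that shaping, assuming the `is not None` check described in the commit message; `wrap_tools_metadata` is a hypothetical helper, not the `PlusAPI` code:

```python
from typing import Any, Optional


def wrap_tools_metadata(
    handle: str, tools_metadata: Optional[list[dict[str, Any]]] = None
) -> Optional[dict[str, Any]]:
    """Nest tool metadata under the package handle (illustrative only)."""
    # "is not None" keeps an empty list distinct from "not provided".
    if tools_metadata is not None:
        return {"package": handle, "tools": tools_metadata}
    return None
```

With a truthiness check instead, an empty list would collapse to `None` and the API could not tell "no tools detected" apart from "metadata not sent".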

@@ -363,3 +363,261 @@ def test_get_crews_ignores_template_directories(
utils.get_crews()
assert not template_crew_detected
# Tests for extract_tools_metadata
def test_extract_tools_metadata_empty_project(temp_project_dir):
"""Test that extract_tools_metadata returns empty list for empty project."""
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert metadata == []
def test_extract_tools_metadata_no_init_file(temp_project_dir):
"""Test that extract_tools_metadata returns empty list when no __init__.py exists."""
(temp_project_dir / "some_file.py").write_text("print('hello')")
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert metadata == []
def test_extract_tools_metadata_empty_init_file(temp_project_dir):
"""Test that extract_tools_metadata returns empty list for empty __init__.py."""
create_init_file(temp_project_dir, "")
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert metadata == []
def test_extract_tools_metadata_no_all_variable(temp_project_dir):
"""Test that extract_tools_metadata returns empty list when __all__ is not defined."""
create_init_file(
temp_project_dir,
"from crewai.tools import BaseTool\n\nclass MyTool(BaseTool):\n pass",
)
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert metadata == []
def test_extract_tools_metadata_valid_base_tool_class(temp_project_dir):
"""Test that extract_tools_metadata extracts metadata from a valid BaseTool class."""
create_init_file(
temp_project_dir,
"""from crewai.tools import BaseTool
class MyTool(BaseTool):
name: str = "my_tool"
description: str = "A test tool"
__all__ = ['MyTool']
""",
)
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert len(metadata) == 1
assert metadata[0]["name"] == "MyTool"
assert metadata[0]["humanized_name"] == "my_tool"
assert metadata[0]["description"] == "A test tool"
def test_extract_tools_metadata_with_args_schema(temp_project_dir):
"""Test that extract_tools_metadata extracts run_params_schema from args_schema."""
create_init_file(
temp_project_dir,
"""from crewai.tools import BaseTool
from pydantic import BaseModel
class MyToolInput(BaseModel):
query: str
limit: int = 10
class MyTool(BaseTool):
name: str = "my_tool"
description: str = "A test tool"
args_schema: type[BaseModel] = MyToolInput
__all__ = ['MyTool']
""",
)
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert len(metadata) == 1
assert metadata[0]["name"] == "MyTool"
run_params = metadata[0]["run_params_schema"]
assert "properties" in run_params
assert "query" in run_params["properties"]
assert "limit" in run_params["properties"]
def test_extract_tools_metadata_with_env_vars(temp_project_dir):
"""Test that extract_tools_metadata extracts env_vars."""
create_init_file(
temp_project_dir,
"""from crewai.tools import BaseTool
from crewai.tools.base_tool import EnvVar
class MyTool(BaseTool):
name: str = "my_tool"
description: str = "A test tool"
env_vars: list[EnvVar] = [
EnvVar(name="MY_API_KEY", description="API key for service", required=True),
EnvVar(name="MY_OPTIONAL_VAR", description="Optional var", required=False, default="default_value"),
]
__all__ = ['MyTool']
""",
)
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert len(metadata) == 1
env_vars = metadata[0]["env_vars"]
assert len(env_vars) == 2
assert env_vars[0]["name"] == "MY_API_KEY"
assert env_vars[0]["description"] == "API key for service"
assert env_vars[0]["required"] is True
assert env_vars[1]["name"] == "MY_OPTIONAL_VAR"
assert env_vars[1]["required"] is False
assert env_vars[1]["default"] == "default_value"
def test_extract_tools_metadata_with_custom_init_params(temp_project_dir):
"""Test that extract_tools_metadata extracts init_params_schema with custom params."""
create_init_file(
temp_project_dir,
"""from crewai.tools import BaseTool
class MyTool(BaseTool):
name: str = "my_tool"
description: str = "A test tool"
api_endpoint: str = "https://api.example.com"
timeout: int = 30
__all__ = ['MyTool']
""",
)
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert len(metadata) == 1
init_params = metadata[0]["init_params_schema"]
assert "properties" in init_params
# Custom params should be included
assert "api_endpoint" in init_params["properties"]
assert "timeout" in init_params["properties"]
# Base params should be filtered out
assert "name" not in init_params["properties"]
assert "description" not in init_params["properties"]
def test_extract_tools_metadata_multiple_tools(temp_project_dir):
"""Test that extract_tools_metadata extracts metadata from multiple tools."""
create_init_file(
temp_project_dir,
"""from crewai.tools import BaseTool
class FirstTool(BaseTool):
name: str = "first_tool"
description: str = "First test tool"
class SecondTool(BaseTool):
name: str = "second_tool"
description: str = "Second test tool"
__all__ = ['FirstTool', 'SecondTool']
""",
)
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert len(metadata) == 2
names = [m["name"] for m in metadata]
assert "FirstTool" in names
assert "SecondTool" in names
def test_extract_tools_metadata_multiple_init_files(temp_project_dir):
"""Test that extract_tools_metadata extracts metadata from multiple __init__.py files."""
# Create tool in root __init__.py
create_init_file(
temp_project_dir,
"""from crewai.tools import BaseTool
class RootTool(BaseTool):
name: str = "root_tool"
description: str = "Root tool"
__all__ = ['RootTool']
""",
)
# Create nested package with another tool
nested_dir = temp_project_dir / "nested"
nested_dir.mkdir()
create_init_file(
nested_dir,
"""from crewai.tools import BaseTool
class NestedTool(BaseTool):
name: str = "nested_tool"
description: str = "Nested tool"
__all__ = ['NestedTool']
""",
)
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert len(metadata) == 2
names = [m["name"] for m in metadata]
assert "RootTool" in names
assert "NestedTool" in names
def test_extract_tools_metadata_ignores_non_tool_exports(temp_project_dir):
"""Test that extract_tools_metadata ignores non-BaseTool exports."""
create_init_file(
temp_project_dir,
"""from crewai.tools import BaseTool
class MyTool(BaseTool):
name: str = "my_tool"
description: str = "A test tool"
def not_a_tool():
pass
SOME_CONSTANT = "value"
__all__ = ['MyTool', 'not_a_tool', 'SOME_CONSTANT']
""",
)
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert len(metadata) == 1
assert metadata[0]["name"] == "MyTool"
def test_extract_tools_metadata_import_error_returns_empty(temp_project_dir):
"""Test that extract_tools_metadata returns empty list on import error."""
create_init_file(
temp_project_dir,
"""from nonexistent_module import something
class MyTool(BaseTool):
pass
__all__ = ['MyTool']
""",
)
# Should not raise, just return empty list
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert metadata == []
def test_extract_tools_metadata_syntax_error_returns_empty(temp_project_dir):
"""Test that extract_tools_metadata returns empty list on syntax error."""
create_init_file(
temp_project_dir,
"""from crewai.tools import BaseTool
class MyTool(BaseTool):
# Missing closing parenthesis
def __init__(self, name:
pass
__all__ = ['MyTool']
""",
)
# Should not raise, just return empty list
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert metadata == []
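The custom-init-params test expects base fields such as `name` and `description` to be absent from `init_params_schema`, and the commit log mentions filtering the `required` list to match. A sketch of that filtering on a plain JSON-schema dict follows; `drop_properties` is a hypothetical function and makes no claim about how `utils.extract_tools_metadata` does it.

```python
from typing import Any


def drop_properties(schema: dict[str, Any], excluded: set[str]) -> dict[str, Any]:
    """Return a copy of ``schema`` without the excluded properties.

    Illustrative helper: "required" is filtered too, so it never names a
    property that is no longer present.
    """
    props = {
        name: spec
        for name, spec in schema.get("properties", {}).items()
        if name not in excluded
    }
    filtered = {**schema, "properties": props}
    if "required" in schema:
        filtered["required"] = [n for n in schema["required"] if n in props]
    return filtered
```

Leaving a removed name in `required` would produce a schema that no input can satisfy, so the two lists have to be filtered together.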


@@ -185,9 +185,14 @@ def test_publish_when_not_in_sync(mock_is_synced, capsys, tool_command):
"crewai.cli.tools.main.extract_available_exports",
return_value=[{"name": "SampleTool"}],
)
@patch(
"crewai.cli.tools.main.extract_tools_metadata",
return_value=[{"name": "SampleTool", "humanized_name": "sample_tool", "description": "A sample tool", "run_params_schema": {}, "init_params_schema": {}, "env_vars": []}],
)
@patch("crewai.cli.tools.main.ToolCommand._print_current_organization")
def test_publish_when_not_in_sync_and_force(
mock_print_org,
mock_tools_metadata,
mock_available_exports,
mock_is_synced,
mock_publish,
@@ -222,6 +227,7 @@ def test_publish_when_not_in_sync_and_force(
description="A sample tool",
encoded_file=unittest.mock.ANY,
available_exports=[{"name": "SampleTool"}],
tools_metadata=[{"name": "SampleTool", "humanized_name": "sample_tool", "description": "A sample tool", "run_params_schema": {}, "init_params_schema": {}, "env_vars": []}],
)
mock_print_org.assert_called_once()
@@ -242,7 +248,12 @@ def test_publish_when_not_in_sync_and_force(
"crewai.cli.tools.main.extract_available_exports",
return_value=[{"name": "SampleTool"}],
)
@patch(
"crewai.cli.tools.main.extract_tools_metadata",
return_value=[{"name": "SampleTool", "humanized_name": "sample_tool", "description": "A sample tool", "run_params_schema": {}, "init_params_schema": {}, "env_vars": []}],
)
def test_publish_success(
mock_tools_metadata,
mock_available_exports,
mock_is_synced,
mock_publish,
@@ -277,6 +288,7 @@ def test_publish_success(
description="A sample tool",
encoded_file=unittest.mock.ANY,
available_exports=[{"name": "SampleTool"}],
tools_metadata=[{"name": "SampleTool", "humanized_name": "sample_tool", "description": "A sample tool", "run_params_schema": {}, "init_params_schema": {}, "env_vars": []}],
)
@@ -295,7 +307,12 @@ def test_publish_success(
"crewai.cli.tools.main.extract_available_exports",
return_value=[{"name": "SampleTool"}],
)
@patch(
"crewai.cli.tools.main.extract_tools_metadata",
return_value=[{"name": "SampleTool", "humanized_name": "sample_tool", "description": "A sample tool", "run_params_schema": {}, "init_params_schema": {}, "env_vars": []}],
)
def test_publish_failure(
mock_tools_metadata,
mock_available_exports,
mock_publish,
mock_open,
@@ -336,7 +353,12 @@ def test_publish_failure(
"crewai.cli.tools.main.extract_available_exports",
return_value=[{"name": "SampleTool"}],
)
@patch(
"crewai.cli.tools.main.extract_tools_metadata",
return_value=[{"name": "SampleTool", "humanized_name": "sample_tool", "description": "A sample tool", "run_params_schema": {}, "init_params_schema": {}, "env_vars": []}],
)
def test_publish_api_error(
mock_tools_metadata,
mock_available_exports,
mock_publish,
mock_open,
@@ -362,6 +384,63 @@ def test_publish_api_error(
mock_publish.assert_called_once()
@patch("crewai.cli.tools.main.get_project_name", return_value="sample-tool")
@patch("crewai.cli.tools.main.get_project_version", return_value="1.0.0")
@patch("crewai.cli.tools.main.get_project_description", return_value="A sample tool")
@patch("crewai.cli.tools.main.subprocess.run")
@patch("crewai.cli.tools.main.os.listdir", return_value=["sample-tool-1.0.0.tar.gz"])
@patch(
"crewai.cli.tools.main.open",
new_callable=unittest.mock.mock_open,
read_data=b"sample tarball content",
)
@patch("crewai.cli.plus_api.PlusAPI.publish_tool")
@patch("crewai.cli.tools.main.git.Repository.is_synced", return_value=True)
@patch(
"crewai.cli.tools.main.extract_available_exports",
return_value=[{"name": "SampleTool"}],
)
@patch(
"crewai.cli.tools.main.extract_tools_metadata",
side_effect=Exception("Failed to extract metadata"),
)
def test_publish_metadata_extraction_failure_continues_with_warning(
mock_tools_metadata,
mock_available_exports,
mock_is_synced,
mock_publish,
mock_open,
mock_listdir,
mock_subprocess_run,
mock_get_project_description,
mock_get_project_version,
mock_get_project_name,
capsys,
tool_command,
):
"""Test that metadata extraction failure shows warning but continues publishing."""
mock_publish_response = MagicMock()
mock_publish_response.status_code = 200
mock_publish_response.json.return_value = {"handle": "sample-tool"}
mock_publish.return_value = mock_publish_response
tool_command.publish(is_public=True)
output = capsys.readouterr().out
assert "Warning: Could not extract tool metadata" in output
assert "Publishing will continue without detailed metadata" in output
assert "No tool metadata extracted" in output
mock_publish.assert_called_once_with(
handle="sample-tool",
is_public=True,
version="1.0.0",
description="A sample tool",
encoded_file=unittest.mock.ANY,
available_exports=[{"name": "SampleTool"}],
tools_metadata=[],
)
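The behaviour this test pins down is a warn-and-continue fallback: a failure during metadata extraction is reported and an empty list is published instead. A rough sketch of that shape, where `safe_extract` and its `report` parameter are hypothetical and the message wording only approximates the strings asserted above:

```python
from typing import Any, Callable


def safe_extract(
    extractor: Callable[[], list[dict[str, Any]]],
    report: Callable[[str], None] = print,
) -> list[dict[str, Any]]:
    """Run ``extractor`` and fall back to an empty list on any failure."""
    try:
        return extractor()
    except Exception as exc:  # deliberate catch-all: publishing must go on
        report(f"Warning: Could not extract tool metadata: {exc}")
        report("Publishing will continue without detailed metadata.")
        return []
```

Returning `[]` instead of `None` keeps the downstream call signature uniform, consistent with the `tools_metadata=[]` asserted in the test.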
@patch("crewai.cli.tools.main.Settings")
def test_print_current_organization_with_org(mock_settings, capsys, tool_command):
mock_settings_instance = MagicMock()


@@ -0,0 +1,353 @@
"""Tests for Qdrant Edge storage backend."""
from __future__ import annotations
import importlib
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any
from unittest.mock import MagicMock
import pytest
pytestmark = pytest.mark.skipif(
importlib.util.find_spec("qdrant_edge") is None,
reason="qdrant-edge-py not installed",
)
if TYPE_CHECKING:
from crewai.memory.storage.qdrant_edge_storage import QdrantEdgeStorage
from crewai.memory.types import MemoryRecord
def _make_storage(path: str, vector_dim: int = 4) -> QdrantEdgeStorage:
from crewai.memory.storage.qdrant_edge_storage import QdrantEdgeStorage
return QdrantEdgeStorage(path=path, vector_dim=vector_dim)
@pytest.fixture
def storage(tmp_path: Path) -> QdrantEdgeStorage:
return _make_storage(str(tmp_path / "edge"))
def _rec(
content: str = "test",
scope: str = "/",
categories: list[str] | None = None,
importance: float = 0.5,
embedding: list[float] | None = None,
metadata: dict | None = None,
created_at: datetime | None = None,
) -> MemoryRecord:
return MemoryRecord(
content=content,
scope=scope,
categories=categories or [],
importance=importance,
embedding=embedding or [0.1, 0.2, 0.3, 0.4],
metadata=metadata or {},
**({"created_at": created_at} if created_at else {}),
)
# --- Basic CRUD ---
def test_save_search(storage: QdrantEdgeStorage) -> None:
r = _rec(content="test content", scope="/foo", categories=["cat1"], importance=0.8)
storage.save([r])
results = storage.search([0.1, 0.2, 0.3, 0.4], scope_prefix="/foo", limit=5)
assert len(results) == 1
rec, score = results[0]
assert rec.content == "test content"
assert rec.scope == "/foo"
assert score >= 0.0
def test_delete_count(storage: QdrantEdgeStorage) -> None:
r = _rec(scope="/")
storage.save([r])
assert storage.count() == 1
n = storage.delete(scope_prefix="/")
assert n >= 1
assert storage.count() == 0
def test_update_get_record(storage: QdrantEdgeStorage) -> None:
r = _rec(content="original", scope="/a")
storage.save([r])
r.content = "updated"
storage.update(r)
found = storage.get_record(r.id)
assert found is not None
assert found.content == "updated"
def test_get_record_not_found(storage: QdrantEdgeStorage) -> None:
assert storage.get_record("nonexistent-id") is None
# --- Scope operations ---
def test_list_scopes_get_scope_info(storage: QdrantEdgeStorage) -> None:
storage.save([
_rec(content="a", scope="/"),
_rec(content="b", scope="/team"),
])
scopes = storage.list_scopes("/")
assert "/team" in scopes
info = storage.get_scope_info("/")
assert info.record_count >= 1
assert info.path == "/"
def test_scope_prefix_filter(storage: QdrantEdgeStorage) -> None:
storage.save([
_rec(content="sales note", scope="/crew/sales"),
_rec(content="eng note", scope="/crew/eng"),
_rec(content="other note", scope="/other"),
])
results = storage.search([0.1, 0.2, 0.3, 0.4], scope_prefix="/crew", limit=10)
assert len(results) == 2
scopes = {r.scope for r, _ in results}
assert "/crew/sales" in scopes
assert "/crew/eng" in scopes
# --- Filtering ---
def test_category_filter(storage: QdrantEdgeStorage) -> None:
storage.save([
_rec(content="cat1 item", categories=["cat1"]),
_rec(content="cat2 item", categories=["cat2"]),
])
results = storage.search(
[0.1, 0.2, 0.3, 0.4], categories=["cat1"], limit=10
)
assert len(results) == 1
assert results[0][0].categories == ["cat1"]
def test_metadata_filter(storage: QdrantEdgeStorage) -> None:
storage.save([
_rec(content="with key", metadata={"env": "prod"}),
_rec(content="without key", metadata={"env": "dev"}),
])
results = storage.search(
[0.1, 0.2, 0.3, 0.4], metadata_filter={"env": "prod"}, limit=10
)
assert len(results) == 1
assert results[0][0].metadata["env"] == "prod"
# --- List & pagination ---
def test_list_records_pagination(storage: QdrantEdgeStorage) -> None:
records = [
_rec(
content=f"item {i}",
created_at=datetime(2025, 1, 1) + timedelta(days=i),
)
for i in range(5)
]
storage.save(records)
page1 = storage.list_records(limit=2, offset=0)
page2 = storage.list_records(limit=2, offset=2)
assert len(page1) == 2
assert len(page2) == 2
# Newest first.
assert page1[0].created_at >= page1[1].created_at
def test_list_categories(storage: QdrantEdgeStorage) -> None:
storage.save([
_rec(categories=["a", "b"]),
_rec(categories=["b", "c"]),
])
cats = storage.list_categories()
assert cats.get("b", 0) == 2
assert cats.get("a", 0) >= 1
assert cats.get("c", 0) >= 1
# --- Touch & reset ---
def test_touch_records(storage: QdrantEdgeStorage) -> None:
r = _rec()
storage.save([r])
before = storage.get_record(r.id)
assert before is not None
old_accessed = before.last_accessed
storage.touch_records([r.id])
after = storage.get_record(r.id)
assert after is not None
assert after.last_accessed >= old_accessed
def test_reset_full(storage: QdrantEdgeStorage) -> None:
storage.save([_rec(scope="/a"), _rec(scope="/b")])
assert storage.count() == 2
storage.reset()
assert storage.count() == 0
def test_reset_scoped(storage: QdrantEdgeStorage) -> None:
storage.save([_rec(scope="/a"), _rec(scope="/b")])
storage.reset(scope_prefix="/a")
assert storage.count() == 1
# --- Dual-shard & sync ---
def test_flush_to_central(tmp_path: Path) -> None:
s = _make_storage(str(tmp_path / "edge"))
s.save([_rec(content="to sync")])
assert s._local_has_data
s.flush_to_central()
assert not s._local_has_data
assert not s._local_path.exists()
# Central should have the record.
assert s.count() == 1
def test_dual_shard_search(tmp_path: Path) -> None:
s = _make_storage(str(tmp_path / "edge"))
# Save and flush to central.
s.save([_rec(content="central record", scope="/a")])
s.flush_to_central()
# Save to local only.
s._closed = False # Reset for continued use.
s.save([_rec(content="local record", scope="/b")])
# Search should find both.
results = s.search([0.1, 0.2, 0.3, 0.4], limit=10)
assert len(results) == 2
contents = {r.content for r, _ in results}
assert "central record" in contents
assert "local record" in contents
def test_close_lifecycle(tmp_path: Path) -> None:
s = _make_storage(str(tmp_path / "edge"))
s.save([_rec(content="persisted")])
s.close()
# Reopen a new storage — should find the record in central.
s2 = _make_storage(str(tmp_path / "edge"))
results = s2.search([0.1, 0.2, 0.3, 0.4], limit=5)
assert len(results) == 1
assert results[0][0].content == "persisted"
s2.close()
def test_orphaned_shard_cleanup(tmp_path: Path) -> None:
base = tmp_path / "edge"
# Create a fake orphaned shard using a PID that doesn't exist.
fake_pid = 99999999
s1 = _make_storage(str(base))
# Manually create a shard at the orphaned path.
orphan_path = base / f"worker-{fake_pid}"
orphan_path.mkdir(parents=True, exist_ok=True)
from qdrant_edge import (
EdgeConfig,
EdgeShard,
EdgeVectorParams,
Distance,
Point,
UpdateOperation,
)
config = EdgeConfig(
vectors={"memory": EdgeVectorParams(size=4, distance=Distance.Cosine)}
)
orphan = EdgeShard.create(str(orphan_path), config)
orphan.update(
UpdateOperation.upsert_points([
Point(
id=12345,
vector={"memory": [0.5, 0.5, 0.5, 0.5]},
payload={
"record_id": "orphan-uuid",
"content": "orphaned",
"scope": "/",
"scope_ancestors": ["/"],
"categories": [],
"metadata": {},
"importance": 0.5,
"created_at": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
"last_accessed": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
"source": "",
"private": False,
},
)
])
)
orphan.flush()
orphan.close()
s1.close()
# Creating a new storage should detect and recover the orphaned shard.
s2 = _make_storage(str(base))
assert not orphan_path.exists()
# The orphaned record should now be in central.
results = s2.search([0.5, 0.5, 0.5, 0.5], limit=5)
assert len(results) >= 1
assert any(r.content == "orphaned" for r, _ in results)
s2.close()
# --- Integration with Memory class ---
def test_memory_with_qdrant_edge(tmp_path: Path) -> None:
from crewai.memory.unified_memory import Memory
mock_embedder = MagicMock()
mock_embedder.side_effect = lambda texts: [[0.1, 0.2, 0.3, 0.4] for _ in texts]
storage = _make_storage(str(tmp_path / "edge"))
m = Memory(
storage=storage,
llm=MagicMock(),
embedder=mock_embedder,
)
r = m.remember(
"We decided to use Qdrant Edge.",
scope="/project",
categories=["decision"],
importance=0.7,
)
assert r.content == "We decided to use Qdrant Edge."
matches = m.recall("Qdrant", scope="/project", limit=5, depth="shallow")
assert len(matches) >= 1
m.close()
def test_memory_string_storage_qdrant_edge(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Test that storage='qdrant-edge' string instantiation works."""
    # monkeypatch restores any pre-existing CREWAI_STORAGE_DIR after the test,
    # whereas os.environ.pop() in a finally block would discard it.
    monkeypatch.setenv("CREWAI_STORAGE_DIR", str(tmp_path))
    from crewai.memory.storage.qdrant_edge_storage import QdrantEdgeStorage
    from crewai.memory.unified_memory import Memory
    mock_embedder = MagicMock()
    mock_embedder.side_effect = lambda texts: [[0.1, 0.2, 0.3, 0.4] for _ in texts]
    m = Memory(
        storage="qdrant-edge",
        llm=MagicMock(),
        embedder=mock_embedder,
    )
    try:
        assert isinstance(m._storage, QdrantEdgeStorage)
    finally:
        # Close even if the assertion fails so the shard is released.
        m.close()


@@ -7,6 +7,7 @@ from crewai.events.listeners.tracing.first_time_trace_handler import (
FirstTimeTraceHandler,
)
from crewai.events.listeners.tracing.trace_batch_manager import (
TraceBatch,
TraceBatchManager,
)
from crewai.events.listeners.tracing.trace_listener import (
@@ -657,6 +658,16 @@ class TestTraceListenerSetup:
trace_listener.first_time_handler.collected_events = True
mock_batch_response = MagicMock()
mock_batch_response.status_code = 201
mock_batch_response.json.return_value = {
"trace_id": "mock-trace-id",
"ephemeral_trace_id": "mock-ephemeral-trace-id",
"access_code": "TRACE-mock",
}
mock_events_response = MagicMock()
mock_events_response.status_code = 200
with (
patch.object(
trace_listener.first_time_handler,
@@ -666,6 +677,40 @@ class TestTraceListenerSetup:
patch.object(
trace_listener.first_time_handler, "_display_ephemeral_trace_link"
) as mock_display_link,
patch.object(
trace_listener.batch_manager.plus_api,
"initialize_trace_batch",
return_value=mock_batch_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"initialize_ephemeral_trace_batch",
return_value=mock_batch_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"send_trace_events",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"send_ephemeral_trace_events",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"finalize_trace_batch",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"finalize_ephemeral_trace_batch",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager,
"_cleanup_batch_data",
),
):
crew.kickoff()
wait_for_event_handlers()
@@ -918,3 +963,676 @@ class TestTraceListenerSetup:
mock_init.assert_called_once()
payload = mock_init.call_args[0][0]
assert "user_identifier" not in payload
class TestTraceBatchIdClearedOnFailure:
"""Tests: trace_batch_id is cleared when _initialize_backend_batch fails."""
def _make_batch_manager(self):
"""Create a TraceBatchManager with a pre-set trace_batch_id (simulating first-time user)."""
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id # simulate line 96
bm.is_current_batch_ephemeral = True
return bm
def test_trace_batch_id_cleared_on_exception(self):
"""trace_batch_id must be None when the API call raises an exception."""
bm = self._make_batch_manager()
assert bm.trace_batch_id is not None
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
side_effect=ConnectionError("network down"),
),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id is None
def test_trace_batch_id_set_on_success(self):
"""trace_batch_id must be set from the server response on success."""
bm = self._make_batch_manager()
server_id = "server-ephemeral-trace-id-999"
mock_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=mock_response,
),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id == server_id
def test_send_events_skipped_when_trace_batch_id_none(self):
"""_send_events_to_backend must return early when trace_batch_id is None."""
bm = self._make_batch_manager()
bm.trace_batch_id = None
bm.event_buffer = [MagicMock()] # has events
with patch.object(
bm.plus_api, "send_ephemeral_trace_events"
) as mock_send:
result = bm._send_events_to_backend()
assert result == 500
mock_send.assert_not_called()
class TestInitializeBackendBatchRetry:
"""Tests for retry logic in _initialize_backend_batch."""
def _make_batch_manager(self):
"""Create a TraceBatchManager with a pre-set trace_batch_id."""
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id
bm.is_current_batch_ephemeral = True
return bm
def test_retries_on_none_response_then_succeeds(self):
"""Retries when API returns None, succeeds on second attempt."""
bm = self._make_batch_manager()
server_id = "server-id-after-retry"
success_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
side_effect=[None, success_response],
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id == server_id
assert mock_init.call_count == 2
mock_sleep.assert_called_once_with(0.2)
def test_retries_on_5xx_then_succeeds(self):
"""Retries on 500 server error, succeeds on second attempt."""
bm = self._make_batch_manager()
server_id = "server-id-after-5xx"
error_response = MagicMock(status_code=500, text="Internal Server Error")
success_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
side_effect=[error_response, success_response],
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id == server_id
assert mock_init.call_count == 2
def test_no_retry_on_exception(self):
"""Exceptions (e.g. timeout, connection error) abort immediately without retry."""
bm = self._make_batch_manager()
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
side_effect=ConnectionError("network down"),
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id is None
assert mock_init.call_count == 1
mock_sleep.assert_not_called()
def test_no_retry_on_4xx(self):
"""Does NOT retry on 422 — client error is not transient."""
bm = self._make_batch_manager()
error_response = MagicMock(status_code=422, text="Unprocessable Entity")
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=error_response,
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id is None
assert mock_init.call_count == 1
mock_sleep.assert_not_called()
def test_exhausts_retries_then_clears_batch_id(self):
"""After all retries fail, trace_batch_id is None."""
bm = self._make_batch_manager()
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=None,
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id is None
assert mock_init.call_count == 2 # initial + 1 retry
class TestFirstTimeHandlerBackendInitGuard:
"""Tests: backend_initialized gated on actual batch creation success."""
def _make_handler_with_manager(self):
"""Create a FirstTimeTraceHandler wired to a TraceBatchManager."""
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id
bm.is_current_batch_ephemeral = True
handler = FirstTimeTraceHandler()
handler.is_first_time = True
handler.collected_events = True
handler.batch_manager = bm
return handler, bm
def test_backend_initialized_true_on_success(self):
"""Events are sent when batch creation succeeds, then state is cleaned up."""
handler, bm = self._make_handler_with_manager()
server_id = "server-id-abc"
mock_init_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
mock_send_response = MagicMock(status_code=200)
trace_batch_id_during_send = None
def capture_send(*args, **kwargs):
nonlocal trace_batch_id_during_send
trace_batch_id_during_send = bm.trace_batch_id
return mock_send_response
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=mock_init_response,
),
patch.object(
bm.plus_api,
"send_ephemeral_trace_events",
side_effect=capture_send,
),
patch.object(bm, "_finalize_backend_batch"),
):
bm.event_buffer = [MagicMock(to_dict=MagicMock(return_value={}))]
handler._initialize_backend_and_send_events()
# trace_batch_id was set correctly during send
assert trace_batch_id_during_send == server_id
# State cleaned up after completion (singleton reuse)
assert bm.backend_initialized is False
assert bm.trace_batch_id is None
assert bm.current_batch is None
def test_backend_initialized_false_on_failure(self):
"""backend_initialized stays False and events are NOT sent when batch creation fails."""
handler, bm = self._make_handler_with_manager()
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=None, # server call fails
),
patch.object(bm, "_send_events_to_backend") as mock_send,
patch.object(bm, "_finalize_backend_batch") as mock_finalize,
patch.object(handler, "_gracefully_fail") as mock_fail,
):
bm.event_buffer = [MagicMock()]
handler._initialize_backend_and_send_events()
assert bm.backend_initialized is False
assert bm.trace_batch_id is None
mock_send.assert_not_called()
mock_finalize.assert_not_called()
mock_fail.assert_called_once()
def test_backend_initialized_false_on_non_2xx(self):
"""backend_initialized stays False when server returns non-2xx."""
handler, bm = self._make_handler_with_manager()
mock_response = MagicMock(status_code=500, text="Internal Server Error")
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=mock_response,
),
patch.object(bm, "_send_events_to_backend") as mock_send,
patch.object(bm, "_finalize_backend_batch") as mock_finalize,
patch.object(handler, "_gracefully_fail") as mock_fail,
):
bm.event_buffer = [MagicMock()]
handler._initialize_backend_and_send_events()
assert bm.backend_initialized is False
assert bm.trace_batch_id is None
mock_send.assert_not_called()
mock_finalize.assert_not_called()
mock_fail.assert_called_once()
class TestFirstTimeHandlerAlwaysEphemeral:
"""Tests that first-time handler always uses ephemeral with skip_context_check."""
def _make_handler_with_manager(self):
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id
bm.is_current_batch_ephemeral = True
handler = FirstTimeTraceHandler()
handler.is_first_time = True
handler.collected_events = True
handler.batch_manager = bm
return handler, bm
def test_deferred_init_uses_ephemeral_and_skip_context_check(self):
"""Deferred backend init always uses ephemeral=True and skip_context_check=True."""
handler, bm = self._make_handler_with_manager()
with (
patch.object(bm, "_initialize_backend_batch") as mock_init,
patch.object(bm, "_send_events_to_backend"),
patch.object(bm, "_finalize_backend_batch"),
):
mock_init.side_effect = lambda **kwargs: None
bm.event_buffer = [MagicMock()]
handler._initialize_backend_and_send_events()
mock_init.assert_called_once()
assert mock_init.call_args.kwargs["use_ephemeral"] is True
assert mock_init.call_args.kwargs["skip_context_check"] is True
class TestAuthFailbackToEphemeral:
"""Tests for ephemeral fallback when server rejects auth (401/403)."""
def _make_batch_manager(self):
"""Create a TraceBatchManager with a pre-set trace_batch_id."""
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id
bm.is_current_batch_ephemeral = False # authenticated path
return bm
def test_401_non_ephemeral_falls_back_to_ephemeral(self):
"""A 401 on the non-ephemeral endpoint should retry as ephemeral."""
bm = self._make_batch_manager()
server_id = "ephemeral-fallback-id"
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
ephemeral_success = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_trace_batch",
return_value=auth_rejected,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=ephemeral_success,
) as mock_ephemeral,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=False,
)
assert bm.trace_batch_id == server_id
assert bm.is_current_batch_ephemeral is True
mock_ephemeral.assert_called_once()
def test_403_non_ephemeral_falls_back_to_ephemeral(self):
"""A 403 on the non-ephemeral endpoint should also fall back."""
bm = self._make_batch_manager()
server_id = "ephemeral-fallback-403"
forbidden = MagicMock(status_code=403, text="Forbidden")
ephemeral_success = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_trace_batch",
return_value=forbidden,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=ephemeral_success,
),
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=False,
)
assert bm.trace_batch_id == server_id
assert bm.is_current_batch_ephemeral is True
def test_401_on_ephemeral_does_not_recurse(self):
"""A 401 on the ephemeral endpoint should NOT try to fall back again."""
bm = self._make_batch_manager()
bm.is_current_batch_ephemeral = True
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=auth_rejected,
) as mock_ephemeral,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id is None
# Called only once: no recursive fallback
mock_ephemeral.assert_called_once()
def test_401_fallback_ephemeral_also_fails(self):
"""If ephemeral fallback also fails, trace_batch_id is cleared."""
bm = self._make_batch_manager()
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
ephemeral_fail = MagicMock(status_code=422, text="Validation failed")
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_trace_batch",
return_value=auth_rejected,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=ephemeral_fail,
),
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=False,
)
assert bm.trace_batch_id is None
class TestMarkBatchAsFailedRouting:
"""Tests: _mark_batch_as_failed routes to the correct endpoint."""
def _make_batch_manager(self, ephemeral: bool = False):
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.is_current_batch_ephemeral = ephemeral
return bm
def test_routes_to_ephemeral_endpoint_when_ephemeral(self):
"""Ephemeral batches must use mark_ephemeral_trace_batch_as_failed."""
bm = self._make_batch_manager(ephemeral=True)
with patch.object(
bm.plus_api, "mark_ephemeral_trace_batch_as_failed"
) as mock_ephemeral, patch.object(
bm.plus_api, "mark_trace_batch_as_failed"
) as mock_non_ephemeral:
bm._mark_batch_as_failed("batch-123", "some error")
mock_ephemeral.assert_called_once_with("batch-123", "some error")
mock_non_ephemeral.assert_not_called()
def test_routes_to_non_ephemeral_endpoint_when_not_ephemeral(self):
"""Non-ephemeral batches must use mark_trace_batch_as_failed."""
bm = self._make_batch_manager(ephemeral=False)
with patch.object(
bm.plus_api, "mark_ephemeral_trace_batch_as_failed"
) as mock_ephemeral, patch.object(
bm.plus_api, "mark_trace_batch_as_failed"
) as mock_non_ephemeral:
bm._mark_batch_as_failed("batch-456", "another error")
mock_non_ephemeral.assert_called_once_with("batch-456", "another error")
mock_ephemeral.assert_not_called()
class TestBackendInitializedGatedOnSuccess:
"""Tests: backend_initialized reflects actual init success on non-first-time path."""
def test_backend_initialized_true_on_success(self):
"""backend_initialized is True when _initialize_backend_batch succeeds."""
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch(
"crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces",
return_value=False,
),
patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
),
):
bm = TraceBatchManager()
mock_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"trace_id": "server-id"}),
)
with patch.object(
bm.plus_api, "initialize_trace_batch", return_value=mock_response
):
bm.initialize_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
)
assert bm.backend_initialized is True
assert bm.trace_batch_id == "server-id"
def test_backend_initialized_false_on_failure(self):
"""backend_initialized is False when _initialize_backend_batch fails."""
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch(
"crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces",
return_value=False,
),
patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
),
):
bm = TraceBatchManager()
with patch.object(
bm.plus_api, "initialize_trace_batch", return_value=None
):
bm.initialize_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
)
assert bm.backend_initialized is False
assert bm.trace_batch_id is None
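Note on the behaviour these tests pin down: a `None` response or a 5xx is retried once after a 0.2 s pause, an exception or a non-auth 4xx aborts immediately, and a 401/403 on the authenticated endpoint falls back to the ephemeral endpoint without recursing further. The sketch below restates that contract as a standalone function. It is an illustration inferred from the test docstrings, not the actual `_initialize_backend_batch` implementation (which does not appear in this section). `Response`, `init_batch`, and every parameter name are hypothetical.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Response:
    """Stand-in for an HTTP response (hypothetical, illustration only)."""
    status_code: int
    body: dict


Call = Callable[[], Optional[Response]]


def init_batch(
    authenticated: Call,
    ephemeral: Call,
    use_ephemeral: bool,
    max_attempts: int = 2,
    delay: float = 0.2,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Return the server-issued batch id, or None when initialization fails."""
    call = ephemeral if use_ephemeral else authenticated
    for attempt in range(max_attempts):
        try:
            response = call()
        except Exception:
            return None  # exceptions abort immediately, no retry
        if response is None or response.status_code >= 500:
            # Transient failure: pause and retry while attempts remain.
            if attempt + 1 < max_attempts:
                sleep(delay)
            continue
        if response.status_code in (401, 403) and not use_ephemeral:
            # Auth rejected: switch to the ephemeral endpoint. The recursive
            # call passes use_ephemeral=True, so it cannot fall back again.
            return init_batch(authenticated, ephemeral, True,
                              max_attempts, delay, sleep)
        if 200 <= response.status_code < 300:
            key = "ephemeral_trace_id" if use_ephemeral else "trace_id"
            return response.body.get(key)
        return None  # other 4xx: not transient, do not retry
    return None  # retries exhausted


# Retry path: first call returns None, second succeeds.
slept: list = []
calls = iter([None, Response(201, {"ephemeral_trace_id": "server-id"})])
batch_id = init_batch(lambda: None, lambda: next(calls), True,
                      sleep=slept.append)

# Fallback path: 401 on the authenticated endpoint, success on ephemeral.
fallback_id = init_batch(
    lambda: Response(401, {}),
    lambda: Response(201, {"ephemeral_trace_id": "eph-id"}),
    use_ephemeral=False,
    sleep=slept.append,
)
```

Returning `None` on every failure path mirrors the tests' requirement that `trace_batch_id` never keeps a client-generated id the server has not registered.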


@@ -1,3 +1,3 @@
"""CrewAI development tools."""
-__version__ = "1.12.0a1"
+__version__ = "1.12.0a3"

uv.lock generated

@@ -1205,6 +1205,9 @@ pandas = [
qdrant = [
{ name = "qdrant-client", extra = ["fastembed"] },
]
qdrant-edge = [
{ name = "qdrant-edge-py" },
]
tools = [
{ name = "crewai-tools" },
]
@@ -1259,6 +1262,7 @@ requires-dist = [
{ name = "python-dotenv", specifier = "~=1.1.1" },
{ name = "pyyaml", specifier = "~=6.0" },
{ name = "qdrant-client", extras = ["fastembed"], marker = "extra == 'qdrant'", specifier = "~=1.14.3" },
{ name = "qdrant-edge-py", marker = "extra == 'qdrant-edge'", specifier = ">=0.6.0" },
{ name = "regex", specifier = "~=2026.1.15" },
{ name = "textual", specifier = ">=7.5.0" },
{ name = "tiktoken", marker = "extra == 'embeddings'", specifier = "~=0.8.0" },
@@ -1268,7 +1272,7 @@ requires-dist = [
{ name = "uv", specifier = "~=0.9.13" },
{ name = "voyageai", marker = "extra == 'voyageai'", specifier = "~=0.3.5" },
]
-provides-extras = ["a2a", "anthropic", "aws", "azure-ai-inference", "bedrock", "docling", "embeddings", "file-processing", "google-genai", "litellm", "mem0", "openpyxl", "pandas", "qdrant", "tools", "voyageai", "watson"]
+provides-extras = ["a2a", "anthropic", "aws", "azure-ai-inference", "bedrock", "docling", "embeddings", "file-processing", "google-genai", "litellm", "mem0", "openpyxl", "pandas", "qdrant", "qdrant-edge", "tools", "voyageai", "watson"]
[[package]]
name = "crewai-devtools"
@@ -6613,6 +6617,27 @@ fastembed = [
{ name = "fastembed", version = "0.7.4", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.13'" },
]
[[package]]
name = "qdrant-edge-py"
version = "0.6.0"
source = { registry = "https://pypi.org/simple" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/1c/72/fce3df4e4b8882b5b00ab3d0a574bbeee2d39a8e520ccf246f456effd185/qdrant_edge_py-0.6.0-cp310-abi3-macosx_10_12_x86_64.whl", hash = "sha256:c9d463e7fa81541d60ab8671e6e92a9afd8c4a0e2cfb7e13ea8f5d76e70b877a", size = 9728290, upload-time = "2026-03-19T21:16:15.03Z" },
{ url = "https://files.pythonhosted.org/packages/41/99/70f4e87f7f2ef68c5f92104b914c0e756c22b4bd19957de30a213dadff22/qdrant_edge_py-0.6.0-cp310-abi3-macosx_11_0_arm64.whl", hash = "sha256:a18b0bf0355260466bb8d453f2cedc7a9e4f6a2e9d9c58489b859150a3c7e0a6", size = 9203390, upload-time = "2026-03-19T21:16:17.255Z" },
{ url = "https://files.pythonhosted.org/packages/80/55/998ea744a4cef59c69e86b7b2b57ca2f2d4b0f86c212c7b43dd90cc6360e/qdrant_edge_py-0.6.0-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:cda53f31d8693d090ec564e6761037f57af6f342ac2eef82e1c160c00d80f331", size = 10287388, upload-time = "2026-03-19T21:16:19.215Z" },
{ url = "https://files.pythonhosted.org/packages/40/d2/9e24a9c57699fe6df9a4f3b6cd0d4c3c9f0bfdbd502a28d25fdfadd44ab5/qdrant_edge_py-0.6.0-cp310-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:80c5e8f8cf650e422a3d313e394bde2760c6206914cd9d6142c9c5e730a76639", size = 9752632, upload-time = "2026-03-19T21:16:21.409Z" },
{ url = "https://files.pythonhosted.org/packages/0c/3c/a01840efcae392e5a376a483b9a19705ed0f5bc030befbe3d25b58a6d3d4/qdrant_edge_py-0.6.0-cp310-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:d2ab0d209f693fd0d5225072441ed47eccee4f7044470a293c54a3ffdf963cfc", size = 10287245, upload-time = "2026-03-19T21:16:24.366Z" },
{ url = "https://files.pythonhosted.org/packages/7a/45/a3ec5e7d36c5dd4510e4f90d0adaf6aa3e66cff35884ff3edefce240fd77/qdrant_edge_py-0.6.0-cp310-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:9abd0c3aedfed380d4c4a82626004b746bd05cb6a8e28e1b2fe7467726dc8840", size = 9935881, upload-time = "2026-03-19T21:16:26.384Z" },
{ url = "https://files.pythonhosted.org/packages/66/0d/43c9033fbb12f0858d5af73b842acb02b3208fe1a31882def2ef23fd560c/qdrant_edge_py-0.6.0-cp310-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:ea51a917fc1b927d799d60e166337b6837ee3da39c23d4dc736b82b67497ff12", size = 10507046, upload-time = "2026-03-19T21:16:28.536Z" },
{ url = "https://files.pythonhosted.org/packages/73/33/b2ead1c51a59d31d19418e6d6ca8ea3ce0f32f76efdd48248a1a3791357f/qdrant_edge_py-0.6.0-cp310-abi3-win_amd64.whl", hash = "sha256:d8376e30b53fbb5d9ac8b0aea683173096d7a775b351110aee4337460c906e71", size = 9905482, upload-time = "2026-03-19T21:16:30.555Z" },
{ url = "https://files.pythonhosted.org/packages/09/be/a054ac8902e942b0d44e27e8c0e4d3593a34bb143726aa3d9bebd215e7f7/qdrant_edge_py-0.6.0-pp311-pypy311_pp73-macosx_10_12_x86_64.whl", hash = "sha256:6e94804d9aa0c973fe25c83aec16da8c0f9e6a955a0cb1668bd972e1ca4b5604", size = 9724896, upload-time = "2026-03-19T21:16:32.793Z" },
{ url = "https://files.pythonhosted.org/packages/19/30/285eed25d8bab071b9867937b1e0fdc002c0c1180ff43476e5044029e73c/qdrant_edge_py-0.6.0-pp311-pypy311_pp73-macosx_11_0_arm64.whl", hash = "sha256:2ca40da1fa22ff4fd05e669d76c1087d3354486bcb685e9b07b1ca0ab5ef6b97", size = 9199009, upload-time = "2026-03-19T21:16:34.954Z" },
{ url = "https://files.pythonhosted.org/packages/41/d7/b729bbd887476a0a3040fc95d2548e519601d69b2f9d7ece83daf7958372/qdrant_edge_py-0.6.0-pp311-pypy311_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:12fde5356eeb83ce8031a339ca73ea0a1a9b98927843f5bf7fa5c0412ca5ff79", size = 10279079, upload-time = "2026-03-19T21:16:36.876Z" },
{ url = "https://files.pythonhosted.org/packages/74/2e/68ef2346b6971b8b4d6b479099618dc2879d8c2e357065f8910aeb8b6ed5/qdrant_edge_py-0.6.0-pp311-pypy311_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:c110af3ddbd4a5dae0421457e4a6f1f83c24411ea1187d557367ef5499cb6bef", size = 9746991, upload-time = "2026-03-19T21:16:38.968Z" },
{ url = "https://files.pythonhosted.org/packages/cd/46/3bfcc5e13d1a7d110a2d1ecf86c63a781e71e543712232be59d7a3f34e96/qdrant_edge_py-0.6.0-pp311-pypy311_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:839651466c217bb8f684a3a0b9ad0726c670fcc734b552eef3ad76fbb4f5a12b", size = 10282664, upload-time = "2026-03-19T21:16:40.952Z" },
{ url = "https://files.pythonhosted.org/packages/80/54/7ba6bbaa2b53a188b0a43a6c063007e9a58afa3e35326f63518efbc6f5e8/qdrant_edge_py-0.6.0-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:c7665230dc4a2412412765fbdf9053e32b32f4c60579881ed68140b4d0ba6915", size = 9901015, upload-time = "2026-03-19T21:16:43.407Z" },
]
[[package]]
name = "questionary"
version = "2.1.1"