Compare commits

13 Commits

Author SHA1 Message Date
Tiago Freire
81a8c2efc1 fix: prevent zeroed data in local trace message and ensure cleanup on all paths
  - Read execution start time non-destructively before _finalize_backend_batch
    consumes it, so the server receives the real duration and the local
    fallback message also shows the correct value
  - Pass pre-captured events_count, duration_ms, and batch_id to
    _show_local_trace_message instead of reading from batch_manager
    (buffer cleared by send, duration consumed by finalize)
  - Extract _reset_batch_state to reset all singleton state (current_batch,
    event_buffer, trace_batch_id, is_current_batch_ephemeral,
    backend_initialized, batch_owner_type/id) and call it in every exit
    path: success, init failure, send failure, and exception handler
2026-03-18 22:50:48 -03:00
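
The capture-before-consume idea in this commit can be illustrated with a small sketch. `FakeBatchManager` and `summarize` are hypothetical names invented for this example, not the project's classes; the sketch only assumes, as the message states, that sending clears the buffer and finalizing consumes the start time.

```python
from datetime import datetime, timedelta, timezone


class FakeBatchManager:
    """Hypothetical manager whose send and finalize steps consume their inputs."""

    def __init__(self) -> None:
        self.event_buffer = ["e1", "e2", "e3"]
        # Pretend the execution started two seconds ago.
        self.start_times = {
            "execution": datetime.now(timezone.utc) - timedelta(seconds=2)
        }

    def send(self) -> None:
        self.event_buffer.clear()  # the buffer is emptied by the send

    def finalize(self) -> None:
        self.start_times.pop("execution", None)  # the start time is consumed


def summarize(manager: FakeBatchManager) -> tuple[int, int]:
    """Capture the event count and duration before the destructive steps run."""
    events_count = len(manager.event_buffer)
    start = manager.start_times.get("execution")  # non-destructive read
    duration_ms = (
        int((datetime.now(timezone.utc) - start).total_seconds() * 1000)
        if start
        else 0
    )
    manager.send()
    manager.finalize()
    return events_count, duration_ms


events_count, duration_ms = summarize(FakeBatchManager())
print(events_count, duration_ms >= 2000)  # 3 True
```

Reading the same values after `send()` and `finalize()` would yield 0 events and no start time, which is the zeroed output the commit title refers to.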
Tiago Freire
4859dc66a5 fix: address PR review findings — forward skip_context_check in
recursive fallback, reduce retry backoff, clean up batch state

  - Forward skip_context_check parameter in the 401/403 ephemeral
    fallback recursive call to prevent silent early return when
    is_tracing_enabled_in_context() is False
  - Reduce retry backoff from 1s to 200ms to minimize lock hold time
    on the non-first-time path (worst case 400ms vs 2s)
  - Add batch state cleanup after _finalize_backend_batch in the
    first-time handler, mirroring finalize_batch: reset current_batch,
    event_buffer, trace_batch_id, is_current_batch_ephemeral,
    batch_owner_type, batch_owner_id, and call _cleanup_batch_data()
2026-03-18 21:29:35 -03:00
Tiago Freire
7f2eda2ac6 refactor: remove redundant tests in TestTraceBatchIdClearedOnFailure
Remove test_trace_batch_id_cleared_on_none_response (covered by
  TestInitializeBackendBatchRetry::test_exhausts_retries_then_clears_batch_id)
  and test_trace_batch_id_cleared_on_non_2xx_response (covered by
  TestInitializeBackendBatchRetry::test_no_retry_on_4xx).
2026-03-18 20:46:20 -03:00
Tiago Freire
6734db4f71 fix: mark batch as failed when event send fails in first-time handler
The return value of _send_events_to_backend() was discarded in
  _initialize_backend_and_send_events, so _finalize_backend_batch was
  called unconditionally with the full event count even when the send
  returned 500. This finalized the batch as "completed" on the server
  while it received 0 events, producing an empty trace URL.

  Now check the return status and call mark_trace_batch_as_failed on
  500, matching the behavior of the regular finalize_batch path.
2026-03-18 20:35:57 -03:00
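
A minimal sketch of the check described above, assuming a send step that returns an HTTP-like status code. `FakeBackend` and `send_then_close` are hypothetical stand-ins written for this illustration and do not mirror the real `plus_api` interface.

```python
from dataclasses import dataclass, field


@dataclass
class FakeBackend:
    """Hypothetical stand-in that records which terminal call was made."""

    send_status: int
    calls: list = field(default_factory=list)

    def send_events(self) -> int:
        self.calls.append("send")
        return self.send_status

    def mark_failed(self, reason: str) -> None:
        self.calls.append(f"failed:{reason}")

    def finalize(self, events_count: int) -> None:
        self.calls.append(f"finalized:{events_count}")


def send_then_close(backend: FakeBackend, events_count: int) -> None:
    """Finalize only if the send succeeded; otherwise mark the batch as failed."""
    if backend.send_events() == 500:
        backend.mark_failed("Error sending events to backend")
        return
    backend.finalize(events_count)


broken = FakeBackend(send_status=500)
send_then_close(broken, events_count=3)
print(broken.calls)  # ['send', 'failed:Error sending events to backend']
```

Discarding the return value of `send_events()` would fall through to `finalize`, which is the "completed but empty" outcome the commit fixes.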
Tiago Freire
c00544c97d fix: always use ephemeral tracing for first-time users
The first-time handler UX is built around ephemeral traces (access
  code, 24hr expiry link, browser open). Checking auth and creating
  non-ephemeral batches caused the handler to fall through to the
  local traces fallback since ephemeral_trace_url is only set for
  ephemeral batches. The server's LinkEphemeralTracesJob links
  ephemeral traces to user accounts on signup regardless.

  Remove auth check from first-time handler and always pass
  use_ephemeral=True to _initialize_backend_batch.
2026-03-18 19:54:10 -03:00
Tiago Freire
08c533905e fix: bypass is_tracing_enabled_in_context for first-time deferred batch init
First-time users have is_tracing_enabled_in_context() = False by design
  (it's a prerequisite for should_auto_collect_first_time_traces). This
  caused _initialize_backend_batch to return early without creating the
  batch, and _send_events_to_backend to send to a non-existent batch.

  Add skip_context_check parameter to _initialize_backend_batch so the
  first-time handler can bypass the guard during deferred init. Gate
  backend_initialized on trace_batch_id being set. Call
  _finalize_backend_batch directly instead of finalize_batch (which has
  the same context guard). Sync is_current_batch_ephemeral on success
  to prevent endpoint mismatch between batch creation and event send.
2026-03-18 19:54:10 -03:00
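
The guard bypass and the gating of `backend_initialized` can be sketched as follows. The function names and the placeholder batch id are assumptions made for this example; only the `skip_context_check` parameter name comes from the commit.

```python
from typing import Optional


def tracing_enabled_in_context() -> bool:
    """Hypothetical guard: False for first-time users, as the commit describes."""
    return False


def initialize_batch(skip_context_check: bool = False) -> Optional[str]:
    """Return a batch id, or None when the context guard blocks initialization."""
    if not skip_context_check and not tracing_enabled_in_context():
        return None
    return "batch-123"  # placeholder id for the sketch


# Gate the initialized flag on the batch id actually being set.
batch_id = initialize_batch(skip_context_check=True)
backend_initialized = batch_id is not None

print(initialize_batch())   # None
print(backend_initialized)  # True
```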
Tiago Freire
6d1546c381 feat: fall back to ephemeral tracing on server auth rejection
When the non-ephemeral batch endpoint returns 401 or 403 (expired
  token, revoked credentials, JWKS rotation), _initialize_backend_batch
  now switches is_current_batch_ephemeral to True and retries via the
  ephemeral endpoint. This preserves traces that would otherwise be
  lost due to the timing gap between client-side token validation and
  server-side JWT decode.

  The fallback only triggers on the non-ephemeral path to prevent
  infinite recursion. If the ephemeral attempt also fails, trace_batch_id
  is cleared normally.

  Addresses the 2M+ failed push attempts in which valid client-side
  tokens were rejected on the server.
2026-03-18 19:54:10 -03:00
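
The one-shot fallback can be sketched like this. It is an illustration under stated assumptions rather than the project's code: `init_batch` and the `post` callable (which takes the ephemeral flag and returns a status code) are hypothetical.

```python
from typing import Callable, Optional


def init_batch(
    post: Callable[[bool], int],
    use_ephemeral: bool = False,
) -> Optional[str]:
    """Return which endpoint accepted the batch, or None if it was rejected."""
    status = post(use_ephemeral)
    # Only the non-ephemeral path may fall back, so recursion depth is at most one.
    if status in (401, 403) and not use_ephemeral:
        return init_batch(post, use_ephemeral=True)
    if status in (200, 201):
        return "ephemeral" if use_ephemeral else "authenticated"
    return None


# Simulated server: rejects the authenticated endpoint, accepts the ephemeral one.
endpoint = init_batch(lambda ephemeral: 201 if ephemeral else 401)
print(endpoint)  # ephemeral
```

Because the recursive call always passes `use_ephemeral=True`, a second 401 or 403 falls through to the failure branch instead of recursing again.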
Tiago Freire
bcba620a41 fix: respect authentication status for first-time users
Previously, _initialize_batch forced use_ephemeral=True for all
  first-time users, bypassing _check_authenticated() entirely. This
  meant logged-in users in a new project directory were routed to the
  ephemeral endpoint instead of their account's tracing endpoint.

  Now _check_authenticated() runs for all users including first-time.
  Authenticated first-time users get non-ephemeral tracing (traces
  linked to their account); only unauthenticated first-time users
  fall back to ephemeral. The deferred backend init in
  FirstTimeTraceHandler also reads is_current_batch_ephemeral instead
  of hardcoding use_ephemeral=True.
2026-03-18 19:54:10 -03:00
Tiago Freire
4141233a78 feat: add retry logic for ephemeral trace batch creation
Transient failures (None response, 5xx, network errors) during
  _initialize_backend_batch now retry up to 2 times with a 1s backoff.
  Non-transient 4xx errors (422 validation, 401 auth) are not retried
  since the same payload would fail again. If all retries are exhausted,
  trace_batch_id is cleared per the existing safety net.

  This runs post-execution when the user has already answered "y" to
  view traces, so the ~2s worst-case delay is acceptable.
2026-03-18 19:54:10 -03:00
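
The retry policy described in this commit can be sketched as below. `call_with_retry`, `FakeResponse`, and the injectable `sleep` parameter are hypothetical names for this illustration; the defaults (2 retries, 1s backoff) are taken from the message.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class FakeResponse:
    """Stand-in for an HTTP response; only the status code matters here."""

    status_code: int


def call_with_retry(
    send: Callable[[], Optional[FakeResponse]],
    max_retries: int = 2,
    backoff_s: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[FakeResponse]:
    """Call `send`, retrying on None, 5xx, or an exception.

    4xx responses are returned immediately: the same payload would fail again.
    """
    response: Optional[FakeResponse] = None
    for attempt in range(max_retries + 1):
        try:
            response = send()
        except Exception:
            response = None  # treat network errors as transient
        if response is not None and response.status_code < 500:
            return response
        if attempt < max_retries:
            sleep(backoff_s)
    return response


# Two transient failures, then success; sleeping is stubbed out for the demo.
outcomes = iter([None, FakeResponse(503), FakeResponse(201)])
result = call_with_retry(lambda: next(outcomes), sleep=lambda _s: None)
print(result.status_code)  # 201
```

Passing `sleep` as a parameter is a choice made for this sketch so the backoff can be replaced in tests; the tests in this changeset patch `time.sleep` instead.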
Tiago Freire
c67425d323 fix: gate backend_initialized on actual batch creation success
In first_time_trace_handler._initialize_backend_and_send_events,
  backend_initialized was set to True unconditionally after calling
  _initialize_backend_batch, regardless of whether the server-side
  batch was actually created. This caused _send_events_to_backend and
  finalize_batch to run against a non-existent batch.

  Now check trace_batch_id after _initialize_backend_batch returns;
  if None (batch creation failed), call _gracefully_fail and return
  early, skipping event send and finalization.
2026-03-18 19:54:10 -03:00
Tiago Freire
7f090c664e fix: clear trace_batch_id on backend batch initialization failure
When _initialize_backend_batch fails (None response, non-2xx status,
  or exception), trace_batch_id was left populated with a client-generated
  UUID that was never registered server-side. Subsequent calls to
  _send_events_to_backend would see the stale ID and POST events to
  /ephemeral/batches/{id}/events, resulting in a 404 from the server.

  Nullify trace_batch_id on all three failure paths so downstream methods
  skip event sending instead of hitting a non-existent batch.
2026-03-18 19:54:10 -03:00
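
A small sketch of why the stale ID matters, assuming a client that generates its own UUID before the server confirms the batch. `BatchState` and its methods are hypothetical and exist only for this example.

```python
import uuid
from typing import Optional


class BatchState:
    """Hypothetical holder for a client-generated batch id."""

    def __init__(self) -> None:
        # Client-generated id, not yet registered with the server.
        self.trace_batch_id: Optional[str] = str(uuid.uuid4())
        self.posted: list = []

    def initialize(self, server_ok: bool) -> None:
        if not server_ok:
            # Never leave an unregistered id behind for later calls to find.
            self.trace_batch_id = None

    def send_events(self) -> bool:
        if self.trace_batch_id is None:
            return False  # skip instead of POSTing to a batch that does not exist
        self.posted.append(self.trace_batch_id)
        return True


state = BatchState()
state.initialize(server_ok=False)
print(state.send_events())  # False
```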
Vini Brasil
6b262f5a6d Fix lock_store crash when redis package is not installed (#4943)
* Fix lock_store crash when redis package is not installed

`REDIS_URL` being set was enough to trigger a Redis lock, which would
raise `ImportError` if the `redis` package wasn't available. Added
`_redis_available()` to guard on both the env var and the import.

* Simplify tests

* Simplify tests #2
2026-03-18 15:05:41 -03:00
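
The same guard can be written generically for any optional dependency. This sketch swaps in `importlib.util.find_spec` for the `try`/`import` used in the changeset; `backend_available` and the `DEMO_BACKEND_URL` variable are hypothetical names for the example.

```python
import importlib.util
import os


def backend_available(env_var: str, module_name: str) -> bool:
    """True only if the env var is set AND the optional module can be found."""
    if not os.environ.get(env_var):
        return False
    # find_spec returns None for a missing top-level module instead of raising.
    return importlib.util.find_spec(module_name) is not None


os.environ["DEMO_BACKEND_URL"] = "redis://localhost:6379"
print(backend_available("DEMO_BACKEND_URL", "json"))                # True
print(backend_available("DEMO_BACKEND_URL", "no_such_module_xyz"))  # False
```

Checking both conditions means a configured URL alone no longer selects the distributed lock when its client library is absent.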
dependabot[bot]
6a6adaf2da chore(deps): bump pyasn1 (#4925)
Bumps the security-updates group with 1 update in the / directory: [pyasn1](https://github.com/pyasn1/pyasn1).


Updates `pyasn1` from 0.6.2 to 0.6.3
- [Release notes](https://github.com/pyasn1/pyasn1/releases)
- [Changelog](https://github.com/pyasn1/pyasn1/blob/main/CHANGES.rst)
- [Commits](https://github.com/pyasn1/pyasn1/compare/v0.6.2...v0.6.3)

---
updated-dependencies:
- dependency-name: pyasn1
  dependency-version: 0.6.3
  dependency-type: indirect
  dependency-group: security-updates
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-18 12:16:59 -05:00
14 changed files with 863 additions and 34 deletions

View File

@@ -9,7 +9,10 @@ mode: "wide"
 The Tool Repository is a package manager for CrewAI tools. It allows users to publish, install, and manage tools that integrate with CrewAI crews and flows.
-All tools are private by default and accessible only within your organization.
+Tools can be:
+- **Private**: accessible only within your organization (default)
+- **Public**: accessible to all CrewAI users if published with the `--public` flag
 The repository is not a version control system. Use Git to track code changes and enable collaboration.
@@ -103,6 +106,12 @@ To publish the tool:
crewai tool publish
```
By default, tools are published as private. To make a tool public:
```bash
crewai tool publish --public
```
For more details on how to build tools, see [Creating your own tools](/en/concepts/tools#creating-your-own-tools).
## Updating Tools

View File

@@ -9,7 +9,10 @@ mode: "wide"
Tool Repository는 CrewAI 도구를 위한 패키지 관리자입니다. 사용자는 CrewAI crew와 flow에 통합되는 도구를 게시, 설치 및 관리할 수 있습니다.
모든 도구는 기본적으로 비공개이며 조직 내에서만 접근할 수 있습니다.
도구는 다음과 같이 분류됩니다:
- **비공개**: 조직 내에서만 접근할 수 있습니다(기본값)
- **공개**: `--public` 플래그로 게시하면 모든 CrewAI 사용자가 접근할 수 있습니다
이 저장소는 버전 관리 시스템이 아닙니다. 코드 변경 사항을 추적하고 협업을 활성화하려면 Git을 사용하십시오.
@@ -57,6 +60,12 @@ git commit -m "Initial version"
crewai tool publish
```
기본적으로 도구는 비공개로 게시됩니다. 도구를 공개로 설정하려면:
```bash
crewai tool publish --public
```
도구 빌드에 대한 자세한 내용은 [나만의 도구 만들기](/ko/concepts/tools#creating-your-own-tools)를 참고하세요.
## 도구 업데이트

View File

@@ -9,7 +9,10 @@ mode: "wide"
O Repositório de Ferramentas é um gerenciador de pacotes para ferramentas da CrewAI. Ele permite que usuários publiquem, instalem e gerenciem ferramentas que se integram com crews e flows da CrewAI.
Todas as ferramentas são privadas por padrão e acessíveis apenas dentro da sua organização.
As ferramentas podem ser:
- **Privadas**: acessíveis apenas dentro da sua organização (padrão)
- **Públicas**: acessíveis a todos os usuários CrewAI se publicadas com a flag `--public`
O repositório não é um sistema de controle de versões. Use Git para rastrear mudanças no código e permitir colaboração.
@@ -57,6 +60,12 @@ Para publicar a ferramenta:
crewai tool publish
```
Por padrão, as ferramentas são publicadas como privadas. Para tornar uma ferramenta pública:
```bash
crewai tool publish --public
```
Para mais detalhes sobre como construir ferramentas, acesse [Criando suas próprias ferramentas](/pt-BR/concepts/tools#creating-your-own-tools).
## Atualizando ferramentas

View File

@@ -452,10 +452,12 @@ def tool_install(handle: str):
default=False,
help="Bypasses Git remote validations",
)
-def tool_publish(force: bool):
+@click.option("--public", "is_public", flag_value=True, default=False)
+@click.option("--private", "is_public", flag_value=False)
+def tool_publish(is_public: bool, force: bool):
     tool_cmd = ToolCommand()
     tool_cmd.login()
-    tool_cmd.publish(force)
+    tool_cmd.publish(is_public, force)
@crewai.group()
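
The pattern in this hunk, two flags writing to one boolean destination with private as the default, can be tried out with the standard library. This sketch uses `argparse` rather than `click`, so it is an analogue of the change and not the CLI's actual code; `build_parser` and the `publish-demo` program name are hypothetical.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="publish-demo")
    # Both flags write to the same destination; private (False) is the default.
    parser.add_argument("--public", dest="is_public", action="store_true", default=False)
    parser.add_argument("--private", dest="is_public", action="store_false", default=False)
    return parser


parser = build_parser()
print(parser.parse_args([]).is_public)             # False
print(parser.parse_args(["--public"]).is_public)   # True
print(parser.parse_args(["--private"]).is_public)  # False
```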

View File

@@ -68,6 +68,7 @@ class PlusAPI:
     def publish_tool(
         self,
         handle: str,
+        is_public: bool,
         version: str,
         description: str | None,
         encoded_file: str,
@@ -75,6 +76,7 @@ class PlusAPI:
     ) -> httpx.Response:
         params = {
             "handle": handle,
+            "public": is_public,
             "version": version,
             "file": encoded_file,
             "description": description,

View File

@@ -73,7 +73,7 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
finally:
os.chdir(old_directory)
-    def publish(self, force: bool = False) -> None:
+    def publish(self, is_public: bool, force: bool = False) -> None:
if not git.Repository().is_synced() and not force:
console.print(
"[bold red]Failed to publish tool.[/bold red]\n"
@@ -129,6 +129,7 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
console.print("[bold blue]Publishing tool to repository...[/bold blue]")
publish_response = self.plus_api_client.publish_tool(
handle=project_name,
+            is_public=is_public,
version=project_version,
description=project_description,
encoded_file=f"data:application/x-gzip;base64,{encoded_tarball}",

View File

@@ -1,6 +1,7 @@
import logging
import uuid
import webbrowser
+from datetime import datetime, timezone
from rich.console import Console
from rich.panel import Panel
@@ -100,20 +101,48 @@ class FirstTimeTraceHandler:
user_context=user_context,
execution_metadata=execution_metadata,
use_ephemeral=True,
skip_context_check=True,
)
if not self.batch_manager.trace_batch_id:
self._gracefully_fail("Backend batch creation failed, cannot send events.")
self._reset_batch_state()
return
self.batch_manager.backend_initialized = True
if self.batch_manager.event_buffer:
self.batch_manager._send_events_to_backend()
# Capture values before send/finalize consume them
events_count = len(self.batch_manager.event_buffer)
batch_id = self.batch_manager.trace_batch_id
# Read duration non-destructively — _finalize_backend_batch will consume it
start_time = self.batch_manager.execution_start_times.get("execution")
duration_ms = (
int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)
if start_time
else 0
)
self.batch_manager.finalize_batch()
if self.batch_manager.event_buffer:
send_status = self.batch_manager._send_events_to_backend()
if send_status == 500 and self.batch_manager.trace_batch_id:
self.batch_manager.plus_api.mark_trace_batch_as_failed(
self.batch_manager.trace_batch_id,
"Error sending events to backend",
)
self._reset_batch_state()
return
self.batch_manager._finalize_backend_batch(events_count)
self.ephemeral_url = self.batch_manager.ephemeral_trace_url
if not self.ephemeral_url:
self._show_local_trace_message()
self._show_local_trace_message(events_count, duration_ms, batch_id)
self._reset_batch_state()
except Exception as e:
self._gracefully_fail(f"Backend initialization failed: {e}")
self._reset_batch_state()
def _display_ephemeral_trace_link(self):
"""Display the ephemeral trace link to the user and automatically open browser."""
@@ -184,6 +213,19 @@ To enable tracing later, do any one of these:
console.print(panel)
console.print()
+    def _reset_batch_state(self):
+        """Reset batch manager state to allow future executions to re-initialize."""
+        if not self.batch_manager:
+            return
+        self.batch_manager.batch_owner_type = None
+        self.batch_manager.batch_owner_id = None
+        self.batch_manager.current_batch = None
+        self.batch_manager.event_buffer.clear()
+        self.batch_manager.trace_batch_id = None
+        self.batch_manager.is_current_batch_ephemeral = False
+        self.batch_manager.backend_initialized = False
+        self.batch_manager._cleanup_batch_data()
def _gracefully_fail(self, error_message: str):
"""Handle errors gracefully without disrupting user experience."""
console = Console()
@@ -191,7 +233,7 @@ To enable tracing later, do any one of these:
logger.debug(f"First-time trace error: {error_message}")
-    def _show_local_trace_message(self):
+    def _show_local_trace_message(self, events_count: int = 0, duration_ms: int = 0, batch_id: str | None = None):
"""Show message when traces were collected locally but couldn't be uploaded."""
console = Console()
@@ -199,9 +241,9 @@ To enable tracing later, do any one of these:
📊 Your execution traces were collected locally!
Unfortunately, we couldn't upload them to the server right now, but here's what we captured:
-{len(self.batch_manager.event_buffer)} trace events
-• Execution duration: {self.batch_manager.calculate_duration("execution")}ms
-• Batch ID: {self.batch_manager.trace_batch_id}
+{events_count} trace events
+• Execution duration: {duration_ms}ms
+• Batch ID: {batch_id}
✅ Tracing has been enabled for future runs!
Your preference has been saved. Future Crew/Flow executions will automatically collect traces.

View File

@@ -1,3 +1,4 @@
+import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from logging import getLogger
@@ -108,10 +109,11 @@ class TraceBatchManager:
         user_context: dict[str, str],
         execution_metadata: dict[str, Any],
         use_ephemeral: bool = False,
+        skip_context_check: bool = False,
     ) -> None:
         """Send batch initialization to backend"""
-        if not is_tracing_enabled_in_context():
+        if not skip_context_check and not is_tracing_enabled_in_context():
             return
if not self.plus_api or not self.current_batch:
@@ -142,19 +144,62 @@ class TraceBatchManager:
payload["ephemeral_trace_id"] = self.current_batch.batch_id
payload["user_identifier"] = get_user_id()
response = (
self.plus_api.initialize_ephemeral_trace_batch(payload)
if use_ephemeral
else self.plus_api.initialize_trace_batch(payload)
)
max_retries = 2
response = None
last_exception = None
for attempt in range(max_retries + 1):
try:
response = (
self.plus_api.initialize_ephemeral_trace_batch(payload)
if use_ephemeral
else self.plus_api.initialize_trace_batch(payload)
)
if response is not None and response.status_code < 500:
break
if attempt < max_retries:
logger.debug(
f"Trace batch init attempt {attempt + 1} failed "
f"(status={response.status_code if response else 'None'}), retrying..."
)
time.sleep(0.2)
except Exception as e:
last_exception = e
if attempt < max_retries:
logger.debug(
f"Trace batch init attempt {attempt + 1} raised {type(e).__name__}, retrying..."
)
time.sleep(0.2)
if last_exception and response is None:
logger.warning(
f"Error initializing trace batch: {last_exception}. Continuing without tracing."
)
self.trace_batch_id = None
return
if response is None:
logger.warning(
"Trace batch initialization failed gracefully. Continuing without tracing."
)
self.trace_batch_id = None
return
# Fall back to ephemeral on auth failure (expired/revoked token)
if response.status_code in [401, 403] and not use_ephemeral:
logger.warning(
"Auth rejected by server, falling back to ephemeral tracing."
)
self.is_current_batch_ephemeral = True
return self._initialize_backend_batch(
user_context,
execution_metadata,
use_ephemeral=True,
skip_context_check=skip_context_check,
)
if response.status_code in [201, 200]:
self.is_current_batch_ephemeral = use_ephemeral
response_data = response.json()
self.trace_batch_id = (
response_data["trace_id"]
@@ -165,11 +210,13 @@ class TraceBatchManager:
logger.warning(
f"Trace batch initialization returned status {response.status_code}. Continuing without tracing."
)
self.trace_batch_id = None
except Exception as e:
logger.warning(
f"Error initializing trace batch: {e}. Continuing without tracing."
)
self.trace_batch_id = None
def begin_event_processing(self) -> None:
"""Mark that an event handler started processing (for synchronization)."""

View File

@@ -1,7 +1,7 @@
"""Centralised lock factory.
-If ``REDIS_URL`` is set, locks are distributed via ``portalocker.RedisLock``. Otherwise, falls
-back to the standard ``portalocker.Lock``.
+If ``REDIS_URL`` is set and the ``redis`` package is installed, locks are distributed via
+``portalocker.RedisLock``. Otherwise, falls back to the standard ``portalocker.Lock``.
"""
from __future__ import annotations
@@ -30,6 +30,18 @@ _REDIS_URL: str | None = os.environ.get("REDIS_URL")
_DEFAULT_TIMEOUT: Final[int] = 120
+def _redis_available() -> bool:
+    """Return True if redis is installed and REDIS_URL is set."""
+    if not _REDIS_URL:
+        return False
+    try:
+        import redis  # noqa: F401
+        return True
+    except ImportError:
+        return False
@lru_cache(maxsize=1)
def _redis_connection() -> redis.Redis:
"""Return a cached Redis connection, creating one on first call."""
@@ -51,7 +63,7 @@ def lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[None]:
"""
channel = f"crewai:{md5(name.encode(), usedforsecurity=False).hexdigest()}"
-    if _REDIS_URL:
+    if _redis_available():
with portalocker.RedisLock(
channel=channel,
connection=_redis_connection(),

View File

@@ -120,16 +120,18 @@ class TestPlusAPI(unittest.TestCase):
mock_response = MagicMock()
mock_make_request.return_value = mock_response
handle = "test_tool_handle"
public = True
version = "1.0.0"
description = "Test tool description"
encoded_file = "encoded_test_file"
response = self.api.publish_tool(
handle, version, description, encoded_file
handle, public, version, description, encoded_file
)
params = {
"handle": handle,
"public": public,
"version": version,
"file": encoded_file,
"description": description,
@@ -155,16 +157,18 @@ class TestPlusAPI(unittest.TestCase):
mock_client_class.return_value.__enter__.return_value = mock_client_instance
handle = "test_tool_handle"
public = True
version = "1.0.0"
description = "Test tool description"
encoded_file = "encoded_test_file"
response = self.api.publish_tool(
handle, version, description, encoded_file
handle, public, version, description, encoded_file
)
expected_params = {
"handle": handle,
"public": public,
"version": version,
"file": encoded_file,
"description": description,
@@ -181,16 +185,18 @@ class TestPlusAPI(unittest.TestCase):
mock_response = MagicMock()
mock_make_request.return_value = mock_response
handle = "test_tool_handle"
public = False
version = "2.0.0"
description = None
encoded_file = "encoded_test_file"
response = self.api.publish_tool(
handle, version, description, encoded_file
handle, public, version, description, encoded_file
)
params = {
"handle": handle,
"public": public,
"version": version,
"file": encoded_file,
"description": description,

View File

@@ -163,7 +163,7 @@ def test_install_api_error(mock_get, capsys, tool_command):
@patch("crewai.cli.tools.main.git.Repository.is_synced", return_value=False)
def test_publish_when_not_in_sync(mock_is_synced, capsys, tool_command):
with raises(SystemExit):
tool_command.publish()
tool_command.publish(is_public=True)
output = capsys.readouterr().out
assert "Local changes need to be resolved before publishing" in output
@@ -204,7 +204,7 @@ def test_publish_when_not_in_sync_and_force(
mock_publish_response.json.return_value = {"handle": "sample-tool"}
mock_publish.return_value = mock_publish_response
tool_command.publish(force=True)
tool_command.publish(is_public=True, force=True)
mock_get_project_name.assert_called_with(require=True)
mock_get_project_version.assert_called_with(require=True)
@@ -217,6 +217,7 @@ def test_publish_when_not_in_sync_and_force(
mock_open.assert_called_with(unittest.mock.ANY, "rb")
mock_publish.assert_called_with(
handle="sample-tool",
is_public=True,
version="1.0.0",
description="A sample tool",
encoded_file=unittest.mock.ANY,
@@ -258,7 +259,7 @@ def test_publish_success(
mock_publish_response.json.return_value = {"handle": "sample-tool"}
mock_publish.return_value = mock_publish_response
tool_command.publish()
tool_command.publish(is_public=True)
mock_get_project_name.assert_called_with(require=True)
mock_get_project_version.assert_called_with(require=True)
@@ -271,6 +272,7 @@ def test_publish_success(
mock_open.assert_called_with(unittest.mock.ANY, "rb")
mock_publish.assert_called_with(
handle="sample-tool",
is_public=True,
version="1.0.0",
description="A sample tool",
encoded_file=unittest.mock.ANY,
@@ -311,7 +313,7 @@ def test_publish_failure(
mock_publish.return_value = mock_publish_response
with raises(SystemExit):
tool_command.publish()
tool_command.publish(is_public=True)
output = capsys.readouterr().out
assert "Failed to complete operation" in output
assert "Name is already taken" in output
@@ -353,7 +355,7 @@ def test_publish_api_error(
mock_publish.return_value = mock_response
with raises(SystemExit):
tool_command.publish()
tool_command.publish(is_public=True)
output = capsys.readouterr().out
assert "Request to Enterprise API failed" in output

View File

@@ -7,6 +7,7 @@ from crewai.events.listeners.tracing.first_time_trace_handler import (
FirstTimeTraceHandler,
)
from crewai.events.listeners.tracing.trace_batch_manager import (
TraceBatch,
TraceBatchManager,
)
from crewai.events.listeners.tracing.trace_listener import (
@@ -657,6 +658,16 @@ class TestTraceListenerSetup:
trace_listener.first_time_handler.collected_events = True
mock_batch_response = MagicMock()
mock_batch_response.status_code = 201
mock_batch_response.json.return_value = {
"trace_id": "mock-trace-id",
"ephemeral_trace_id": "mock-ephemeral-trace-id",
"access_code": "TRACE-mock",
}
mock_events_response = MagicMock()
mock_events_response.status_code = 200
with (
patch.object(
trace_listener.first_time_handler,
@@ -666,6 +677,40 @@ class TestTraceListenerSetup:
patch.object(
trace_listener.first_time_handler, "_display_ephemeral_trace_link"
) as mock_display_link,
patch.object(
trace_listener.batch_manager.plus_api,
"initialize_trace_batch",
return_value=mock_batch_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"initialize_ephemeral_trace_batch",
return_value=mock_batch_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"send_trace_events",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"send_ephemeral_trace_events",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"finalize_trace_batch",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"finalize_ephemeral_trace_batch",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager,
"_cleanup_batch_data",
),
):
crew.kickoff()
wait_for_event_handlers()
@@ -918,3 +963,576 @@ class TestTraceListenerSetup:
mock_init.assert_called_once()
payload = mock_init.call_args[0][0]
assert "user_identifier" not in payload
class TestTraceBatchIdClearedOnFailure:
"""Tests: trace_batch_id is cleared when _initialize_backend_batch fails."""
def _make_batch_manager(self):
"""Create a TraceBatchManager with a pre-set trace_batch_id (simulating first-time user)."""
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id # simulate line 96
bm.is_current_batch_ephemeral = True
return bm
def test_trace_batch_id_cleared_on_exception(self):
"""trace_batch_id must be None when the API call raises an exception."""
bm = self._make_batch_manager()
assert bm.trace_batch_id is not None
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
side_effect=ConnectionError("network down"),
),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id is None
def test_trace_batch_id_set_on_success(self):
"""trace_batch_id must be set from the server response on success."""
bm = self._make_batch_manager()
server_id = "server-ephemeral-trace-id-999"
mock_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=mock_response,
),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id == server_id
def test_send_events_skipped_when_trace_batch_id_none(self):
"""_send_events_to_backend must return early when trace_batch_id is None."""
bm = self._make_batch_manager()
bm.trace_batch_id = None
bm.event_buffer = [MagicMock()] # has events
with patch.object(
bm.plus_api, "send_ephemeral_trace_events"
) as mock_send:
result = bm._send_events_to_backend()
assert result == 500
mock_send.assert_not_called()
class TestInitializeBackendBatchRetry:
"""Tests for retry logic in _initialize_backend_batch."""
def _make_batch_manager(self):
"""Create a TraceBatchManager with a pre-set trace_batch_id."""
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id
bm.is_current_batch_ephemeral = True
return bm
def test_retries_on_none_response_then_succeeds(self):
"""Retries when API returns None, succeeds on second attempt."""
bm = self._make_batch_manager()
server_id = "server-id-after-retry"
success_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
side_effect=[None, success_response],
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id == server_id
assert mock_init.call_count == 2
mock_sleep.assert_called_once_with(0.2)
def test_retries_on_5xx_then_succeeds(self):
"""Retries on 500 server error, succeeds on second attempt."""
        bm = self._make_batch_manager()
        server_id = "server-id-after-5xx"
        error_response = MagicMock(status_code=500, text="Internal Server Error")
        success_response = MagicMock(
            status_code=201,
            json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
        )
        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                side_effect=[error_response, success_response],
            ) as mock_init,
            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=True,
            )
        assert bm.trace_batch_id == server_id
        assert mock_init.call_count == 2

    def test_retries_on_exception_then_succeeds(self):
        """Retries on ConnectionError, succeeds on second attempt."""
        bm = self._make_batch_manager()
        server_id = "server-id-after-exception"
        success_response = MagicMock(
            status_code=201,
            json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
        )
        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                side_effect=[ConnectionError("network down"), success_response],
            ) as mock_init,
            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=True,
            )
        assert bm.trace_batch_id == server_id
        assert mock_init.call_count == 2

    def test_no_retry_on_4xx(self):
        """Does NOT retry on 422: client error is not transient."""
        bm = self._make_batch_manager()
        error_response = MagicMock(status_code=422, text="Unprocessable Entity")
        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=error_response,
            ) as mock_init,
            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=True,
            )
        assert bm.trace_batch_id is None
        assert mock_init.call_count == 1
        mock_sleep.assert_not_called()

    def test_exhausts_retries_then_clears_batch_id(self):
        """After all retries fail, trace_batch_id is None."""
        bm = self._make_batch_manager()
        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=None,
            ) as mock_init,
            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=True,
            )
        assert bm.trace_batch_id is None
        assert mock_init.call_count == 3  # initial + 2 retries
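
The retry tests above pin down a policy that can be sketched roughly as follows. This is an illustration only, not the body of `_initialize_backend_batch`: `call_with_retry`, `MAX_RETRIES`, and `BACKOFF_SECONDS` are hypothetical names, the retry count is inferred from the "initial + 2 retries" assertion, and the 200 ms backoff comes from the commit message rather than from code shown in this diff.

```python
import time
from typing import Any, Callable, Optional

MAX_RETRIES = 2        # assumed from the "initial + 2 retries" assertion
BACKOFF_SECONDS = 0.2  # assumed: 200 ms, per the commit message


def call_with_retry(
    call: Callable[[], Any],
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[Any]:
    """Return the first non-transient response, or None once retries run out.

    Treated as transient: a raised exception, a None response, or a 5xx
    status. A 2xx or 4xx response is returned immediately without retrying.
    """
    for attempt in range(MAX_RETRIES + 1):
        try:
            response = call()
        except Exception:
            response = None
        if response is not None and response.status_code < 500:
            return response
        if attempt < MAX_RETRIES:
            sleep(BACKOFF_SECONDS)
    return None
```

Passing `sleep` as a parameter is a choice made for this sketch; the tests in the diff get the same effect by patching `time.sleep` in the module under test.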
class TestFirstTimeHandlerBackendInitGuard:
    """Tests: backend_initialized gated on actual batch creation success."""

    def _make_handler_with_manager(self):
        """Create a FirstTimeTraceHandler wired to a TraceBatchManager."""
        with patch(
            "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
            return_value="mock_token",
        ):
            bm = TraceBatchManager()
        bm.current_batch = TraceBatch(
            user_context={"privacy_level": "standard"},
            execution_metadata={"execution_type": "crew", "crew_name": "test"},
        )
        bm.trace_batch_id = bm.current_batch.batch_id
        bm.is_current_batch_ephemeral = True
        handler = FirstTimeTraceHandler()
        handler.is_first_time = True
        handler.collected_events = True
        handler.batch_manager = bm
        return handler, bm

    def test_backend_initialized_true_on_success(self):
        """Events are sent when batch creation succeeds, then state is cleaned up."""
        handler, bm = self._make_handler_with_manager()
        server_id = "server-id-abc"
        mock_init_response = MagicMock(
            status_code=201,
            json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
        )
        mock_send_response = MagicMock(status_code=200)
        trace_batch_id_during_send = None

        def capture_send(*args, **kwargs):
            nonlocal trace_batch_id_during_send
            trace_batch_id_during_send = bm.trace_batch_id
            return mock_send_response

        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=mock_init_response,
            ),
            patch.object(
                bm.plus_api,
                "send_ephemeral_trace_events",
                side_effect=capture_send,
            ),
            patch.object(bm, "_finalize_backend_batch"),
        ):
            bm.event_buffer = [MagicMock(to_dict=MagicMock(return_value={}))]
            handler._initialize_backend_and_send_events()
        # trace_batch_id was set correctly during send
        assert trace_batch_id_during_send == server_id
        # State cleaned up after completion (singleton reuse)
        assert bm.backend_initialized is False
        assert bm.trace_batch_id is None
        assert bm.current_batch is None

    def test_backend_initialized_false_on_failure(self):
        """backend_initialized stays False and events are NOT sent when batch creation fails."""
        handler, bm = self._make_handler_with_manager()
        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=None,  # server call fails
            ),
            patch.object(bm, "_send_events_to_backend") as mock_send,
            patch.object(bm, "_finalize_backend_batch") as mock_finalize,
            patch.object(handler, "_gracefully_fail") as mock_fail,
        ):
            bm.event_buffer = [MagicMock()]
            handler._initialize_backend_and_send_events()
        assert bm.backend_initialized is False
        assert bm.trace_batch_id is None
        mock_send.assert_not_called()
        mock_finalize.assert_not_called()
        mock_fail.assert_called_once()

    def test_backend_initialized_false_on_non_2xx(self):
        """backend_initialized stays False when server returns non-2xx."""
        handler, bm = self._make_handler_with_manager()
        mock_response = MagicMock(status_code=500, text="Internal Server Error")
        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=mock_response,
            ),
            patch.object(bm, "_send_events_to_backend") as mock_send,
            patch.object(bm, "_finalize_backend_batch") as mock_finalize,
            patch.object(handler, "_gracefully_fail") as mock_fail,
        ):
            bm.event_buffer = [MagicMock()]
            handler._initialize_backend_and_send_events()
        assert bm.backend_initialized is False
        assert bm.trace_batch_id is None
        mock_send.assert_not_called()
        mock_finalize.assert_not_called()
        mock_fail.assert_called_once()
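
`test_backend_initialized_true_on_success` reads `trace_batch_id` from inside the mocked send call, because the handler resets that state once it finishes. The same snapshot-at-call-time pattern is shown in isolation below. `Manager` is a hypothetical stand-in and not `TraceBatchManager`; only `MagicMock.side_effect` is real API.

```python
from unittest.mock import MagicMock


class Manager:
    """Hypothetical stand-in: sets an id, sends, then resets its state."""

    def __init__(self, api):
        self.api = api
        self.batch_id = None

    def run(self):
        self.batch_id = "server-id"
        try:
            self.api.send()
        finally:
            self.batch_id = None  # cleanup runs on every exit path


api = MagicMock()
manager = Manager(api)
seen_during_send = []

# side_effect runs while send() is executing, before cleanup clears the id.
api.send.side_effect = lambda: seen_during_send.append(manager.batch_id)

manager.run()
```

After `run()` returns, `manager.batch_id` is already `None`, so an assertion on it could not tell whether the id was ever correct; the value recorded in `seen_during_send` can.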
class TestFirstTimeHandlerAlwaysEphemeral:
    """Tests that first-time handler always uses ephemeral with skip_context_check."""

    def _make_handler_with_manager(self):
        with patch(
            "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
            return_value="mock_token",
        ):
            bm = TraceBatchManager()
        bm.current_batch = TraceBatch(
            user_context={"privacy_level": "standard"},
            execution_metadata={"execution_type": "crew", "crew_name": "test"},
        )
        bm.trace_batch_id = bm.current_batch.batch_id
        bm.is_current_batch_ephemeral = True
        handler = FirstTimeTraceHandler()
        handler.is_first_time = True
        handler.collected_events = True
        handler.batch_manager = bm
        return handler, bm

    def test_deferred_init_uses_ephemeral_and_skip_context_check(self):
        """Deferred backend init always uses ephemeral=True and skip_context_check=True."""
        handler, bm = self._make_handler_with_manager()
        with (
            patch.object(bm, "_initialize_backend_batch") as mock_init,
            patch.object(bm, "_send_events_to_backend"),
            patch.object(bm, "_finalize_backend_batch"),
        ):
            mock_init.side_effect = lambda **kwargs: None
            bm.event_buffer = [MagicMock()]
            handler._initialize_backend_and_send_events()
        mock_init.assert_called_once()
        assert mock_init.call_args.kwargs["use_ephemeral"] is True
        assert mock_init.call_args.kwargs["skip_context_check"] is True
class TestAuthFailbackToEphemeral:
    """Tests for ephemeral fallback when server rejects auth (401/403)."""

    def _make_batch_manager(self):
        """Create a TraceBatchManager with a pre-set trace_batch_id."""
        with patch(
            "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
            return_value="mock_token",
        ):
            bm = TraceBatchManager()
        bm.current_batch = TraceBatch(
            user_context={"privacy_level": "standard"},
            execution_metadata={"execution_type": "crew", "crew_name": "test"},
        )
        bm.trace_batch_id = bm.current_batch.batch_id
        bm.is_current_batch_ephemeral = False  # authenticated path
        return bm

    def test_401_non_ephemeral_falls_back_to_ephemeral(self):
        """A 401 on the non-ephemeral endpoint should retry as ephemeral."""
        bm = self._make_batch_manager()
        server_id = "ephemeral-fallback-id"
        auth_rejected = MagicMock(status_code=401, text="Bad credentials")
        ephemeral_success = MagicMock(
            status_code=201,
            json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
        )
        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_trace_batch",
                return_value=auth_rejected,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=ephemeral_success,
            ) as mock_ephemeral,
            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=False,
            )
        assert bm.trace_batch_id == server_id
        assert bm.is_current_batch_ephemeral is True
        mock_ephemeral.assert_called_once()

    def test_403_non_ephemeral_falls_back_to_ephemeral(self):
        """A 403 on the non-ephemeral endpoint should also fall back."""
        bm = self._make_batch_manager()
        server_id = "ephemeral-fallback-403"
        forbidden = MagicMock(status_code=403, text="Forbidden")
        ephemeral_success = MagicMock(
            status_code=201,
            json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
        )
        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_trace_batch",
                return_value=forbidden,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=ephemeral_success,
            ),
            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=False,
            )
        assert bm.trace_batch_id == server_id
        assert bm.is_current_batch_ephemeral is True
    def test_401_on_ephemeral_does_not_recurse(self):
        """A 401 on the ephemeral endpoint should NOT try to fall back again."""
        bm = self._make_batch_manager()
        bm.is_current_batch_ephemeral = True
        auth_rejected = MagicMock(status_code=401, text="Bad credentials")
        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=auth_rejected,
            ) as mock_ephemeral,
            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=True,
            )
        assert bm.trace_batch_id is None
        # Called exactly once: no recursive fallback and no retry on 4xx
        mock_ephemeral.assert_called_once()
    def test_401_fallback_ephemeral_also_fails(self):
        """If ephemeral fallback also fails, trace_batch_id is cleared."""
        bm = self._make_batch_manager()
        auth_rejected = MagicMock(status_code=401, text="Bad credentials")
        ephemeral_fail = MagicMock(status_code=422, text="Validation failed")
        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_trace_batch",
                return_value=auth_rejected,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=ephemeral_fail,
            ),
            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=False,
            )
        assert bm.trace_batch_id is None
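
The fallback tests above describe a one-way downgrade: a 401 or 403 from the authenticated endpoint triggers a single ephemeral attempt, and a rejection from the ephemeral endpoint is final. A minimal sketch under those assumptions follows. `initialize`, `authenticated_call`, and `ephemeral_call` are hypothetical names, retries are left out, and the real `_initialize_backend_batch` is not shown in this diff.

```python
from typing import Any, Callable, Optional, Tuple

AUTH_REJECTED = (401, 403)


def initialize(
    authenticated_call: Callable[[], Any],
    ephemeral_call: Callable[[], Any],
    use_ephemeral: bool,
) -> Tuple[Optional[Any], bool]:
    """Return (response or None, is_ephemeral)."""
    response = ephemeral_call() if use_ephemeral else authenticated_call()
    status = getattr(response, "status_code", None)

    # Downgrade only from the authenticated path, so the recursion
    # can never go deeper than one level.
    if status in AUTH_REJECTED and not use_ephemeral:
        return initialize(authenticated_call, ephemeral_call, use_ephemeral=True)

    if status is not None and 200 <= status < 300:
        return response, use_ephemeral
    return None, use_ephemeral
```

Guarding the recursive call with `not use_ephemeral` is what `test_401_on_ephemeral_does_not_recurse` checks from the outside: the ephemeral endpoint is hit once and the batch id ends up cleared.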


@@ -0,0 +1,70 @@
"""Tests for lock_store.
We verify our own logic: the _redis_available guard and which portalocker
backend is selected. We trust portalocker to handle actual locking mechanics.
"""
from __future__ import annotations

import sys
from unittest import mock

import pytest

import crewai.utilities.lock_store as lock_store
from crewai.utilities.lock_store import lock


@pytest.fixture(autouse=True)
def no_redis_url(monkeypatch):
    monkeypatch.setattr(lock_store, "_REDIS_URL", None)


# ---------------------------------------------------------------------------
# _redis_available
# ---------------------------------------------------------------------------


def test_redis_not_available_without_url():
    assert lock_store._redis_available() is False


def test_redis_not_available_when_package_missing(monkeypatch):
    monkeypatch.setattr(lock_store, "_REDIS_URL", "redis://localhost:6379")
    monkeypatch.setitem(sys.modules, "redis", None)  # None → ImportError on import
    assert lock_store._redis_available() is False


def test_redis_available_with_url_and_package(monkeypatch):
    monkeypatch.setattr(lock_store, "_REDIS_URL", "redis://localhost:6379")
    monkeypatch.setitem(sys.modules, "redis", mock.MagicMock())
    assert lock_store._redis_available() is True


# ---------------------------------------------------------------------------
# lock strategy selection
# ---------------------------------------------------------------------------


def test_uses_file_lock_when_redis_unavailable():
    with mock.patch("portalocker.Lock") as mock_lock:
        with lock("file_test"):
            pass
    mock_lock.assert_called_once()
    assert "crewai:" in mock_lock.call_args.args[0]


def test_uses_redis_lock_when_redis_available(monkeypatch):
    fake_conn = mock.MagicMock()
    monkeypatch.setattr(lock_store, "_redis_available", mock.Mock(return_value=True))
    monkeypatch.setattr(lock_store, "_redis_connection", mock.Mock(return_value=fake_conn))
    with mock.patch("portalocker.RedisLock") as mock_redis_lock:
        with lock("redis_test"):
            pass
    mock_redis_lock.assert_called_once()
    kwargs = mock_redis_lock.call_args.kwargs
    assert kwargs["channel"].startswith("crewai:")
    assert kwargs["connection"] is fake_conn
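
`test_redis_not_available_when_package_missing` relies on a documented import-system behavior: a `None` entry in `sys.modules` makes `import redis` raise `ImportError`. A guard with the shape these tests expect might look like the sketch below. `redis_available` and `check_with_module` are hypothetical names; the real `lock_store._redis_available` reads a module-level `_REDIS_URL`, and its body is not part of this diff.

```python
import sys
import types
from typing import Optional


def redis_available(redis_url: Optional[str]) -> bool:
    """True only when a URL is configured and the redis package imports."""
    if not redis_url:
        return False
    try:
        import redis  # noqa: F401
    except ImportError:
        return False
    return True


def check_with_module(entry) -> bool:
    """Evaluate the guard with sys.modules['redis'] temporarily set to `entry`."""
    missing = object()
    saved = sys.modules.get("redis", missing)
    sys.modules["redis"] = entry
    try:
        return redis_available("redis://localhost:6379")
    finally:
        # Restore whatever was there before, so the process is left unchanged.
        if saved is missing:
            del sys.modules["redis"]
        else:
            sys.modules["redis"] = saved


package_missing = check_with_module(None)
package_present = check_with_module(types.ModuleType("redis"))
```

In the test file, `monkeypatch.setitem` does the save-and-restore that `check_with_module` spells out by hand here.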

uv.lock generated

@@ -5556,11 +5556,11 @@ wheels = [
[[package]]
name = "pyasn1"
-version = "0.6.2"
+version = "0.6.3"
source = { registry = "https://pypi.org/simple" }
-sdist = { url = "https://files.pythonhosted.org/packages/fe/b6/6e630dff89739fcd427e3f72b3d905ce0acb85a45d4ec3e2678718a3487f/pyasn1-0.6.2.tar.gz", hash = "sha256:9b59a2b25ba7e4f8197db7686c09fb33e658b98339fadb826e9512629017833b", size = 146586, upload-time = "2026-01-16T18:04:18.534Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/5c/5f/6583902b6f79b399c9c40674ac384fd9cd77805f9e6205075f828ef11fb2/pyasn1-0.6.3.tar.gz", hash = "sha256:697a8ecd6d98891189184ca1fa05d1bb00e2f84b5977c481452050549c8a72cf", size = 148685, upload-time = "2026-03-17T01:06:53.382Z" }
wheels = [
-    { url = "https://files.pythonhosted.org/packages/44/b5/a96872e5184f354da9c84ae119971a0a4c221fe9b27a4d94bd43f2596727/pyasn1-0.6.2-py3-none-any.whl", hash = "sha256:1eb26d860996a18e9b6ed05e7aae0e9fc21619fcee6af91cca9bad4fbea224bf", size = 83371, upload-time = "2026-01-16T18:04:17.174Z" },
+    { url = "https://files.pythonhosted.org/packages/5d/a0/7d793dce3fa811fe047d6ae2431c672364b462850c6235ae306c0efd025f/pyasn1-0.6.3-py3-none-any.whl", hash = "sha256:a80184d120f0864a52a073acc6fc642847d0be408e7c7252f31390c0f4eadcde", size = 83997, upload-time = "2026-03-17T01:06:52.036Z" },
]
[[package]]