Compare commits

..

11 Commits

Author SHA1 Message Date
Tiago Freire
81a8c2efc1 fix: prevent zeroed data in local trace message and ensure cleanup on all paths
- Read execution start time non-destructively before _finalize_backend_batch
    consumes it, so the server receives the real duration and the local
    fallback message also shows the correct value
  - Pass pre-captured events_count, duration_ms, and batch_id to
    _show_local_trace_message instead of reading from batch_manager
    (buffer cleared by send, duration consumed by finalize)
  - Extract _reset_batch_state to reset all singleton state (current_batch,
    event_buffer, trace_batch_id, is_current_batch_ephemeral,
    backend_initialized, batch_owner_type/id) and call it in every exit
    path: success, init failure, send failure, and exception handler
2026-03-18 22:50:48 -03:00
Tiago Freire
4859dc66a5 fix: address PR review findings — forward skip_context_check in
recursive fallback, reduce retry backoff, clean up batch state

  - Forward skip_context_check parameter in the 401/403 ephemeral
    fallback recursive call to prevent silent early return when
    is_tracing_enabled_in_context() is False
  - Reduce retry backoff from 1s to 200ms to minimize lock hold time
    on the non-first-time path (worst case 400ms vs 2s)
  - Add batch state cleanup after _finalize_backend_batch in the
    first-time handler, mirroring finalize_batch: reset current_batch,
    event_buffer, trace_batch_id, is_current_batch_ephemeral,
    batch_owner_type, batch_owner_id, and call _cleanup_batch_data()
2026-03-18 21:29:35 -03:00
Tiago Freire
7f2eda2ac6 refactor: remove redundant tests in TestTraceBatchIdClearedOnFailure
Remove test_trace_batch_id_cleared_on_none_response (covered by
  TestInitializeBackendBatchRetry::test_exhausts_retries_then_clears_batch_id)
  and test_trace_batch_id_cleared_on_non_2xx_response (covered by
  TestInitializeBackendBatchRetry::test_no_retry_on_4xx).
2026-03-18 20:46:20 -03:00
Tiago Freire
6734db4f71 fix: mark batch as failed when event send fails in first-time handler
The return value of _send_events_to_backend() was discarded in
  _initialize_backend_and_send_events, so _finalize_backend_batch was
  called unconditionally with the full event count even when the send
  returned 500. This finalized the batch as "completed" on the server
  while it received 0 events, producing an empty trace URL.

  Now check the return status and call mark_trace_batch_as_failed on
  500, matching the behavior of the regular finalize_batch path.
2026-03-18 20:35:57 -03:00
Tiago Freire
c00544c97d fix: always use ephemeral tracing for first-time users
The first-time handler UX is built around ephemeral traces (access
  code, 24hr expiry link, browser open). Checking auth and creating
  non-ephemeral batches caused the handler to fall through to the
  local traces fallback since ephemeral_trace_url is only set for
  ephemeral batches. The server's LinkEphemeralTracesJob links
  ephemeral traces to user accounts on signup regardless.

  Remove auth check from first-time handler and always pass
  use_ephemeral=True to _initialize_backend_batch.
2026-03-18 19:54:10 -03:00
Tiago Freire
08c533905e fix: bypass is_tracing_enabled_in_context for first-time deferred batch init
First-time users have is_tracing_enabled_in_context() = False by design
  (it's a prerequisite for should_auto_collect_first_time_traces). This
  caused _initialize_backend_batch to return early without creating the
  batch, and _send_events_to_backend to send to a non-existent batch.

  Add skip_context_check parameter to _initialize_backend_batch so the
  first-time handler can bypass the guard during deferred init. Gate
  backend_initialized on trace_batch_id being set. Call
  _finalize_backend_batch directly instead of finalize_batch (which has
  the same context guard). Sync is_current_batch_ephemeral on success
  to prevent endpoint mismatch between batch creation and event send.
2026-03-18 19:54:10 -03:00
Tiago Freire
6d1546c381 feat: fall back to ephemeral tracing on server auth rejection
When the non-ephemeral batch endpoint returns 401 or 403 (expired
  token, revoked credentials, JWKS rotation), _initialize_backend_batch
  now switches is_current_batch_ephemeral to True and retries via the
  ephemeral endpoint. This preserves traces that would otherwise be
  lost due to the timing gap between client-side token validation and
  server-side JWT decode.

  The fallback only triggers on the non-ephemeral path to prevent
  infinite recursion. If the ephemeral attempt also fails, trace_batch_id
  is cleared normally.

  Addresses the 2M+ failed push attempts in which valid client-side
  tokens were rejected on the server.
2026-03-18 19:54:10 -03:00
Tiago Freire
bcba620a41 fix: respect authentication status for first-time users
Previously, _initialize_batch forced use_ephemeral=True for all
  first-time users, bypassing _check_authenticated() entirely. This
  meant logged-in users in a new project directory were routed to the
  ephemeral endpoint instead of their account's tracing endpoint.

  Now _check_authenticated() runs for all users including first-time.
  Authenticated first-time users get non-ephemeral tracing (traces
  linked to their account); only unauthenticated first-time users
  fall back to ephemeral. The deferred backend init in
  FirstTimeTraceHandler also reads is_current_batch_ephemeral instead
  of hardcoding use_ephemeral=True.
2026-03-18 19:54:10 -03:00
Tiago Freire
4141233a78 feat: add retry logic for ephemeral trace batch creation
Transient failures (None response, 5xx, network errors) during
  _initialize_backend_batch now retry up to 2 times with a 1s backoff.
  Non-transient 4xx errors (422 validation, 401 auth) are not retried
  since the same payload would fail again. If all retries are exhausted,
  trace_batch_id is cleared per the existing safety net.

  This runs post-execution when the user has already answered "y" to
  view traces, so the ~2s worst-case delay is acceptable.
2026-03-18 19:54:10 -03:00
Tiago Freire
c67425d323 fix: gate backend_initialized on actual batch creation success
In first_time_trace_handler._initialize_backend_and_send_events,
  backend_initialized was set to True unconditionally after calling
  _initialize_backend_batch, regardless of whether the server-side
  batch was actually created. This caused _send_events_to_backend and
  finalize_batch to run against a non-existent batch.

  Now check trace_batch_id after _initialize_backend_batch returns;
  if None (batch creation failed), call _gracefully_fail and return
  early, skipping event send and finalization.
2026-03-18 19:54:10 -03:00
Tiago Freire
7f090c664e fix: clear trace_batch_id on backend batch initialization failure
When _initialize_backend_batch fails (None response, non-2xx status,
  or exception), trace_batch_id was left populated with a client-generated
  UUID that was never registered server-side. Subsequent calls to
  _send_events_to_backend would see the stale ID and POST events to
  /ephemeral/batches/{id}/events, resulting in a 404 from the server.

  Nullify trace_batch_id on all three failure paths so downstream methods
  skip event sending instead of hitting a non-existent batch.
2026-03-18 19:54:10 -03:00
12 changed files with 721 additions and 175 deletions

View File

@@ -75,7 +75,6 @@ from crewai.utilities.agent_utils import (
)
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import Converter, ConverterError
from crewai.utilities.env import get_env_context
from crewai.utilities.guardrail import process_guardrail
from crewai.utilities.guardrail_types import GuardrailType
from crewai.utilities.llm_utils import create_llm
@@ -365,7 +364,6 @@ class Agent(BaseAgent):
ValueError: If the max execution time is not a positive integer.
RuntimeError: If the agent execution fails for other reasons.
"""
get_env_context()
# Only call handle_reasoning for legacy CrewAgentExecutor
# For AgentExecutor, planning is handled in AgentExecutor.generate_plan()
if self.executor_class is not AgentExecutor:

View File

@@ -98,7 +98,6 @@ from crewai.types.streaming import CrewStreamingOutput
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities.constants import NOT_SPECIFIED, TRAINING_DATA_FILE
from crewai.utilities.crew.models import CrewContext
from crewai.utilities.env import get_env_context
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.file_handler import FileHandler
@@ -680,7 +679,6 @@ class Crew(FlowTrackable, BaseModel):
Returns:
CrewOutput or CrewStreamingOutput if streaming is enabled.
"""
get_env_context()
if self.stream:
enable_agent_streaming(self.agents)
ctx = StreamingContext()

View File

@@ -34,12 +34,6 @@ from crewai.events.types.crew_events import (
CrewTrainFailedEvent,
CrewTrainStartedEvent,
)
from crewai.events.types.env_events import (
CCEnvEvent,
CodexEnvEvent,
CursorEnvEvent,
DefaultEnvEvent,
)
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
@@ -149,23 +143,6 @@ class EventListener(BaseEventListener):
# ----------- CREW EVENTS -----------
def setup_listeners(self, crewai_event_bus: CrewAIEventsBus) -> None:
@crewai_event_bus.on(CCEnvEvent)
def on_cc_env(_: Any, event: CCEnvEvent) -> None:
self._telemetry.env_context_span(event.type)
@crewai_event_bus.on(CodexEnvEvent)
def on_codex_env(_: Any, event: CodexEnvEvent) -> None:
self._telemetry.env_context_span(event.type)
@crewai_event_bus.on(CursorEnvEvent)
def on_cursor_env(_: Any, event: CursorEnvEvent) -> None:
self._telemetry.env_context_span(event.type)
@crewai_event_bus.on(DefaultEnvEvent)
def on_default_env(_: Any, event: DefaultEnvEvent) -> None:
self._telemetry.env_context_span(event.type)
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source: Any, event: CrewKickoffStartedEvent) -> None:
self.formatter.handle_crew_started(event.crew_name or "Crew", source.id)

View File

@@ -1,6 +1,7 @@
import logging
import uuid
import webbrowser
from datetime import datetime, timezone
from rich.console import Console
from rich.panel import Panel
@@ -100,20 +101,48 @@ class FirstTimeTraceHandler:
user_context=user_context,
execution_metadata=execution_metadata,
use_ephemeral=True,
skip_context_check=True,
)
if not self.batch_manager.trace_batch_id:
self._gracefully_fail("Backend batch creation failed, cannot send events.")
self._reset_batch_state()
return
self.batch_manager.backend_initialized = True
if self.batch_manager.event_buffer:
self.batch_manager._send_events_to_backend()
# Capture values before send/finalize consume them
events_count = len(self.batch_manager.event_buffer)
batch_id = self.batch_manager.trace_batch_id
# Read duration non-destructively — _finalize_backend_batch will consume it
start_time = self.batch_manager.execution_start_times.get("execution")
duration_ms = (
int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)
if start_time
else 0
)
self.batch_manager.finalize_batch()
if self.batch_manager.event_buffer:
send_status = self.batch_manager._send_events_to_backend()
if send_status == 500 and self.batch_manager.trace_batch_id:
self.batch_manager.plus_api.mark_trace_batch_as_failed(
self.batch_manager.trace_batch_id,
"Error sending events to backend",
)
self._reset_batch_state()
return
self.batch_manager._finalize_backend_batch(events_count)
self.ephemeral_url = self.batch_manager.ephemeral_trace_url
if not self.ephemeral_url:
self._show_local_trace_message()
self._show_local_trace_message(events_count, duration_ms, batch_id)
self._reset_batch_state()
except Exception as e:
self._gracefully_fail(f"Backend initialization failed: {e}")
self._reset_batch_state()
def _display_ephemeral_trace_link(self):
"""Display the ephemeral trace link to the user and automatically open browser."""
@@ -184,6 +213,19 @@ To enable tracing later, do any one of these:
console.print(panel)
console.print()
def _reset_batch_state(self):
"""Reset batch manager state to allow future executions to re-initialize."""
if not self.batch_manager:
return
self.batch_manager.batch_owner_type = None
self.batch_manager.batch_owner_id = None
self.batch_manager.current_batch = None
self.batch_manager.event_buffer.clear()
self.batch_manager.trace_batch_id = None
self.batch_manager.is_current_batch_ephemeral = False
self.batch_manager.backend_initialized = False
self.batch_manager._cleanup_batch_data()
def _gracefully_fail(self, error_message: str):
"""Handle errors gracefully without disrupting user experience."""
console = Console()
@@ -191,7 +233,7 @@ To enable tracing later, do any one of these:
logger.debug(f"First-time trace error: {error_message}")
def _show_local_trace_message(self):
def _show_local_trace_message(self, events_count: int = 0, duration_ms: int = 0, batch_id: str | None = None):
"""Show message when traces were collected locally but couldn't be uploaded."""
console = Console()
@@ -199,9 +241,9 @@ To enable tracing later, do any one of these:
📊 Your execution traces were collected locally!
Unfortunately, we couldn't upload them to the server right now, but here's what we captured:
{len(self.batch_manager.event_buffer)} trace events
• Execution duration: {self.batch_manager.calculate_duration("execution")}ms
• Batch ID: {self.batch_manager.trace_batch_id}
{events_count} trace events
• Execution duration: {duration_ms}ms
• Batch ID: {batch_id}
✅ Tracing has been enabled for future runs!
Your preference has been saved. Future Crew/Flow executions will automatically collect traces.

View File

@@ -1,3 +1,4 @@
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from logging import getLogger
@@ -108,10 +109,11 @@ class TraceBatchManager:
user_context: dict[str, str],
execution_metadata: dict[str, Any],
use_ephemeral: bool = False,
skip_context_check: bool = False,
) -> None:
"""Send batch initialization to backend"""
if not is_tracing_enabled_in_context():
if not skip_context_check and not is_tracing_enabled_in_context():
return
if not self.plus_api or not self.current_batch:
@@ -142,19 +144,62 @@ class TraceBatchManager:
payload["ephemeral_trace_id"] = self.current_batch.batch_id
payload["user_identifier"] = get_user_id()
response = (
self.plus_api.initialize_ephemeral_trace_batch(payload)
if use_ephemeral
else self.plus_api.initialize_trace_batch(payload)
)
max_retries = 2
response = None
last_exception = None
for attempt in range(max_retries + 1):
try:
response = (
self.plus_api.initialize_ephemeral_trace_batch(payload)
if use_ephemeral
else self.plus_api.initialize_trace_batch(payload)
)
if response is not None and response.status_code < 500:
break
if attempt < max_retries:
logger.debug(
f"Trace batch init attempt {attempt + 1} failed "
f"(status={response.status_code if response else 'None'}), retrying..."
)
time.sleep(0.2)
except Exception as e:
last_exception = e
if attempt < max_retries:
logger.debug(
f"Trace batch init attempt {attempt + 1} raised {type(e).__name__}, retrying..."
)
time.sleep(0.2)
if last_exception and response is None:
logger.warning(
f"Error initializing trace batch: {last_exception}. Continuing without tracing."
)
self.trace_batch_id = None
return
if response is None:
logger.warning(
"Trace batch initialization failed gracefully. Continuing without tracing."
)
self.trace_batch_id = None
return
# Fall back to ephemeral on auth failure (expired/revoked token)
if response.status_code in [401, 403] and not use_ephemeral:
logger.warning(
"Auth rejected by server, falling back to ephemeral tracing."
)
self.is_current_batch_ephemeral = True
return self._initialize_backend_batch(
user_context,
execution_metadata,
use_ephemeral=True,
skip_context_check=skip_context_check,
)
if response.status_code in [201, 200]:
self.is_current_batch_ephemeral = use_ephemeral
response_data = response.json()
self.trace_batch_id = (
response_data["trace_id"]
@@ -165,11 +210,13 @@ class TraceBatchManager:
logger.warning(
f"Trace batch initialization returned status {response.status_code}. Continuing without tracing."
)
self.trace_batch_id = None
except Exception as e:
logger.warning(
f"Error initializing trace batch: {e}. Continuing without tracing."
)
self.trace_batch_id = None
def begin_event_processing(self) -> None:
"""Mark that an event handler started processing (for synchronization)."""

View File

@@ -58,12 +58,6 @@ from crewai.events.types.crew_events import (
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
)
from crewai.events.types.env_events import (
CCEnvEvent,
CodexEnvEvent,
CursorEnvEvent,
DefaultEnvEvent,
)
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
@@ -198,7 +192,6 @@ class TraceCollectionListener(BaseEventListener):
if self._listeners_setup:
return
self._register_env_event_handlers(crewai_event_bus)
self._register_flow_event_handlers(crewai_event_bus)
self._register_context_event_handlers(crewai_event_bus)
self._register_action_event_handlers(crewai_event_bus)
@@ -207,25 +200,6 @@ class TraceCollectionListener(BaseEventListener):
self._listeners_setup = True
def _register_env_event_handlers(self, event_bus: CrewAIEventsBus) -> None:
"""Register handlers for environment context events."""
@event_bus.on(CCEnvEvent)
def on_cc_env(source: Any, event: CCEnvEvent) -> None:
self._handle_action_event("cc_env", source, event)
@event_bus.on(CodexEnvEvent)
def on_codex_env(source: Any, event: CodexEnvEvent) -> None:
self._handle_action_event("codex_env", source, event)
@event_bus.on(CursorEnvEvent)
def on_cursor_env(source: Any, event: CursorEnvEvent) -> None:
self._handle_action_event("cursor_env", source, event)
@event_bus.on(DefaultEnvEvent)
def on_default_env(source: Any, event: DefaultEnvEvent) -> None:
self._handle_action_event("default_env", source, event)
def _register_flow_event_handlers(self, event_bus: CrewAIEventsBus) -> None:
"""Register handlers for flow events."""

View File

@@ -1,36 +0,0 @@
from typing import Annotated, Literal
from pydantic import Field, TypeAdapter
from crewai.events.base_events import BaseEvent
class CCEnvEvent(BaseEvent):
type: Literal["cc_env"] = "cc_env"
class CodexEnvEvent(BaseEvent):
type: Literal["codex_env"] = "codex_env"
class CursorEnvEvent(BaseEvent):
type: Literal["cursor_env"] = "cursor_env"
class DefaultEnvEvent(BaseEvent):
type: Literal["default_env"] = "default_env"
EnvContextEvent = Annotated[
CCEnvEvent | CodexEnvEvent | CursorEnvEvent | DefaultEnvEvent,
Field(discriminator="type"),
]
env_context_event_adapter: TypeAdapter[EnvContextEvent] = TypeAdapter(EnvContextEvent)
ENV_CONTEXT_EVENT_TYPES: tuple[type[BaseEvent], ...] = (
CCEnvEvent,
CodexEnvEvent,
CursorEnvEvent,
DefaultEnvEvent,
)

View File

@@ -110,7 +110,6 @@ if TYPE_CHECKING:
from crewai.flow.visualization import build_flow_structure, render_interactive
from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput
from crewai.utilities.env import get_env_context
from crewai.utilities.streaming import (
TaskInfo,
create_async_chunk_generator,
@@ -1771,7 +1770,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
Returns:
The final output from the flow or FlowStreamingOutput if streaming.
"""
get_env_context()
if self.stream:
result_holder: list[Any] = []
current_task_info: TaskInfo = {

View File

@@ -986,22 +986,6 @@ class Telemetry:
self._safe_telemetry_operation(_operation)
def env_context_span(self, tool: str) -> None:
"""Records the coding tool environment context."""
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Environment Context")
self._add_attribute(
span,
"crewai_version",
version("crewai"),
)
self._add_attribute(span, "tool", tool)
close_span(span)
self._safe_telemetry_operation(_operation)
def human_feedback_span(
self,
event_type: str,

View File

@@ -8,21 +8,6 @@ TRAINED_AGENTS_DATA_FILE: Final[str] = "trained_agents_data.pkl"
KNOWLEDGE_DIRECTORY: Final[str] = "knowledge"
MAX_FILE_NAME_LENGTH: Final[int] = 255
EMITTER_COLOR: Final[PrinterColor] = "bold_blue"
CC_ENV_VAR: Final[str] = "CLAUDECODE"
CODEX_ENV_VARS: Final[tuple[str, ...]] = (
"CODEX_CI",
"CODEX_MANAGED_BY_NPM",
"CODEX_SANDBOX",
"CODEX_SANDBOX_NETWORK_DISABLED",
"CODEX_THREAD_ID",
)
CURSOR_ENV_VARS: Final[tuple[str, ...]] = (
"CURSOR_AGENT",
"CURSOR_EXTENSION_HOST_ROLE",
"CURSOR_SANDBOX",
"CURSOR_TRACE_ID",
"CURSOR_WORKSPACE_LABEL",
)
class _NotSpecified:

View File

@@ -1,39 +0,0 @@
import contextvars
import os
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.env_events import (
CCEnvEvent,
CodexEnvEvent,
CursorEnvEvent,
DefaultEnvEvent,
)
from crewai.utilities.constants import CC_ENV_VAR, CODEX_ENV_VARS, CURSOR_ENV_VARS
_env_context_emitted: contextvars.ContextVar[bool] = contextvars.ContextVar(
"_env_context_emitted", default=False
)
def _is_codex_env() -> bool:
return any(os.environ.get(var) for var in CODEX_ENV_VARS)
def _is_cursor_env() -> bool:
return any(os.environ.get(var) for var in CURSOR_ENV_VARS)
def get_env_context() -> None:
if _env_context_emitted.get():
return
_env_context_emitted.set(True)
if os.environ.get(CC_ENV_VAR):
crewai_event_bus.emit(None, CCEnvEvent())
elif _is_codex_env():
crewai_event_bus.emit(None, CodexEnvEvent())
elif _is_cursor_env():
crewai_event_bus.emit(None, CursorEnvEvent())
else:
crewai_event_bus.emit(None, DefaultEnvEvent())

View File

@@ -7,6 +7,7 @@ from crewai.events.listeners.tracing.first_time_trace_handler import (
FirstTimeTraceHandler,
)
from crewai.events.listeners.tracing.trace_batch_manager import (
TraceBatch,
TraceBatchManager,
)
from crewai.events.listeners.tracing.trace_listener import (
@@ -657,6 +658,16 @@ class TestTraceListenerSetup:
trace_listener.first_time_handler.collected_events = True
mock_batch_response = MagicMock()
mock_batch_response.status_code = 201
mock_batch_response.json.return_value = {
"trace_id": "mock-trace-id",
"ephemeral_trace_id": "mock-ephemeral-trace-id",
"access_code": "TRACE-mock",
}
mock_events_response = MagicMock()
mock_events_response.status_code = 200
with (
patch.object(
trace_listener.first_time_handler,
@@ -666,6 +677,40 @@ class TestTraceListenerSetup:
patch.object(
trace_listener.first_time_handler, "_display_ephemeral_trace_link"
) as mock_display_link,
patch.object(
trace_listener.batch_manager.plus_api,
"initialize_trace_batch",
return_value=mock_batch_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"initialize_ephemeral_trace_batch",
return_value=mock_batch_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"send_trace_events",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"send_ephemeral_trace_events",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"finalize_trace_batch",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"finalize_ephemeral_trace_batch",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager,
"_cleanup_batch_data",
),
):
crew.kickoff()
wait_for_event_handlers()
@@ -918,3 +963,576 @@ class TestTraceListenerSetup:
mock_init.assert_called_once()
payload = mock_init.call_args[0][0]
assert "user_identifier" not in payload
class TestTraceBatchIdClearedOnFailure:
"""Tests: trace_batch_id is cleared when _initialize_backend_batch fails."""
def _make_batch_manager(self):
"""Create a TraceBatchManager with a pre-set trace_batch_id (simulating first-time user)."""
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id # simulate line 96
bm.is_current_batch_ephemeral = True
return bm
def test_trace_batch_id_cleared_on_exception(self):
"""trace_batch_id must be None when the API call raises an exception."""
bm = self._make_batch_manager()
assert bm.trace_batch_id is not None
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
side_effect=ConnectionError("network down"),
),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id is None
def test_trace_batch_id_set_on_success(self):
"""trace_batch_id must be set from the server response on success."""
bm = self._make_batch_manager()
server_id = "server-ephemeral-trace-id-999"
mock_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=mock_response,
),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id == server_id
def test_send_events_skipped_when_trace_batch_id_none(self):
"""_send_events_to_backend must return early when trace_batch_id is None."""
bm = self._make_batch_manager()
bm.trace_batch_id = None
bm.event_buffer = [MagicMock()] # has events
with patch.object(
bm.plus_api, "send_ephemeral_trace_events"
) as mock_send:
result = bm._send_events_to_backend()
assert result == 500
mock_send.assert_not_called()
class TestInitializeBackendBatchRetry:
"""Tests for retry logic in _initialize_backend_batch."""
def _make_batch_manager(self):
"""Create a TraceBatchManager with a pre-set trace_batch_id."""
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id
bm.is_current_batch_ephemeral = True
return bm
def test_retries_on_none_response_then_succeeds(self):
"""Retries when API returns None, succeeds on second attempt."""
bm = self._make_batch_manager()
server_id = "server-id-after-retry"
success_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
side_effect=[None, success_response],
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id == server_id
assert mock_init.call_count == 2
mock_sleep.assert_called_once_with(0.2)
def test_retries_on_5xx_then_succeeds(self):
"""Retries on 500 server error, succeeds on second attempt."""
bm = self._make_batch_manager()
server_id = "server-id-after-5xx"
error_response = MagicMock(status_code=500, text="Internal Server Error")
success_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
side_effect=[error_response, success_response],
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id == server_id
assert mock_init.call_count == 2
def test_retries_on_exception_then_succeeds(self):
"""Retries on ConnectionError, succeeds on second attempt."""
bm = self._make_batch_manager()
server_id = "server-id-after-exception"
success_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
side_effect=[ConnectionError("network down"), success_response],
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id == server_id
assert mock_init.call_count == 2
def test_no_retry_on_4xx(self):
"""Does NOT retry on 422 — client error is not transient."""
bm = self._make_batch_manager()
error_response = MagicMock(status_code=422, text="Unprocessable Entity")
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=error_response,
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id is None
assert mock_init.call_count == 1
mock_sleep.assert_not_called()
def test_exhausts_retries_then_clears_batch_id(self):
"""After all retries fail, trace_batch_id is None."""
bm = self._make_batch_manager()
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=None,
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id is None
assert mock_init.call_count == 3 # initial + 2 retries
class TestFirstTimeHandlerBackendInitGuard:
"""Tests: backend_initialized gated on actual batch creation success."""
def _make_handler_with_manager(self):
"""Create a FirstTimeTraceHandler wired to a TraceBatchManager."""
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id
bm.is_current_batch_ephemeral = True
handler = FirstTimeTraceHandler()
handler.is_first_time = True
handler.collected_events = True
handler.batch_manager = bm
return handler, bm
def test_backend_initialized_true_on_success(self):
"""Events are sent when batch creation succeeds, then state is cleaned up."""
handler, bm = self._make_handler_with_manager()
server_id = "server-id-abc"
mock_init_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
mock_send_response = MagicMock(status_code=200)
trace_batch_id_during_send = None
def capture_send(*args, **kwargs):
nonlocal trace_batch_id_during_send
trace_batch_id_during_send = bm.trace_batch_id
return mock_send_response
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=mock_init_response,
),
patch.object(
bm.plus_api,
"send_ephemeral_trace_events",
side_effect=capture_send,
),
patch.object(bm, "_finalize_backend_batch"),
):
bm.event_buffer = [MagicMock(to_dict=MagicMock(return_value={}))]
handler._initialize_backend_and_send_events()
# trace_batch_id was set correctly during send
assert trace_batch_id_during_send == server_id
# State cleaned up after completion (singleton reuse)
assert bm.backend_initialized is False
assert bm.trace_batch_id is None
assert bm.current_batch is None
def test_backend_initialized_false_on_failure(self):
"""backend_initialized stays False and events are NOT sent when batch creation fails."""
handler, bm = self._make_handler_with_manager()
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=None, # server call fails
),
patch.object(bm, "_send_events_to_backend") as mock_send,
patch.object(bm, "_finalize_backend_batch") as mock_finalize,
patch.object(handler, "_gracefully_fail") as mock_fail,
):
bm.event_buffer = [MagicMock()]
handler._initialize_backend_and_send_events()
assert bm.backend_initialized is False
assert bm.trace_batch_id is None
mock_send.assert_not_called()
mock_finalize.assert_not_called()
mock_fail.assert_called_once()
def test_backend_initialized_false_on_non_2xx(self):
"""backend_initialized stays False when server returns non-2xx."""
handler, bm = self._make_handler_with_manager()
mock_response = MagicMock(status_code=500, text="Internal Server Error")
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=mock_response,
),
patch.object(bm, "_send_events_to_backend") as mock_send,
patch.object(bm, "_finalize_backend_batch") as mock_finalize,
patch.object(handler, "_gracefully_fail") as mock_fail,
):
bm.event_buffer = [MagicMock()]
handler._initialize_backend_and_send_events()
assert bm.backend_initialized is False
assert bm.trace_batch_id is None
mock_send.assert_not_called()
mock_finalize.assert_not_called()
mock_fail.assert_called_once()
class TestFirstTimeHandlerAlwaysEphemeral:
"""Tests that first-time handler always uses ephemeral with skip_context_check."""
def _make_handler_with_manager(self):
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id
bm.is_current_batch_ephemeral = True
handler = FirstTimeTraceHandler()
handler.is_first_time = True
handler.collected_events = True
handler.batch_manager = bm
return handler, bm
def test_deferred_init_uses_ephemeral_and_skip_context_check(self):
"""Deferred backend init always uses ephemeral=True and skip_context_check=True."""
handler, bm = self._make_handler_with_manager()
with (
patch.object(bm, "_initialize_backend_batch") as mock_init,
patch.object(bm, "_send_events_to_backend"),
patch.object(bm, "_finalize_backend_batch"),
):
mock_init.side_effect = lambda **kwargs: None
bm.event_buffer = [MagicMock()]
handler._initialize_backend_and_send_events()
mock_init.assert_called_once()
assert mock_init.call_args.kwargs["use_ephemeral"] is True
assert mock_init.call_args.kwargs["skip_context_check"] is True
class TestAuthFailbackToEphemeral:
"""Tests for ephemeral fallback when server rejects auth (401/403)."""
def _make_batch_manager(self):
"""Create a TraceBatchManager with a pre-set trace_batch_id."""
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id
bm.is_current_batch_ephemeral = False # authenticated path
return bm
def test_401_non_ephemeral_falls_back_to_ephemeral(self):
"""A 401 on the non-ephemeral endpoint should retry as ephemeral."""
bm = self._make_batch_manager()
server_id = "ephemeral-fallback-id"
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
ephemeral_success = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_trace_batch",
return_value=auth_rejected,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=ephemeral_success,
) as mock_ephemeral,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=False,
)
assert bm.trace_batch_id == server_id
assert bm.is_current_batch_ephemeral is True
mock_ephemeral.assert_called_once()
def test_403_non_ephemeral_falls_back_to_ephemeral(self):
"""A 403 on the non-ephemeral endpoint should also fall back."""
bm = self._make_batch_manager()
server_id = "ephemeral-fallback-403"
forbidden = MagicMock(status_code=403, text="Forbidden")
ephemeral_success = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_trace_batch",
return_value=forbidden,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=ephemeral_success,
),
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=False,
)
assert bm.trace_batch_id == server_id
assert bm.is_current_batch_ephemeral is True
def test_401_on_ephemeral_does_not_recurse(self):
"""A 401 on the ephemeral endpoint should NOT try to fall back again."""
bm = self._make_batch_manager()
bm.is_current_batch_ephemeral = True
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=auth_rejected,
) as mock_ephemeral,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id is None
# Called only once — no recursive fallback
mock_ephemeral.assert_called()
def test_401_fallback_ephemeral_also_fails(self):
"""If ephemeral fallback also fails, trace_batch_id is cleared."""
bm = self._make_batch_manager()
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
ephemeral_fail = MagicMock(status_code=422, text="Validation failed")
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_trace_batch",
return_value=auth_rejected,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=ephemeral_fail,
),
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=False,
)
assert bm.trace_batch_id is None