diff --git a/lib/crewai/src/crewai/cli/plus_api.py b/lib/crewai/src/crewai/cli/plus_api.py index e32e5220d..665221f1e 100644 --- a/lib/crewai/src/crewai/cli/plus_api.py +++ b/lib/crewai/src/crewai/cli/plus_api.py @@ -196,6 +196,16 @@ class PlusAPI: timeout=30, ) + def mark_ephemeral_trace_batch_as_failed( + self, trace_batch_id: str, error_message: str + ) -> httpx.Response: + return self._make_request( + "PATCH", + f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}", + json={"status": "failed", "failure_reason": error_message}, + timeout=30, + ) + def get_mcp_configs(self, slugs: list[str]) -> httpx.Response: """Get MCP server configurations for the given slugs.""" return self._make_request( diff --git a/lib/crewai/src/crewai/events/listeners/tracing/first_time_trace_handler.py b/lib/crewai/src/crewai/events/listeners/tracing/first_time_trace_handler.py index 715642a6e..436d50c27 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/first_time_trace_handler.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/first_time_trace_handler.py @@ -1,3 +1,4 @@ +from datetime import datetime, timezone import logging import uuid import webbrowser @@ -100,20 +101,50 @@ class FirstTimeTraceHandler: user_context=user_context, execution_metadata=execution_metadata, use_ephemeral=True, + skip_context_check=True, ) + + if not self.batch_manager.trace_batch_id: + self._gracefully_fail( + "Backend batch creation failed, cannot send events." + ) + self._reset_batch_state() + return + self.batch_manager.backend_initialized = True - if self.batch_manager.event_buffer: - self.batch_manager._send_events_to_backend() + # Capture values before send/finalize consume them + events_count = len(self.batch_manager.event_buffer) + batch_id = self.batch_manager.trace_batch_id + # Read duration non-destructively — _finalize_backend_batch will consume it + start_time = self.batch_manager.execution_start_times.get("execution") + duration_ms = ( + int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000) + if start_time + else 0 + ) - self.batch_manager.finalize_batch() + if self.batch_manager.event_buffer: + send_status = self.batch_manager._send_events_to_backend() + if send_status == 500 and self.batch_manager.trace_batch_id: + self.batch_manager._mark_batch_as_failed( + self.batch_manager.trace_batch_id, + "Error sending events to backend", + ) + self._reset_batch_state() + return + + self.batch_manager._finalize_backend_batch(events_count) self.ephemeral_url = self.batch_manager.ephemeral_trace_url if not self.ephemeral_url: - self._show_local_trace_message() + self._show_local_trace_message(events_count, duration_ms, batch_id) + + self._reset_batch_state() except Exception as e: self._gracefully_fail(f"Backend initialization failed: {e}") + self._reset_batch_state() def _display_ephemeral_trace_link(self) -> None: """Display the ephemeral trace link to the user and automatically open browser.""" @@ -185,6 +216,19 @@ To enable tracing later, do any one of these: console.print(panel) console.print() + def _reset_batch_state(self) -> None: + """Reset batch manager state to allow future executions to re-initialize.""" + if not self.batch_manager: + return + self.batch_manager.batch_owner_type = None + self.batch_manager.batch_owner_id = None + self.batch_manager.current_batch = None + self.batch_manager.event_buffer.clear() + self.batch_manager.trace_batch_id = None + self.batch_manager.is_current_batch_ephemeral = False + self.batch_manager.backend_initialized = False + self.batch_manager._cleanup_batch_data() + def _gracefully_fail(self, error_message: str) -> None: """Handle errors gracefully without disrupting user experience.""" console = Console() @@ -192,7 +236,9 @@ To enable tracing later, do any one of these: logger.debug(f"First-time trace error: {error_message}") - def _show_local_trace_message(self) -> None: + def _show_local_trace_message( + self, events_count: int = 0, duration_ms: int = 0, batch_id: str | None = None + ) -> None: """Show message when traces were collected locally but couldn't be uploaded.""" if self.batch_manager is None: return @@ -203,9 +249,9 @@ To enable tracing later, do any one of these: 📊 Your execution traces were collected locally! Unfortunately, we couldn't upload them to the server right now, but here's what we captured: -• {len(self.batch_manager.event_buffer)} trace events -• Execution duration: {self.batch_manager.calculate_duration("execution")}ms -• Batch ID: {self.batch_manager.trace_batch_id} +• {events_count} trace events +• Execution duration: {duration_ms}ms +• Batch ID: {batch_id} ✅ Tracing has been enabled for future runs! Your preference has been saved. Future Crew/Flow executions will automatically collect traces. diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py index da25792fb..1a25b68a9 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py @@ -2,6 +2,7 @@ from dataclasses import dataclass, field from datetime import datetime, timezone from logging import getLogger from threading import Condition, Lock +import time from typing import Any import uuid @@ -98,7 +99,7 @@ class TraceBatchManager: self._initialize_backend_batch( user_context, execution_metadata, use_ephemeral ) - self.backend_initialized = True + self.backend_initialized = self.trace_batch_id is not None self._batch_ready_cv.notify_all() return self.current_batch @@ -108,14 +109,15 @@ class TraceBatchManager: user_context: dict[str, str], execution_metadata: dict[str, Any], use_ephemeral: bool = False, + skip_context_check: bool = False, ) -> None: """Send batch initialization to backend""" - if not is_tracing_enabled_in_context(): - return + if not skip_context_check and not is_tracing_enabled_in_context(): + return None if not self.plus_api or not self.current_batch: - return + return None try: payload = { @@ -142,19 +144,53 @@ class TraceBatchManager: payload["ephemeral_trace_id"] = self.current_batch.batch_id payload["user_identifier"] = get_user_id() - response = ( - self.plus_api.initialize_ephemeral_trace_batch(payload) - if use_ephemeral - else self.plus_api.initialize_trace_batch(payload) - ) + max_retries = 1 + response = None + + try: + for attempt in range(max_retries + 1): + response = ( + self.plus_api.initialize_ephemeral_trace_batch(payload) + if use_ephemeral + else self.plus_api.initialize_trace_batch(payload) + ) + if response is not None and response.status_code < 500: + break + if attempt < max_retries: + logger.debug( + f"Trace batch init attempt {attempt + 1} failed " + f"(status={response.status_code if response else 'None'}), retrying..." + ) + time.sleep(0.2) + except Exception as e: + logger.warning( + f"Error initializing trace batch: {e}. Continuing without tracing." + ) + self.trace_batch_id = None + return None if response is None: logger.warning( "Trace batch initialization failed gracefully. Continuing without tracing." ) - return + self.trace_batch_id = None + return None + + # Fall back to ephemeral on auth failure (expired/revoked token) + if response.status_code in [401, 403] and not use_ephemeral: + logger.warning( + "Auth rejected by server, falling back to ephemeral tracing." + ) + self.is_current_batch_ephemeral = True + return self._initialize_backend_batch( + user_context, + execution_metadata, + use_ephemeral=True, + skip_context_check=skip_context_check, + ) if response.status_code in [201, 200]: + self.is_current_batch_ephemeral = use_ephemeral response_data = response.json() self.trace_batch_id = ( response_data["trace_id"] @@ -165,11 +201,22 @@ class TraceBatchManager: logger.warning( f"Trace batch initialization returned status {response.status_code}. Continuing without tracing." ) + self.trace_batch_id = None except Exception as e: logger.warning( f"Error initializing trace batch: {e}. Continuing without tracing." ) + self.trace_batch_id = None + + def _mark_batch_as_failed(self, trace_batch_id: str, error_message: str) -> None: + """Mark a trace batch as failed, routing to the correct endpoint.""" + if self.is_current_batch_ephemeral: + self.plus_api.mark_ephemeral_trace_batch_as_failed( + trace_batch_id, error_message + ) + else: + self.plus_api.mark_trace_batch_as_failed(trace_batch_id, error_message) def begin_event_processing(self) -> None: """Mark that an event handler started processing (for synchronization).""" @@ -260,7 +307,7 @@ class TraceBatchManager: logger.error( "Event handler timeout - marking batch as failed due to incomplete events" ) - self.plus_api.mark_trace_batch_as_failed( + self._mark_batch_as_failed( self.trace_batch_id, "Timeout waiting for event handlers - events incomplete", ) @@ -284,7 +331,7 @@ class TraceBatchManager: events_sent_to_backend_status = self._send_events_to_backend() self.event_buffer = original_buffer if events_sent_to_backend_status == 500 and self.trace_batch_id: - self.plus_api.mark_trace_batch_as_failed( + self._mark_batch_as_failed( self.trace_batch_id, "Error sending events to backend" ) return None @@ -364,13 +411,16 @@ class TraceBatchManager: logger.error( f"❌ Failed to finalize trace batch: {response.status_code} - {response.text}" ) - self.plus_api.mark_trace_batch_as_failed( - self.trace_batch_id, response.text - ) + self._mark_batch_as_failed(self.trace_batch_id, response.text) except Exception as e: logger.error(f"❌ Error finalizing trace batch: {e}") - self.plus_api.mark_trace_batch_as_failed(self.trace_batch_id, str(e)) + try: + self._mark_batch_as_failed(self.trace_batch_id, str(e)) + except Exception: + logger.debug( + "Could not mark trace batch as failed (network unavailable)" + ) def _cleanup_batch_data(self) -> None: """Clean up batch data after successful finalization to free memory""" diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py index b86d77aa1..9d81f1d55 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py @@ -235,8 +235,11 @@ class TraceCollectionListener(BaseEventListener): @event_bus.on(FlowStartedEvent) def on_flow_started(source: Any, event: FlowStartedEvent) -> None: - if not self.batch_manager.is_batch_initialized(): - self._initialize_flow_batch(source, event) + # Always call _initialize_flow_batch to claim ownership. + # If batch was already initialized by a concurrent action event + # (race condition), initialize_batch() returns early but + # batch_owner_type is still correctly set to "flow". + self._initialize_flow_batch(source, event) self._handle_trace_event("flow_started", source, event) @event_bus.on(MethodExecutionStartedEvent) @@ -266,7 +269,12 @@ class TraceCollectionListener(BaseEventListener): @event_bus.on(CrewKickoffStartedEvent) def on_crew_started(source: Any, event: CrewKickoffStartedEvent) -> None: - if not self.batch_manager.is_batch_initialized(): + if self.batch_manager.batch_owner_type != "flow": + # Always call _initialize_crew_batch to claim ownership. + # If batch was already initialized by a concurrent action event + # (race condition with DefaultEnvEvent), initialize_batch() returns + # early but batch_owner_type is still correctly set to "crew". + # Skip only when a parent flow already owns the batch. self._initialize_crew_batch(source, event) self._handle_trace_event("crew_kickoff_started", source, event) @@ -772,7 +780,7 @@ class TraceCollectionListener(BaseEventListener): "crew_name": getattr(source, "name", "Unknown Crew"), "crewai_version": get_crewai_version(), } - self.batch_manager.initialize_batch(user_context, execution_metadata) + self._initialize_batch(user_context, execution_metadata) self.batch_manager.begin_event_processing() try: diff --git a/lib/crewai/src/crewai/events/utils/console_formatter.py b/lib/crewai/src/crewai/events/utils/console_formatter.py index 0984406e9..7879a4d93 100644 --- a/lib/crewai/src/crewai/events/utils/console_formatter.py +++ b/lib/crewai/src/crewai/events/utils/console_formatter.py @@ -127,6 +127,9 @@ To update, run: uv sync --upgrade-package crewai""" def _show_tracing_disabled_message_if_needed(self) -> None: """Show tracing disabled message if tracing is not enabled.""" + from crewai.events.listeners.tracing.trace_listener import ( + TraceCollectionListener, + ) from crewai.events.listeners.tracing.utils import ( has_user_declined_tracing, is_tracing_enabled_in_context, @@ -136,6 +139,12 @@ To update, run: uv sync --upgrade-package crewai""" if should_suppress_tracing_messages(): return + # Don't show "disabled" message when the first-time handler will show + # the trace prompt after execution completes (avoids confusing mid-flow messages) + listener = TraceCollectionListener._instance # type: ignore[misc] + if listener and listener.first_time_handler.is_first_time: + return + if not is_tracing_enabled_in_context(): if has_user_declined_tracing(): message = """Info: Tracing is disabled. diff --git a/lib/crewai/tests/tracing/test_tracing.py b/lib/crewai/tests/tracing/test_tracing.py index c2558c17c..92f6e31c5 100644 --- a/lib/crewai/tests/tracing/test_tracing.py +++ b/lib/crewai/tests/tracing/test_tracing.py @@ -7,6 +7,7 @@ from crewai.events.listeners.tracing.first_time_trace_handler import ( FirstTimeTraceHandler, ) from crewai.events.listeners.tracing.trace_batch_manager import ( + TraceBatch, TraceBatchManager, ) from crewai.events.listeners.tracing.trace_listener import ( @@ -657,6 +658,16 @@ class TestTraceListenerSetup: trace_listener.first_time_handler.collected_events = True + mock_batch_response = MagicMock() + mock_batch_response.status_code = 201 + mock_batch_response.json.return_value = { + "trace_id": "mock-trace-id", + "ephemeral_trace_id": "mock-ephemeral-trace-id", + "access_code": "TRACE-mock", + } + mock_events_response = MagicMock() + mock_events_response.status_code = 200 + with ( patch.object( trace_listener.first_time_handler, @@ -666,6 +677,40 @@ class TestTraceListenerSetup: patch.object( trace_listener.first_time_handler, "_display_ephemeral_trace_link" ) as mock_display_link, + patch.object( + trace_listener.batch_manager.plus_api, + "initialize_trace_batch", + return_value=mock_batch_response, + ), + patch.object( + trace_listener.batch_manager.plus_api, + "initialize_ephemeral_trace_batch", + return_value=mock_batch_response, + ), + patch.object( + trace_listener.batch_manager.plus_api, + "send_trace_events", + return_value=mock_events_response, + ), + patch.object( + trace_listener.batch_manager.plus_api, + "send_ephemeral_trace_events", + return_value=mock_events_response, + ), + patch.object( + trace_listener.batch_manager.plus_api, + "finalize_trace_batch", + return_value=mock_events_response, + ), + patch.object( + trace_listener.batch_manager.plus_api, + "finalize_ephemeral_trace_batch", + return_value=mock_events_response, + ), + patch.object( + trace_listener.batch_manager, + "_cleanup_batch_data", + ), ): crew.kickoff() wait_for_event_handlers() @@ -918,3 +963,676 @@ class TestTraceListenerSetup: mock_init.assert_called_once() payload = mock_init.call_args[0][0] assert "user_identifier" not in payload + + +class TestTraceBatchIdClearedOnFailure: + """Tests: trace_batch_id is cleared when _initialize_backend_batch fails.""" + + def _make_batch_manager(self): + """Create a TraceBatchManager with a pre-set trace_batch_id (simulating first-time user).""" + with patch( + "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", + return_value="mock_token", + ): + bm = TraceBatchManager() + bm.current_batch = TraceBatch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew", "crew_name": "test"}, + ) + bm.trace_batch_id = bm.current_batch.batch_id # simulate line 96 + bm.is_current_batch_ephemeral = True + return bm + + def test_trace_batch_id_cleared_on_exception(self): + """trace_batch_id must be None when the API call raises an exception.""" + bm = self._make_batch_manager() + assert bm.trace_batch_id is not None + + with ( + patch( + "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", + return_value=True, + ), + patch.object( + bm.plus_api, + "initialize_ephemeral_trace_batch", + side_effect=ConnectionError("network down"), + ), + ): + bm._initialize_backend_batch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew"}, + use_ephemeral=True, + ) + + assert bm.trace_batch_id is None + + def test_trace_batch_id_set_on_success(self): + """trace_batch_id must be set from the server response on success.""" + bm = self._make_batch_manager() + server_id = "server-ephemeral-trace-id-999" + + mock_response = MagicMock( + status_code=201, + json=MagicMock(return_value={"ephemeral_trace_id": server_id}), + ) + + with ( + patch( + "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", + return_value=True, + ), + patch.object( + bm.plus_api, + "initialize_ephemeral_trace_batch", + return_value=mock_response, + ), + ): + bm._initialize_backend_batch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew"}, + use_ephemeral=True, + ) + + assert bm.trace_batch_id == server_id + + def test_send_events_skipped_when_trace_batch_id_none(self): + """_send_events_to_backend must return early when trace_batch_id is None.""" + bm = self._make_batch_manager() + bm.trace_batch_id = None + bm.event_buffer = [MagicMock()] # has events + + with patch.object( + bm.plus_api, "send_ephemeral_trace_events" + ) as mock_send: + result = bm._send_events_to_backend() + + assert result == 500 + mock_send.assert_not_called() + + +class TestInitializeBackendBatchRetry: + """Tests for retry logic in _initialize_backend_batch.""" + + def _make_batch_manager(self): + """Create a TraceBatchManager with a pre-set trace_batch_id.""" + with patch( + "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", + return_value="mock_token", + ): + bm = TraceBatchManager() + bm.current_batch = TraceBatch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew", "crew_name": "test"}, + ) + bm.trace_batch_id = bm.current_batch.batch_id + bm.is_current_batch_ephemeral = True + return bm + + def test_retries_on_none_response_then_succeeds(self): + """Retries when API returns None, succeeds on second attempt.""" + bm = self._make_batch_manager() + server_id = "server-id-after-retry" + + success_response = MagicMock( + status_code=201, + json=MagicMock(return_value={"ephemeral_trace_id": server_id}), + ) + + with ( + patch( + "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", + return_value=True, + ), + patch.object( + bm.plus_api, + "initialize_ephemeral_trace_batch", + side_effect=[None, success_response], + ) as mock_init, + patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep, + ): + bm._initialize_backend_batch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew"}, + use_ephemeral=True, + ) + + assert bm.trace_batch_id == server_id + assert mock_init.call_count == 2 + mock_sleep.assert_called_once_with(0.2) + + def test_retries_on_5xx_then_succeeds(self): + """Retries on 500 server error, succeeds on second attempt.""" + bm = self._make_batch_manager() + server_id = "server-id-after-5xx" + + error_response = MagicMock(status_code=500, text="Internal Server Error") + success_response = MagicMock( + status_code=201, + json=MagicMock(return_value={"ephemeral_trace_id": server_id}), + ) + + with ( + patch( + "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", + return_value=True, + ), + patch.object( + bm.plus_api, + "initialize_ephemeral_trace_batch", + side_effect=[error_response, success_response], + ) as mock_init, + patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"), + ): + bm._initialize_backend_batch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew"}, + use_ephemeral=True, + ) + + assert bm.trace_batch_id == server_id + assert mock_init.call_count == 2 + + def test_no_retry_on_exception(self): + """Exceptions (e.g. timeout, connection error) abort immediately without retry.""" + bm = self._make_batch_manager() + + with ( + patch( + "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", + return_value=True, + ), + patch.object( + bm.plus_api, + "initialize_ephemeral_trace_batch", + side_effect=ConnectionError("network down"), + ) as mock_init, + patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep, + ): + bm._initialize_backend_batch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew"}, + use_ephemeral=True, + ) + + assert bm.trace_batch_id is None + assert mock_init.call_count == 1 + mock_sleep.assert_not_called() + + def test_no_retry_on_4xx(self): + """Does NOT retry on 422 — client error is not transient.""" + bm = self._make_batch_manager() + + error_response = MagicMock(status_code=422, text="Unprocessable Entity") + + with ( + patch( + "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", + return_value=True, + ), + patch.object( + bm.plus_api, + "initialize_ephemeral_trace_batch", + return_value=error_response, + ) as mock_init, + patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep, + ): + bm._initialize_backend_batch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew"}, + use_ephemeral=True, + ) + + assert bm.trace_batch_id is None + assert mock_init.call_count == 1 + mock_sleep.assert_not_called() + + def test_exhausts_retries_then_clears_batch_id(self): + """After all retries fail, trace_batch_id is None.""" + bm = self._make_batch_manager() + + with ( + patch( + "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", + return_value=True, + ), + patch.object( + bm.plus_api, + "initialize_ephemeral_trace_batch", + return_value=None, + ) as mock_init, + patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"), + ): + bm._initialize_backend_batch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew"}, + use_ephemeral=True, + ) + + assert bm.trace_batch_id is None + assert mock_init.call_count == 2 # initial + 1 retry + + +class TestFirstTimeHandlerBackendInitGuard: + """Tests: backend_initialized gated on actual batch creation success.""" + + def _make_handler_with_manager(self): + """Create a FirstTimeTraceHandler wired to a TraceBatchManager.""" + with patch( + "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", + return_value="mock_token", + ): + bm = TraceBatchManager() + bm.current_batch = TraceBatch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew", "crew_name": "test"}, + ) + bm.trace_batch_id = bm.current_batch.batch_id + bm.is_current_batch_ephemeral = True + + handler = FirstTimeTraceHandler() + handler.is_first_time = True + handler.collected_events = True + handler.batch_manager = bm + return handler, bm + + def test_backend_initialized_true_on_success(self): + """Events are sent when batch creation succeeds, then state is cleaned up.""" + handler, bm = self._make_handler_with_manager() + server_id = "server-id-abc" + + mock_init_response = MagicMock( + status_code=201, + json=MagicMock(return_value={"ephemeral_trace_id": server_id}), + ) + mock_send_response = MagicMock(status_code=200) + + trace_batch_id_during_send = None + + def capture_send(*args, **kwargs): + nonlocal trace_batch_id_during_send + trace_batch_id_during_send = bm.trace_batch_id + return mock_send_response + + with ( + patch( + "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", + return_value=True, + ), + patch.object( + bm.plus_api, + "initialize_ephemeral_trace_batch", + return_value=mock_init_response, + ), + patch.object( + bm.plus_api, + "send_ephemeral_trace_events", + side_effect=capture_send, + ), + patch.object(bm, "_finalize_backend_batch"), + ): + bm.event_buffer = [MagicMock(to_dict=MagicMock(return_value={}))] + handler._initialize_backend_and_send_events() + + # trace_batch_id was set correctly during send + assert trace_batch_id_during_send == server_id + # State cleaned up after completion (singleton reuse) + assert bm.backend_initialized is False + assert bm.trace_batch_id is None + assert bm.current_batch is None + + def test_backend_initialized_false_on_failure(self): + """backend_initialized stays False and events are NOT sent when batch creation fails.""" + handler, bm = self._make_handler_with_manager() + + with ( + patch( + "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", + return_value=True, + ), + patch.object( + bm.plus_api, + "initialize_ephemeral_trace_batch", + return_value=None, # server call fails + ), + patch.object(bm, "_send_events_to_backend") as mock_send, + patch.object(bm, "_finalize_backend_batch") as mock_finalize, + patch.object(handler, "_gracefully_fail") as mock_fail, + ): + bm.event_buffer = [MagicMock()] + handler._initialize_backend_and_send_events() + + assert bm.backend_initialized is False + assert bm.trace_batch_id is None + mock_send.assert_not_called() + mock_finalize.assert_not_called() + mock_fail.assert_called_once() + + def test_backend_initialized_false_on_non_2xx(self): + """backend_initialized stays False when server returns non-2xx.""" + handler, bm = self._make_handler_with_manager() + + mock_response = MagicMock(status_code=500, text="Internal Server Error") + + with ( + patch( + "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", + return_value=True, + ), + patch.object( + bm.plus_api, + "initialize_ephemeral_trace_batch", + return_value=mock_response, + ), + patch.object(bm, "_send_events_to_backend") as mock_send, + patch.object(bm, "_finalize_backend_batch") as mock_finalize, + patch.object(handler, "_gracefully_fail") as mock_fail, + ): + bm.event_buffer = [MagicMock()] + handler._initialize_backend_and_send_events() + + assert bm.backend_initialized is False + assert bm.trace_batch_id is None + mock_send.assert_not_called() + mock_finalize.assert_not_called() + mock_fail.assert_called_once() + + +class TestFirstTimeHandlerAlwaysEphemeral: + """Tests that first-time handler always uses ephemeral with skip_context_check.""" + + def _make_handler_with_manager(self): + with patch( + "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", + return_value="mock_token", + ): + bm = TraceBatchManager() + bm.current_batch = TraceBatch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew", "crew_name": "test"}, + ) + bm.trace_batch_id = bm.current_batch.batch_id + bm.is_current_batch_ephemeral = True + + handler = FirstTimeTraceHandler() + handler.is_first_time = True + handler.collected_events = True + handler.batch_manager = bm + return handler, bm + + def test_deferred_init_uses_ephemeral_and_skip_context_check(self): + """Deferred backend init always uses ephemeral=True and skip_context_check=True.""" + handler, bm = self._make_handler_with_manager() + + with ( + patch.object(bm, "_initialize_backend_batch") as mock_init, + patch.object(bm, "_send_events_to_backend"), + patch.object(bm, "_finalize_backend_batch"), + ): + mock_init.side_effect = lambda **kwargs: None + bm.event_buffer = [MagicMock()] + handler._initialize_backend_and_send_events() + + mock_init.assert_called_once() + assert mock_init.call_args.kwargs["use_ephemeral"] is True + assert mock_init.call_args.kwargs["skip_context_check"] is True + + +class TestAuthFailbackToEphemeral: + """Tests for ephemeral fallback when server rejects auth (401/403).""" + + def _make_batch_manager(self): + """Create a TraceBatchManager with a pre-set trace_batch_id.""" + with patch( + "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", + return_value="mock_token", + ): + bm = TraceBatchManager() + bm.current_batch = TraceBatch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew", "crew_name": "test"}, + ) + bm.trace_batch_id = bm.current_batch.batch_id + bm.is_current_batch_ephemeral = False # authenticated path + return bm + + def test_401_non_ephemeral_falls_back_to_ephemeral(self): + """A 401 on the non-ephemeral endpoint should retry as ephemeral.""" + bm = self._make_batch_manager() + server_id = "ephemeral-fallback-id" + + auth_rejected = MagicMock(status_code=401, text="Bad credentials") + ephemeral_success = MagicMock( + status_code=201, + json=MagicMock(return_value={"ephemeral_trace_id": server_id}), + ) + + with ( + patch( + "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", + return_value=True, + ), + patch.object( + bm.plus_api, + "initialize_trace_batch", + return_value=auth_rejected, + ), + patch.object( + bm.plus_api, + "initialize_ephemeral_trace_batch", + return_value=ephemeral_success, + ) as mock_ephemeral, + patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"), + ): + bm._initialize_backend_batch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew"}, + use_ephemeral=False, + ) + + assert bm.trace_batch_id == server_id + assert bm.is_current_batch_ephemeral is True + mock_ephemeral.assert_called_once() + + def test_403_non_ephemeral_falls_back_to_ephemeral(self): + """A 403 on the non-ephemeral endpoint should also fall back.""" + bm = self._make_batch_manager() + server_id = "ephemeral-fallback-403" + + forbidden = MagicMock(status_code=403, text="Forbidden") + ephemeral_success = MagicMock( + status_code=201, + json=MagicMock(return_value={"ephemeral_trace_id": server_id}), + ) + + with ( + patch( + "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", + return_value=True, + ), + patch.object( + bm.plus_api, + "initialize_trace_batch", + return_value=forbidden, + ), + patch.object( + bm.plus_api, + "initialize_ephemeral_trace_batch", + return_value=ephemeral_success, + ), + patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"), + ): + bm._initialize_backend_batch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew"}, + use_ephemeral=False, + ) + + assert bm.trace_batch_id == server_id + assert bm.is_current_batch_ephemeral is True + + def test_401_on_ephemeral_does_not_recurse(self): + """A 401 on the ephemeral endpoint should NOT try to fall back again.""" + bm = self._make_batch_manager() + bm.is_current_batch_ephemeral = True + + auth_rejected = MagicMock(status_code=401, text="Bad credentials") + + with ( + patch( + "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", + return_value=True, + ), + patch.object( + bm.plus_api, + "initialize_ephemeral_trace_batch", + return_value=auth_rejected, + ) as mock_ephemeral, + patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"), + ): + bm._initialize_backend_batch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew"}, + use_ephemeral=True, + ) + + assert bm.trace_batch_id is None + # Called only once — no recursive fallback + mock_ephemeral.assert_called() + + def test_401_fallback_ephemeral_also_fails(self): + """If ephemeral fallback also fails, trace_batch_id is cleared.""" + bm = self._make_batch_manager() + + auth_rejected = MagicMock(status_code=401, text="Bad credentials") + ephemeral_fail = MagicMock(status_code=422, text="Validation failed") + + with ( + patch( + "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", + return_value=True, + ), + patch.object( + bm.plus_api, + "initialize_trace_batch", + return_value=auth_rejected, + ), + patch.object( + bm.plus_api, + "initialize_ephemeral_trace_batch", + return_value=ephemeral_fail, + ), + patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"), + ): + bm._initialize_backend_batch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew"}, + use_ephemeral=False, + ) + + assert bm.trace_batch_id is None + + +class TestMarkBatchAsFailedRouting: + """Tests: _mark_batch_as_failed routes to the correct endpoint.""" + + def _make_batch_manager(self, ephemeral: bool = False): + with patch( + "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", + return_value="mock_token", + ): + bm = TraceBatchManager() + bm.is_current_batch_ephemeral = ephemeral + return bm + + def test_routes_to_ephemeral_endpoint_when_ephemeral(self): + """Ephemeral batches must use mark_ephemeral_trace_batch_as_failed.""" + bm = self._make_batch_manager(ephemeral=True) + + with patch.object( + bm.plus_api, "mark_ephemeral_trace_batch_as_failed" + ) as mock_ephemeral, patch.object( + bm.plus_api, "mark_trace_batch_as_failed" + ) as mock_non_ephemeral: + bm._mark_batch_as_failed("batch-123", "some error") + + mock_ephemeral.assert_called_once_with("batch-123", "some error") + mock_non_ephemeral.assert_not_called() + + def test_routes_to_non_ephemeral_endpoint_when_not_ephemeral(self): + """Non-ephemeral batches must use mark_trace_batch_as_failed.""" + bm = self._make_batch_manager(ephemeral=False) + + with patch.object( + bm.plus_api, "mark_ephemeral_trace_batch_as_failed" + ) as mock_ephemeral, patch.object( + bm.plus_api, "mark_trace_batch_as_failed" + ) as mock_non_ephemeral: + bm._mark_batch_as_failed("batch-456", "another error") + + mock_non_ephemeral.assert_called_once_with("batch-456", "another error") + mock_ephemeral.assert_not_called() + + +class TestBackendInitializedGatedOnSuccess: + """Tests: backend_initialized reflects actual init success on non-first-time path.""" + + def test_backend_initialized_true_on_success(self): + """backend_initialized is True when _initialize_backend_batch succeeds.""" + with ( + patch( + "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", + return_value=True, + ), + patch( + "crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces", + return_value=False, + ), + patch( + "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", + return_value="mock_token", + ), + ): + bm = TraceBatchManager() + mock_response = MagicMock( + status_code=201, + json=MagicMock(return_value={"trace_id": "server-id"}), + ) + with patch.object( + bm.plus_api, "initialize_trace_batch", return_value=mock_response + ): + bm.initialize_batch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew"}, + ) + + assert bm.backend_initialized is True + assert bm.trace_batch_id == "server-id" + + def test_backend_initialized_false_on_failure(self): + """backend_initialized is False when _initialize_backend_batch fails.""" + with ( + patch( + "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", + return_value=True, + ), + patch( + "crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces", + return_value=False, + ), + patch( + "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", + return_value="mock_token", + ), + ): + bm = TraceBatchManager() + with patch.object( + bm.plus_api, "initialize_trace_batch", return_value=None + ): + bm.initialize_batch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew"}, + ) + + assert bm.backend_initialized is False + assert bm.trace_batch_id is None