diff --git a/lib/crewai/src/crewai/cli/plus_api.py b/lib/crewai/src/crewai/cli/plus_api.py index e32e5220d..665221f1e 100644 --- a/lib/crewai/src/crewai/cli/plus_api.py +++ b/lib/crewai/src/crewai/cli/plus_api.py @@ -196,6 +196,16 @@ class PlusAPI: timeout=30, ) + def mark_ephemeral_trace_batch_as_failed( + self, trace_batch_id: str, error_message: str + ) -> httpx.Response: + return self._make_request( + "PATCH", + f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}", + json={"status": "failed", "failure_reason": error_message}, + timeout=30, + ) + def get_mcp_configs(self, slugs: list[str]) -> httpx.Response: """Get MCP server configurations for the given slugs.""" return self._make_request( diff --git a/lib/crewai/src/crewai/events/listeners/tracing/first_time_trace_handler.py b/lib/crewai/src/crewai/events/listeners/tracing/first_time_trace_handler.py index 561c0f069..45d06e6b6 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/first_time_trace_handler.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/first_time_trace_handler.py @@ -125,7 +125,7 @@ class FirstTimeTraceHandler: if self.batch_manager.event_buffer: send_status = self.batch_manager._send_events_to_backend() if send_status == 500 and self.batch_manager.trace_batch_id: - self.batch_manager.plus_api.mark_trace_batch_as_failed( + self.batch_manager._mark_batch_as_failed( self.batch_manager.trace_batch_id, "Error sending events to backend", ) diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py index 3e5fe0868..28286664e 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py @@ -99,7 +99,7 @@ class TraceBatchManager: self._initialize_backend_batch( user_context, execution_metadata, use_ephemeral ) - self.backend_initialized = True + self.backend_initialized = self.trace_batch_id is not None self._batch_ready_cv.notify_all() return self.current_batch @@ -209,6 +209,13 @@ class TraceBatchManager: ) self.trace_batch_id = None + def _mark_batch_as_failed(self, trace_batch_id: str, error_message: str) -> None: + """Mark a trace batch as failed, routing to the correct endpoint.""" + if self.is_current_batch_ephemeral: + self.plus_api.mark_ephemeral_trace_batch_as_failed(trace_batch_id, error_message) + else: + self.plus_api.mark_trace_batch_as_failed(trace_batch_id, error_message) + def begin_event_processing(self) -> None: """Mark that an event handler started processing (for synchronization).""" with self._pending_events_lock: @@ -298,7 +305,7 @@ class TraceBatchManager: logger.error( "Event handler timeout - marking batch as failed due to incomplete events" ) - self.plus_api.mark_trace_batch_as_failed( + self._mark_batch_as_failed( self.trace_batch_id, "Timeout waiting for event handlers - events incomplete", ) @@ -322,7 +329,7 @@ class TraceBatchManager: events_sent_to_backend_status = self._send_events_to_backend() self.event_buffer = original_buffer if events_sent_to_backend_status == 500 and self.trace_batch_id: - self.plus_api.mark_trace_batch_as_failed( + self._mark_batch_as_failed( self.trace_batch_id, "Error sending events to backend" ) return None @@ -402,13 +409,16 @@ class TraceBatchManager: logger.error( f"❌ Failed to finalize trace batch: {response.status_code} - {response.text}" ) - self.plus_api.mark_trace_batch_as_failed( + self._mark_batch_as_failed( self.trace_batch_id, response.text ) except Exception as e: logger.error(f"❌ Error finalizing trace batch: {e}") - self.plus_api.mark_trace_batch_as_failed(self.trace_batch_id, str(e)) + try: + self._mark_batch_as_failed(self.trace_batch_id, str(e)) + except Exception: + logger.debug("Could not mark trace batch as failed (network unavailable)") def _cleanup_batch_data(self) -> None: """Clean up batch data after successful finalization to free memory""" diff --git a/lib/crewai/tests/tracing/test_tracing.py b/lib/crewai/tests/tracing/test_tracing.py index 154f341d8..92f6e31c5 100644 --- a/lib/crewai/tests/tracing/test_tracing.py +++ b/lib/crewai/tests/tracing/test_tracing.py @@ -1531,3 +1531,108 @@ class TestAuthFailbackToEphemeral: ) assert bm.trace_batch_id is None + + +class TestMarkBatchAsFailedRouting: + """Tests: _mark_batch_as_failed routes to the correct endpoint.""" + + def _make_batch_manager(self, ephemeral: bool = False): + with patch( + "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", + return_value="mock_token", + ): + bm = TraceBatchManager() + bm.is_current_batch_ephemeral = ephemeral + return bm + + def test_routes_to_ephemeral_endpoint_when_ephemeral(self): + """Ephemeral batches must use mark_ephemeral_trace_batch_as_failed.""" + bm = self._make_batch_manager(ephemeral=True) + + with patch.object( + bm.plus_api, "mark_ephemeral_trace_batch_as_failed" + ) as mock_ephemeral, patch.object( + bm.plus_api, "mark_trace_batch_as_failed" + ) as mock_non_ephemeral: + bm._mark_batch_as_failed("batch-123", "some error") + + mock_ephemeral.assert_called_once_with("batch-123", "some error") + mock_non_ephemeral.assert_not_called() + + def test_routes_to_non_ephemeral_endpoint_when_not_ephemeral(self): + """Non-ephemeral batches must use mark_trace_batch_as_failed.""" + bm = self._make_batch_manager(ephemeral=False) + + with patch.object( + bm.plus_api, "mark_ephemeral_trace_batch_as_failed" + ) as mock_ephemeral, patch.object( + bm.plus_api, "mark_trace_batch_as_failed" + ) as mock_non_ephemeral: + bm._mark_batch_as_failed("batch-456", "another error") + + mock_non_ephemeral.assert_called_once_with("batch-456", "another error") + mock_ephemeral.assert_not_called() + + +class TestBackendInitializedGatedOnSuccess: + """Tests: backend_initialized reflects actual init success on non-first-time path.""" + + def test_backend_initialized_true_on_success(self): + """backend_initialized is True when _initialize_backend_batch succeeds.""" + with ( + patch( + "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", + return_value=True, + ), + patch( + "crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces", + return_value=False, + ), + patch( + "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", + return_value="mock_token", + ), + ): + bm = TraceBatchManager() + mock_response = MagicMock( + status_code=201, + json=MagicMock(return_value={"trace_id": "server-id"}), + ) + with patch.object( + bm.plus_api, "initialize_trace_batch", return_value=mock_response + ): + bm.initialize_batch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew"}, + ) + + assert bm.backend_initialized is True + assert bm.trace_batch_id == "server-id" + + def test_backend_initialized_false_on_failure(self): + """backend_initialized is False when _initialize_backend_batch fails.""" + with ( + patch( + "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", + return_value=True, + ), + patch( + "crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces", + return_value=False, + ), + patch( + "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", + return_value="mock_token", + ), + ): + bm = TraceBatchManager() + with patch.object( + bm.plus_api, "initialize_trace_batch", return_value=None + ): + bm.initialize_batch( + user_context={"privacy_level": "standard"}, + execution_metadata={"execution_type": "crew"}, + ) + + assert bm.backend_initialized is False + assert bm.trace_batch_id is None