diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py
index fe944eb7d..15f797eec 100644
--- a/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py
+++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py
@@ -1,3 +1,4 @@
+import time
 from dataclasses import dataclass, field
 from datetime import datetime, timezone
 from logging import getLogger
@@ -142,11 +143,41 @@ class TraceBatchManager:
                 payload["ephemeral_trace_id"] = self.current_batch.batch_id
                 payload["user_identifier"] = get_user_id()
 
-            response = (
-                self.plus_api.initialize_ephemeral_trace_batch(payload)
-                if use_ephemeral
-                else self.plus_api.initialize_trace_batch(payload)
-            )
+            max_retries = 2
+            response = None
+            last_exception = None
+
+            # Retry on None, 5xx, or an exception; any other response ends the loop.
+            for attempt in range(max_retries + 1):
+                try:
+                    response = (
+                        self.plus_api.initialize_ephemeral_trace_batch(payload)
+                        if use_ephemeral
+                        else self.plus_api.initialize_trace_batch(payload)
+                    )
+                    if response is not None and response.status_code < 500:
+                        break
+                    if attempt < max_retries:
+                        # Compare with None explicitly: a response object can be falsy for error statuses (requests.Response is).
+                        logger.debug(
+                            f"Trace batch init attempt {attempt + 1} failed "
+                            f"(status={response.status_code if response is not None else 'None'}), retrying..."
+                        )
+                        time.sleep(1)
+                except Exception as e:
+                    last_exception = e
+                    if attempt < max_retries:
+                        logger.debug(
+                            f"Trace batch init attempt {attempt + 1} raised {type(e).__name__}, retrying..."
+                        )
+                        time.sleep(1)
+
+            if last_exception and response is None:
+                logger.warning(
+                    f"Error initializing trace batch: {last_exception}. Continuing without tracing."
+                )
+                self.trace_batch_id = None
+                return
 
             if response is None:
                 logger.warning(
diff --git a/lib/crewai/tests/tracing/test_tracing.py b/lib/crewai/tests/tracing/test_tracing.py
index b9028ed2e..890567631 100644
--- a/lib/crewai/tests/tracing/test_tracing.py
+++ b/lib/crewai/tests/tracing/test_tracing.py
@@ -1058,6 +1058,173 @@ class TestTraceBatchIdClearedOnFailure:
         mock_send.assert_not_called()
 
 
+class TestInitializeBackendBatchRetry:
+    """Tests for retry logic in _initialize_backend_batch."""
+
+    def _make_batch_manager(self):
+        """Create a TraceBatchManager with a pre-set trace_batch_id."""
+        with patch(
+            "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
+            return_value="mock_token",
+        ):
+            bm = TraceBatchManager()
+            bm.current_batch = TraceBatch(
+                user_context={"privacy_level": "standard"},
+                execution_metadata={"execution_type": "crew", "crew_name": "test"},
+            )
+            bm.trace_batch_id = bm.current_batch.batch_id
+            bm.is_current_batch_ephemeral = True
+            return bm
+
+    def test_retries_on_none_response_then_succeeds(self):
+        """Retries when API returns None, succeeds on second attempt."""
+        bm = self._make_batch_manager()
+        server_id = "server-id-after-retry"
+
+        success_response = MagicMock(
+            status_code=201,
+            json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
+        )
+
+        with (
+            patch(
+                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
+                return_value=True,
+            ),
+            patch.object(
+                bm.plus_api,
+                "initialize_ephemeral_trace_batch",
+                side_effect=[None, success_response],
+            ) as mock_init,
+            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
+        ):
+            bm._initialize_backend_batch(
+                user_context={"privacy_level": "standard"},
+                execution_metadata={"execution_type": "crew"},
+                use_ephemeral=True,
+            )
+
+        assert bm.trace_batch_id == server_id
+        assert mock_init.call_count == 2
+        mock_sleep.assert_called_once_with(1)
+
+    def test_retries_on_5xx_then_succeeds(self):
+        """Retries on 500 server error, succeeds on second attempt."""
+        bm = self._make_batch_manager()
+        server_id = "server-id-after-5xx"
+
+        error_response = MagicMock(status_code=500, text="Internal Server Error")
+        success_response = MagicMock(
+            status_code=201,
+            json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
+        )
+
+        with (
+            patch(
+                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
+                return_value=True,
+            ),
+            patch.object(
+                bm.plus_api,
+                "initialize_ephemeral_trace_batch",
+                side_effect=[error_response, success_response],
+            ) as mock_init,
+            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
+        ):
+            bm._initialize_backend_batch(
+                user_context={"privacy_level": "standard"},
+                execution_metadata={"execution_type": "crew"},
+                use_ephemeral=True,
+            )
+
+        assert bm.trace_batch_id == server_id
+        assert mock_init.call_count == 2
+
+    def test_retries_on_exception_then_succeeds(self):
+        """Retries on ConnectionError, succeeds on second attempt."""
+        bm = self._make_batch_manager()
+        server_id = "server-id-after-exception"
+
+        success_response = MagicMock(
+            status_code=201,
+            json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
+        )
+
+        with (
+            patch(
+                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
+                return_value=True,
+            ),
+            patch.object(
+                bm.plus_api,
+                "initialize_ephemeral_trace_batch",
+                side_effect=[ConnectionError("network down"), success_response],
+            ) as mock_init,
+            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
+        ):
+            bm._initialize_backend_batch(
+                user_context={"privacy_level": "standard"},
+                execution_metadata={"execution_type": "crew"},
+                use_ephemeral=True,
+            )
+
+        assert bm.trace_batch_id == server_id
+        assert mock_init.call_count == 2
+
+    def test_no_retry_on_4xx(self):
+        """Does NOT retry on 422 — client error is not transient."""
+        bm = self._make_batch_manager()
+
+        error_response = MagicMock(status_code=422, text="Unprocessable Entity")
+
+        with (
+            patch(
+                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
+                return_value=True,
+            ),
+            patch.object(
+                bm.plus_api,
+                "initialize_ephemeral_trace_batch",
+                return_value=error_response,
+            ) as mock_init,
+            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
+        ):
+            bm._initialize_backend_batch(
+                user_context={"privacy_level": "standard"},
+                execution_metadata={"execution_type": "crew"},
+                use_ephemeral=True,
+            )
+
+        assert bm.trace_batch_id is None
+        assert mock_init.call_count == 1
+        mock_sleep.assert_not_called()
+
+    def test_exhausts_retries_then_clears_batch_id(self):
+        """After all retries fail, trace_batch_id is None."""
+        bm = self._make_batch_manager()
+
+        with (
+            patch(
+                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
+                return_value=True,
+            ),
+            patch.object(
+                bm.plus_api,
+                "initialize_ephemeral_trace_batch",
+                return_value=None,
+            ) as mock_init,
+            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
+        ):
+            bm._initialize_backend_batch(
+                user_context={"privacy_level": "standard"},
+                execution_metadata={"execution_type": "crew"},
+                use_ephemeral=True,
+            )
+
+        assert bm.trace_batch_id is None
+        assert mock_init.call_count == 3  # initial + 2 retries
+
+
 class TestFirstTimeHandlerBackendInitGuard:
     """Tests for Fix 2: backend_initialized gated on actual batch creation success."""
 