mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-30 14:52:36 +00:00
feat: add retry logic for ephemeral trace batch creation
Transient failures (None response, 5xx, network errors) during _initialize_backend_batch now retry up to 2 times with a 1s backoff. Non-transient 4xx errors (422 validation, 401 auth) are not retried since the same payload would fail again. If all retries are exhausted, trace_batch_id is cleared per the existing safety net. This runs post-execution when the user has already answered "y" to view traces, so the ~2s worst-case delay is acceptable.
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from logging import getLogger
|
||||
@@ -142,11 +143,39 @@ class TraceBatchManager:
|
||||
payload["ephemeral_trace_id"] = self.current_batch.batch_id
|
||||
payload["user_identifier"] = get_user_id()
|
||||
|
||||
response = (
|
||||
self.plus_api.initialize_ephemeral_trace_batch(payload)
|
||||
if use_ephemeral
|
||||
else self.plus_api.initialize_trace_batch(payload)
|
||||
)
|
||||
max_retries = 2
|
||||
response = None
|
||||
last_exception = None
|
||||
|
||||
for attempt in range(max_retries + 1):
|
||||
try:
|
||||
response = (
|
||||
self.plus_api.initialize_ephemeral_trace_batch(payload)
|
||||
if use_ephemeral
|
||||
else self.plus_api.initialize_trace_batch(payload)
|
||||
)
|
||||
if response is not None and response.status_code < 500:
|
||||
break
|
||||
if attempt < max_retries:
|
||||
logger.debug(
|
||||
f"Trace batch init attempt {attempt + 1} failed "
|
||||
f"(status={response.status_code if response else 'None'}), retrying..."
|
||||
)
|
||||
time.sleep(1)
|
||||
except Exception as e:
|
||||
last_exception = e
|
||||
if attempt < max_retries:
|
||||
logger.debug(
|
||||
f"Trace batch init attempt {attempt + 1} raised {type(e).__name__}, retrying..."
|
||||
)
|
||||
time.sleep(1)
|
||||
|
||||
if last_exception and response is None:
|
||||
logger.warning(
|
||||
f"Error initializing trace batch: {last_exception}. Continuing without tracing."
|
||||
)
|
||||
self.trace_batch_id = None
|
||||
return
|
||||
|
||||
if response is None:
|
||||
logger.warning(
|
||||
|
||||
@@ -1058,6 +1058,173 @@ class TestTraceBatchIdClearedOnFailure:
|
||||
mock_send.assert_not_called()
|
||||
|
||||
|
||||
class TestInitializeBackendBatchRetry:
|
||||
"""Tests for retry logic in _initialize_backend_batch."""
|
||||
|
||||
def _make_batch_manager(self):
|
||||
"""Create a TraceBatchManager with a pre-set trace_batch_id."""
|
||||
with patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
|
||||
return_value="mock_token",
|
||||
):
|
||||
bm = TraceBatchManager()
|
||||
bm.current_batch = TraceBatch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew", "crew_name": "test"},
|
||||
)
|
||||
bm.trace_batch_id = bm.current_batch.batch_id
|
||||
bm.is_current_batch_ephemeral = True
|
||||
return bm
|
||||
|
||||
def test_retries_on_none_response_then_succeeds(self):
|
||||
"""Retries when API returns None, succeeds on second attempt."""
|
||||
bm = self._make_batch_manager()
|
||||
server_id = "server-id-after-retry"
|
||||
|
||||
success_response = MagicMock(
|
||||
status_code=201,
|
||||
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
side_effect=[None, success_response],
|
||||
) as mock_init,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id == server_id
|
||||
assert mock_init.call_count == 2
|
||||
mock_sleep.assert_called_once_with(1)
|
||||
|
||||
def test_retries_on_5xx_then_succeeds(self):
|
||||
"""Retries on 500 server error, succeeds on second attempt."""
|
||||
bm = self._make_batch_manager()
|
||||
server_id = "server-id-after-5xx"
|
||||
|
||||
error_response = MagicMock(status_code=500, text="Internal Server Error")
|
||||
success_response = MagicMock(
|
||||
status_code=201,
|
||||
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
side_effect=[error_response, success_response],
|
||||
) as mock_init,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id == server_id
|
||||
assert mock_init.call_count == 2
|
||||
|
||||
def test_retries_on_exception_then_succeeds(self):
|
||||
"""Retries on ConnectionError, succeeds on second attempt."""
|
||||
bm = self._make_batch_manager()
|
||||
server_id = "server-id-after-exception"
|
||||
|
||||
success_response = MagicMock(
|
||||
status_code=201,
|
||||
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
side_effect=[ConnectionError("network down"), success_response],
|
||||
) as mock_init,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id == server_id
|
||||
assert mock_init.call_count == 2
|
||||
|
||||
def test_no_retry_on_4xx(self):
|
||||
"""Does NOT retry on 422 — client error is not transient."""
|
||||
bm = self._make_batch_manager()
|
||||
|
||||
error_response = MagicMock(status_code=422, text="Unprocessable Entity")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=error_response,
|
||||
) as mock_init,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id is None
|
||||
assert mock_init.call_count == 1
|
||||
mock_sleep.assert_not_called()
|
||||
|
||||
def test_exhausts_retries_then_clears_batch_id(self):
|
||||
"""After all retries fail, trace_batch_id is None."""
|
||||
bm = self._make_batch_manager()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=None,
|
||||
) as mock_init,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id is None
|
||||
assert mock_init.call_count == 3 # initial + 2 retries
|
||||
|
||||
|
||||
class TestFirstTimeHandlerBackendInitGuard:
|
||||
"""Tests for Fix 2: backend_initialized gated on actual batch creation success."""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user