mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-09 12:38:14 +00:00
feat: fall back to ephemeral tracing on server auth rejection
When the non-ephemeral batch endpoint returns 401 or 403 (expired token, revoked credentials, JWKS rotation), _initialize_backend_batch now switches is_current_batch_ephemeral to True and retries via the ephemeral endpoint. This preserves traces that would otherwise be lost due to the timing gap between client-side token validation and server-side JWT decode. The fallback only triggers on the non-ephemeral path to prevent infinite recursion. If the ephemeral attempt also fails, trace_batch_id is cleared normally. Addresses the 2M+ failed push attempts in which valid client-side tokens were rejected on the server.
This commit is contained in:
@@ -184,6 +184,16 @@ class TraceBatchManager:
|
||||
self.trace_batch_id = None
|
||||
return
|
||||
|
||||
# Fall back to ephemeral on auth failure (expired/revoked token)
|
||||
if response.status_code in [401, 403] and not use_ephemeral:
|
||||
logger.warning(
|
||||
"Auth rejected by server, falling back to ephemeral tracing."
|
||||
)
|
||||
self.is_current_batch_ephemeral = True
|
||||
return self._initialize_backend_batch(
|
||||
user_context, execution_metadata, use_ephemeral=True
|
||||
)
|
||||
|
||||
if response.status_code in [201, 200]:
|
||||
response_data = response.json()
|
||||
self.trace_batch_id = (
|
||||
|
||||
@@ -1447,3 +1447,158 @@ class TestFirstTimeAuthRespected:
|
||||
mock_init.assert_called_once()
|
||||
call_kwargs = mock_init.call_args
|
||||
assert call_kwargs.kwargs["use_ephemeral"] is False
|
||||
|
||||
|
||||
class TestAuthFailbackToEphemeral:
|
||||
"""Tests for ephemeral fallback when server rejects auth (401/403)."""
|
||||
|
||||
def _make_batch_manager(self):
|
||||
"""Create a TraceBatchManager with a pre-set trace_batch_id."""
|
||||
with patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
|
||||
return_value="mock_token",
|
||||
):
|
||||
bm = TraceBatchManager()
|
||||
bm.current_batch = TraceBatch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew", "crew_name": "test"},
|
||||
)
|
||||
bm.trace_batch_id = bm.current_batch.batch_id
|
||||
bm.is_current_batch_ephemeral = False # authenticated path
|
||||
return bm
|
||||
|
||||
def test_401_non_ephemeral_falls_back_to_ephemeral(self):
|
||||
"""A 401 on the non-ephemeral endpoint should retry as ephemeral."""
|
||||
bm = self._make_batch_manager()
|
||||
server_id = "ephemeral-fallback-id"
|
||||
|
||||
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
|
||||
ephemeral_success = MagicMock(
|
||||
status_code=201,
|
||||
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_trace_batch",
|
||||
return_value=auth_rejected,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=ephemeral_success,
|
||||
) as mock_ephemeral,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=False,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id == server_id
|
||||
assert bm.is_current_batch_ephemeral is True
|
||||
mock_ephemeral.assert_called_once()
|
||||
|
||||
def test_403_non_ephemeral_falls_back_to_ephemeral(self):
|
||||
"""A 403 on the non-ephemeral endpoint should also fall back."""
|
||||
bm = self._make_batch_manager()
|
||||
server_id = "ephemeral-fallback-403"
|
||||
|
||||
forbidden = MagicMock(status_code=403, text="Forbidden")
|
||||
ephemeral_success = MagicMock(
|
||||
status_code=201,
|
||||
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_trace_batch",
|
||||
return_value=forbidden,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=ephemeral_success,
|
||||
),
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=False,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id == server_id
|
||||
assert bm.is_current_batch_ephemeral is True
|
||||
|
||||
def test_401_on_ephemeral_does_not_recurse(self):
|
||||
"""A 401 on the ephemeral endpoint should NOT try to fall back again."""
|
||||
bm = self._make_batch_manager()
|
||||
bm.is_current_batch_ephemeral = True
|
||||
|
||||
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=auth_rejected,
|
||||
) as mock_ephemeral,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id is None
|
||||
# Called only once — no recursive fallback
|
||||
mock_ephemeral.assert_called()
|
||||
|
||||
def test_401_fallback_ephemeral_also_fails(self):
|
||||
"""If ephemeral fallback also fails, trace_batch_id is cleared."""
|
||||
bm = self._make_batch_manager()
|
||||
|
||||
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
|
||||
ephemeral_fail = MagicMock(status_code=422, text="Validation failed")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_trace_batch",
|
||||
return_value=auth_rejected,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=ephemeral_fail,
|
||||
),
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=False,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id is None
|
||||
|
||||
Reference in New Issue
Block a user