mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-07-02 13:48:09 +00:00
fix(tracing): route mark-as-failed to correct endpoint and gate backend_initialized on success
mark_trace_batch_as_failed always hit the non-ephemeral endpoint, causing 404s when called on ephemeral batches — the same class of endpoint mismatch this branch aims to fix. Add mark_ephemeral_trace_batch_as_failed to PlusAPI and a _mark_batch_as_failed helper on TraceBatchManager that routes based on is_current_batch_ephemeral. Also gate backend_initialized on trace_batch_id being set so the non-first-time path no longer reports success when _initialize_backend_batch actually failed.
This commit is contained in:
@@ -196,6 +196,16 @@ class PlusAPI:
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
def mark_ephemeral_trace_batch_as_failed(
    self, trace_batch_id: str, error_message: str
) -> httpx.Response:
    """Flag an ephemeral trace batch as failed on the backend.

    Issues a PATCH against the batch under ``EPHEMERAL_TRACING_RESOURCE``
    with a ``failed`` status and ``error_message`` as the failure reason.

    Args:
        trace_batch_id: Identifier of the ephemeral batch to update.
        error_message: Text recorded as the batch's ``failure_reason``.

    Returns:
        Whatever ``_make_request`` returns for the PATCH call.
    """
    batch_path = f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}"
    failure_payload = {"status": "failed", "failure_reason": error_message}
    return self._make_request(
        "PATCH",
        batch_path,
        json=failure_payload,
        timeout=30,
    )
|
||||
|
||||
def get_mcp_configs(self, slugs: list[str]) -> httpx.Response:
|
||||
"""Get MCP server configurations for the given slugs."""
|
||||
return self._make_request(
|
||||
|
||||
@@ -125,7 +125,7 @@ class FirstTimeTraceHandler:
|
||||
if self.batch_manager.event_buffer:
|
||||
send_status = self.batch_manager._send_events_to_backend()
|
||||
if send_status == 500 and self.batch_manager.trace_batch_id:
|
||||
self.batch_manager.plus_api.mark_trace_batch_as_failed(
|
||||
self.batch_manager._mark_batch_as_failed(
|
||||
self.batch_manager.trace_batch_id,
|
||||
"Error sending events to backend",
|
||||
)
|
||||
|
||||
@@ -99,7 +99,7 @@ class TraceBatchManager:
|
||||
self._initialize_backend_batch(
|
||||
user_context, execution_metadata, use_ephemeral
|
||||
)
|
||||
self.backend_initialized = True
|
||||
self.backend_initialized = self.trace_batch_id is not None
|
||||
|
||||
self._batch_ready_cv.notify_all()
|
||||
return self.current_batch
|
||||
@@ -209,6 +209,13 @@ class TraceBatchManager:
|
||||
)
|
||||
self.trace_batch_id = None
|
||||
|
||||
def _mark_batch_as_failed(self, trace_batch_id: str, error_message: str) -> None:
    """Report a trace batch as failed through the endpoint matching its kind.

    When ``is_current_batch_ephemeral`` is truthy the ephemeral PlusAPI
    call is used; otherwise the regular one is. Any exception raised by
    the chosen call propagates to the caller.

    Args:
        trace_batch_id: Identifier of the batch to mark as failed.
        error_message: Failure reason forwarded to the API call.
    """
    # Only the selected attribute is looked up, exactly like an if/else.
    report_failure = (
        self.plus_api.mark_ephemeral_trace_batch_as_failed
        if self.is_current_batch_ephemeral
        else self.plus_api.mark_trace_batch_as_failed
    )
    report_failure(trace_batch_id, error_message)
|
||||
|
||||
def begin_event_processing(self) -> None:
|
||||
"""Mark that an event handler started processing (for synchronization)."""
|
||||
with self._pending_events_lock:
|
||||
@@ -298,7 +305,7 @@ class TraceBatchManager:
|
||||
logger.error(
|
||||
"Event handler timeout - marking batch as failed due to incomplete events"
|
||||
)
|
||||
self.plus_api.mark_trace_batch_as_failed(
|
||||
self._mark_batch_as_failed(
|
||||
self.trace_batch_id,
|
||||
"Timeout waiting for event handlers - events incomplete",
|
||||
)
|
||||
@@ -322,7 +329,7 @@ class TraceBatchManager:
|
||||
events_sent_to_backend_status = self._send_events_to_backend()
|
||||
self.event_buffer = original_buffer
|
||||
if events_sent_to_backend_status == 500 and self.trace_batch_id:
|
||||
self.plus_api.mark_trace_batch_as_failed(
|
||||
self._mark_batch_as_failed(
|
||||
self.trace_batch_id, "Error sending events to backend"
|
||||
)
|
||||
return None
|
||||
@@ -402,13 +409,16 @@ class TraceBatchManager:
|
||||
logger.error(
|
||||
f"❌ Failed to finalize trace batch: {response.status_code} - {response.text}"
|
||||
)
|
||||
self.plus_api.mark_trace_batch_as_failed(
|
||||
self._mark_batch_as_failed(
|
||||
self.trace_batch_id, response.text
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error finalizing trace batch: {e}")
|
||||
self.plus_api.mark_trace_batch_as_failed(self.trace_batch_id, str(e))
|
||||
try:
|
||||
self._mark_batch_as_failed(self.trace_batch_id, str(e))
|
||||
except Exception:
|
||||
logger.debug("Could not mark trace batch as failed (network unavailable)")
|
||||
|
||||
def _cleanup_batch_data(self) -> None:
|
||||
"""Clean up batch data after successful finalization to free memory"""
|
||||
|
||||
@@ -1531,3 +1531,108 @@ class TestAuthFailbackToEphemeral:
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id is None
|
||||
|
||||
|
||||
class TestMarkBatchAsFailedRouting:
    """Verify that _mark_batch_as_failed picks the endpoint matching the batch kind."""

    def _make_batch_manager(self, ephemeral: bool = False):
        """Build a TraceBatchManager with a stubbed auth token and the given ephemeral flag."""
        auth_patch = patch(
            "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
            return_value="mock_token",
        )
        with auth_patch:
            manager = TraceBatchManager()
        manager.is_current_batch_ephemeral = ephemeral
        return manager

    def test_routes_to_ephemeral_endpoint_when_ephemeral(self):
        """An ephemeral batch is reported via mark_ephemeral_trace_batch_as_failed only."""
        manager = self._make_batch_manager(ephemeral=True)

        with (
            patch.object(
                manager.plus_api, "mark_ephemeral_trace_batch_as_failed"
            ) as ephemeral_call,
            patch.object(
                manager.plus_api, "mark_trace_batch_as_failed"
            ) as regular_call,
        ):
            manager._mark_batch_as_failed("batch-123", "some error")

        ephemeral_call.assert_called_once_with("batch-123", "some error")
        regular_call.assert_not_called()

    def test_routes_to_non_ephemeral_endpoint_when_not_ephemeral(self):
        """A non-ephemeral batch is reported via mark_trace_batch_as_failed only."""
        manager = self._make_batch_manager(ephemeral=False)

        with (
            patch.object(
                manager.plus_api, "mark_ephemeral_trace_batch_as_failed"
            ) as ephemeral_call,
            patch.object(
                manager.plus_api, "mark_trace_batch_as_failed"
            ) as regular_call,
        ):
            manager._mark_batch_as_failed("batch-456", "another error")

        regular_call.assert_called_once_with("batch-456", "another error")
        ephemeral_call.assert_not_called()
|
||||
|
||||
|
||||
class TestBackendInitializedGatedOnSuccess:
    """Verify backend_initialized mirrors the real init outcome on the non-first-time path."""

    def _initialize_with_response(self, init_response):
        """Run initialize_batch with initialize_trace_batch stubbed to return ``init_response``.

        Tracing is forced on, first-time auto-collection is forced off and the
        auth token is stubbed while the manager is built and initialized.
        Returns the manager so callers can inspect its resulting state.
        """
        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces",
                return_value=False,
            ),
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
                return_value="mock_token",
            ),
        ):
            manager = TraceBatchManager()
            with patch.object(
                manager.plus_api, "initialize_trace_batch", return_value=init_response
            ):
                manager.initialize_batch(
                    user_context={"privacy_level": "standard"},
                    execution_metadata={"execution_type": "crew"},
                )
        return manager

    def test_backend_initialized_true_on_success(self):
        """A 201 response carrying a trace id leaves backend_initialized True."""
        created = MagicMock(
            status_code=201,
            json=MagicMock(return_value={"trace_id": "server-id"}),
        )

        manager = self._initialize_with_response(created)

        assert manager.backend_initialized is True
        assert manager.trace_batch_id == "server-id"

    def test_backend_initialized_false_on_failure(self):
        """A missing response leaves backend_initialized False and no batch id."""
        manager = self._initialize_with_response(None)

        assert manager.backend_initialized is False
        assert manager.trace_batch_id is None
|
||||
|
||||
Reference in New Issue
Block a user