mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-03-19 02:08:17 +00:00
Compare commits
11 Commits
main
...
fix/bad-cr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
81a8c2efc1 | ||
|
|
4859dc66a5 | ||
|
|
7f2eda2ac6 | ||
|
|
6734db4f71 | ||
|
|
c00544c97d | ||
|
|
08c533905e | ||
|
|
6d1546c381 | ||
|
|
bcba620a41 | ||
|
|
4141233a78 | ||
|
|
c67425d323 | ||
|
|
7f090c664e |
@@ -1,6 +1,7 @@
|
||||
import logging
|
||||
import uuid
|
||||
import webbrowser
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from rich.console import Console
|
||||
from rich.panel import Panel
|
||||
@@ -100,20 +101,48 @@ class FirstTimeTraceHandler:
|
||||
user_context=user_context,
|
||||
execution_metadata=execution_metadata,
|
||||
use_ephemeral=True,
|
||||
skip_context_check=True,
|
||||
)
|
||||
|
||||
if not self.batch_manager.trace_batch_id:
|
||||
self._gracefully_fail("Backend batch creation failed, cannot send events.")
|
||||
self._reset_batch_state()
|
||||
return
|
||||
|
||||
self.batch_manager.backend_initialized = True
|
||||
|
||||
if self.batch_manager.event_buffer:
|
||||
self.batch_manager._send_events_to_backend()
|
||||
# Capture values before send/finalize consume them
|
||||
events_count = len(self.batch_manager.event_buffer)
|
||||
batch_id = self.batch_manager.trace_batch_id
|
||||
# Read duration non-destructively — _finalize_backend_batch will consume it
|
||||
start_time = self.batch_manager.execution_start_times.get("execution")
|
||||
duration_ms = (
|
||||
int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)
|
||||
if start_time
|
||||
else 0
|
||||
)
|
||||
|
||||
self.batch_manager.finalize_batch()
|
||||
if self.batch_manager.event_buffer:
|
||||
send_status = self.batch_manager._send_events_to_backend()
|
||||
if send_status == 500 and self.batch_manager.trace_batch_id:
|
||||
self.batch_manager.plus_api.mark_trace_batch_as_failed(
|
||||
self.batch_manager.trace_batch_id,
|
||||
"Error sending events to backend",
|
||||
)
|
||||
self._reset_batch_state()
|
||||
return
|
||||
|
||||
self.batch_manager._finalize_backend_batch(events_count)
|
||||
self.ephemeral_url = self.batch_manager.ephemeral_trace_url
|
||||
|
||||
if not self.ephemeral_url:
|
||||
self._show_local_trace_message()
|
||||
self._show_local_trace_message(events_count, duration_ms, batch_id)
|
||||
|
||||
self._reset_batch_state()
|
||||
|
||||
except Exception as e:
|
||||
self._gracefully_fail(f"Backend initialization failed: {e}")
|
||||
self._reset_batch_state()
|
||||
|
||||
def _display_ephemeral_trace_link(self):
|
||||
"""Display the ephemeral trace link to the user and automatically open browser."""
|
||||
@@ -184,6 +213,19 @@ To enable tracing later, do any one of these:
|
||||
console.print(panel)
|
||||
console.print()
|
||||
|
||||
def _reset_batch_state(self):
|
||||
"""Reset batch manager state to allow future executions to re-initialize."""
|
||||
if not self.batch_manager:
|
||||
return
|
||||
self.batch_manager.batch_owner_type = None
|
||||
self.batch_manager.batch_owner_id = None
|
||||
self.batch_manager.current_batch = None
|
||||
self.batch_manager.event_buffer.clear()
|
||||
self.batch_manager.trace_batch_id = None
|
||||
self.batch_manager.is_current_batch_ephemeral = False
|
||||
self.batch_manager.backend_initialized = False
|
||||
self.batch_manager._cleanup_batch_data()
|
||||
|
||||
def _gracefully_fail(self, error_message: str):
|
||||
"""Handle errors gracefully without disrupting user experience."""
|
||||
console = Console()
|
||||
@@ -191,7 +233,7 @@ To enable tracing later, do any one of these:
|
||||
|
||||
logger.debug(f"First-time trace error: {error_message}")
|
||||
|
||||
def _show_local_trace_message(self):
|
||||
def _show_local_trace_message(self, events_count: int = 0, duration_ms: int = 0, batch_id: str | None = None):
|
||||
"""Show message when traces were collected locally but couldn't be uploaded."""
|
||||
console = Console()
|
||||
|
||||
@@ -199,9 +241,9 @@ To enable tracing later, do any one of these:
|
||||
📊 Your execution traces were collected locally!
|
||||
|
||||
Unfortunately, we couldn't upload them to the server right now, but here's what we captured:
|
||||
• {len(self.batch_manager.event_buffer)} trace events
|
||||
• Execution duration: {self.batch_manager.calculate_duration("execution")}ms
|
||||
• Batch ID: {self.batch_manager.trace_batch_id}
|
||||
• {events_count} trace events
|
||||
• Execution duration: {duration_ms}ms
|
||||
• Batch ID: {batch_id}
|
||||
|
||||
✅ Tracing has been enabled for future runs!
|
||||
Your preference has been saved. Future Crew/Flow executions will automatically collect traces.
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from logging import getLogger
|
||||
@@ -108,10 +109,11 @@ class TraceBatchManager:
|
||||
user_context: dict[str, str],
|
||||
execution_metadata: dict[str, Any],
|
||||
use_ephemeral: bool = False,
|
||||
skip_context_check: bool = False,
|
||||
) -> None:
|
||||
"""Send batch initialization to backend"""
|
||||
|
||||
if not is_tracing_enabled_in_context():
|
||||
if not skip_context_check and not is_tracing_enabled_in_context():
|
||||
return
|
||||
|
||||
if not self.plus_api or not self.current_batch:
|
||||
@@ -142,19 +144,62 @@ class TraceBatchManager:
|
||||
payload["ephemeral_trace_id"] = self.current_batch.batch_id
|
||||
payload["user_identifier"] = get_user_id()
|
||||
|
||||
response = (
|
||||
self.plus_api.initialize_ephemeral_trace_batch(payload)
|
||||
if use_ephemeral
|
||||
else self.plus_api.initialize_trace_batch(payload)
|
||||
)
|
||||
max_retries = 2
|
||||
response = None
|
||||
last_exception = None
|
||||
|
||||
for attempt in range(max_retries + 1):
|
||||
try:
|
||||
response = (
|
||||
self.plus_api.initialize_ephemeral_trace_batch(payload)
|
||||
if use_ephemeral
|
||||
else self.plus_api.initialize_trace_batch(payload)
|
||||
)
|
||||
if response is not None and response.status_code < 500:
|
||||
break
|
||||
if attempt < max_retries:
|
||||
logger.debug(
|
||||
f"Trace batch init attempt {attempt + 1} failed "
|
||||
f"(status={response.status_code if response else 'None'}), retrying..."
|
||||
)
|
||||
time.sleep(0.2)
|
||||
except Exception as e:
|
||||
last_exception = e
|
||||
if attempt < max_retries:
|
||||
logger.debug(
|
||||
f"Trace batch init attempt {attempt + 1} raised {type(e).__name__}, retrying..."
|
||||
)
|
||||
time.sleep(0.2)
|
||||
|
||||
if last_exception and response is None:
|
||||
logger.warning(
|
||||
f"Error initializing trace batch: {last_exception}. Continuing without tracing."
|
||||
)
|
||||
self.trace_batch_id = None
|
||||
return
|
||||
|
||||
if response is None:
|
||||
logger.warning(
|
||||
"Trace batch initialization failed gracefully. Continuing without tracing."
|
||||
)
|
||||
self.trace_batch_id = None
|
||||
return
|
||||
|
||||
# Fall back to ephemeral on auth failure (expired/revoked token)
|
||||
if response.status_code in [401, 403] and not use_ephemeral:
|
||||
logger.warning(
|
||||
"Auth rejected by server, falling back to ephemeral tracing."
|
||||
)
|
||||
self.is_current_batch_ephemeral = True
|
||||
return self._initialize_backend_batch(
|
||||
user_context,
|
||||
execution_metadata,
|
||||
use_ephemeral=True,
|
||||
skip_context_check=skip_context_check,
|
||||
)
|
||||
|
||||
if response.status_code in [201, 200]:
|
||||
self.is_current_batch_ephemeral = use_ephemeral
|
||||
response_data = response.json()
|
||||
self.trace_batch_id = (
|
||||
response_data["trace_id"]
|
||||
@@ -165,11 +210,13 @@ class TraceBatchManager:
|
||||
logger.warning(
|
||||
f"Trace batch initialization returned status {response.status_code}. Continuing without tracing."
|
||||
)
|
||||
self.trace_batch_id = None
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Error initializing trace batch: {e}. Continuing without tracing."
|
||||
)
|
||||
self.trace_batch_id = None
|
||||
|
||||
def begin_event_processing(self) -> None:
|
||||
"""Mark that an event handler started processing (for synchronization)."""
|
||||
|
||||
@@ -7,6 +7,7 @@ from crewai.events.listeners.tracing.first_time_trace_handler import (
|
||||
FirstTimeTraceHandler,
|
||||
)
|
||||
from crewai.events.listeners.tracing.trace_batch_manager import (
|
||||
TraceBatch,
|
||||
TraceBatchManager,
|
||||
)
|
||||
from crewai.events.listeners.tracing.trace_listener import (
|
||||
@@ -657,6 +658,16 @@ class TestTraceListenerSetup:
|
||||
|
||||
trace_listener.first_time_handler.collected_events = True
|
||||
|
||||
mock_batch_response = MagicMock()
|
||||
mock_batch_response.status_code = 201
|
||||
mock_batch_response.json.return_value = {
|
||||
"trace_id": "mock-trace-id",
|
||||
"ephemeral_trace_id": "mock-ephemeral-trace-id",
|
||||
"access_code": "TRACE-mock",
|
||||
}
|
||||
mock_events_response = MagicMock()
|
||||
mock_events_response.status_code = 200
|
||||
|
||||
with (
|
||||
patch.object(
|
||||
trace_listener.first_time_handler,
|
||||
@@ -666,6 +677,40 @@ class TestTraceListenerSetup:
|
||||
patch.object(
|
||||
trace_listener.first_time_handler, "_display_ephemeral_trace_link"
|
||||
) as mock_display_link,
|
||||
patch.object(
|
||||
trace_listener.batch_manager.plus_api,
|
||||
"initialize_trace_batch",
|
||||
return_value=mock_batch_response,
|
||||
),
|
||||
patch.object(
|
||||
trace_listener.batch_manager.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=mock_batch_response,
|
||||
),
|
||||
patch.object(
|
||||
trace_listener.batch_manager.plus_api,
|
||||
"send_trace_events",
|
||||
return_value=mock_events_response,
|
||||
),
|
||||
patch.object(
|
||||
trace_listener.batch_manager.plus_api,
|
||||
"send_ephemeral_trace_events",
|
||||
return_value=mock_events_response,
|
||||
),
|
||||
patch.object(
|
||||
trace_listener.batch_manager.plus_api,
|
||||
"finalize_trace_batch",
|
||||
return_value=mock_events_response,
|
||||
),
|
||||
patch.object(
|
||||
trace_listener.batch_manager.plus_api,
|
||||
"finalize_ephemeral_trace_batch",
|
||||
return_value=mock_events_response,
|
||||
),
|
||||
patch.object(
|
||||
trace_listener.batch_manager,
|
||||
"_cleanup_batch_data",
|
||||
),
|
||||
):
|
||||
crew.kickoff()
|
||||
wait_for_event_handlers()
|
||||
@@ -918,3 +963,576 @@ class TestTraceListenerSetup:
|
||||
mock_init.assert_called_once()
|
||||
payload = mock_init.call_args[0][0]
|
||||
assert "user_identifier" not in payload
|
||||
|
||||
|
||||
class TestTraceBatchIdClearedOnFailure:
|
||||
"""Tests: trace_batch_id is cleared when _initialize_backend_batch fails."""
|
||||
|
||||
def _make_batch_manager(self):
|
||||
"""Create a TraceBatchManager with a pre-set trace_batch_id (simulating first-time user)."""
|
||||
with patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
|
||||
return_value="mock_token",
|
||||
):
|
||||
bm = TraceBatchManager()
|
||||
bm.current_batch = TraceBatch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew", "crew_name": "test"},
|
||||
)
|
||||
bm.trace_batch_id = bm.current_batch.batch_id # simulate line 96
|
||||
bm.is_current_batch_ephemeral = True
|
||||
return bm
|
||||
|
||||
def test_trace_batch_id_cleared_on_exception(self):
|
||||
"""trace_batch_id must be None when the API call raises an exception."""
|
||||
bm = self._make_batch_manager()
|
||||
assert bm.trace_batch_id is not None
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
side_effect=ConnectionError("network down"),
|
||||
),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id is None
|
||||
|
||||
def test_trace_batch_id_set_on_success(self):
|
||||
"""trace_batch_id must be set from the server response on success."""
|
||||
bm = self._make_batch_manager()
|
||||
server_id = "server-ephemeral-trace-id-999"
|
||||
|
||||
mock_response = MagicMock(
|
||||
status_code=201,
|
||||
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=mock_response,
|
||||
),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id == server_id
|
||||
|
||||
def test_send_events_skipped_when_trace_batch_id_none(self):
|
||||
"""_send_events_to_backend must return early when trace_batch_id is None."""
|
||||
bm = self._make_batch_manager()
|
||||
bm.trace_batch_id = None
|
||||
bm.event_buffer = [MagicMock()] # has events
|
||||
|
||||
with patch.object(
|
||||
bm.plus_api, "send_ephemeral_trace_events"
|
||||
) as mock_send:
|
||||
result = bm._send_events_to_backend()
|
||||
|
||||
assert result == 500
|
||||
mock_send.assert_not_called()
|
||||
|
||||
|
||||
class TestInitializeBackendBatchRetry:
|
||||
"""Tests for retry logic in _initialize_backend_batch."""
|
||||
|
||||
def _make_batch_manager(self):
|
||||
"""Create a TraceBatchManager with a pre-set trace_batch_id."""
|
||||
with patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
|
||||
return_value="mock_token",
|
||||
):
|
||||
bm = TraceBatchManager()
|
||||
bm.current_batch = TraceBatch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew", "crew_name": "test"},
|
||||
)
|
||||
bm.trace_batch_id = bm.current_batch.batch_id
|
||||
bm.is_current_batch_ephemeral = True
|
||||
return bm
|
||||
|
||||
def test_retries_on_none_response_then_succeeds(self):
|
||||
"""Retries when API returns None, succeeds on second attempt."""
|
||||
bm = self._make_batch_manager()
|
||||
server_id = "server-id-after-retry"
|
||||
|
||||
success_response = MagicMock(
|
||||
status_code=201,
|
||||
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
side_effect=[None, success_response],
|
||||
) as mock_init,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id == server_id
|
||||
assert mock_init.call_count == 2
|
||||
mock_sleep.assert_called_once_with(0.2)
|
||||
|
||||
def test_retries_on_5xx_then_succeeds(self):
|
||||
"""Retries on 500 server error, succeeds on second attempt."""
|
||||
bm = self._make_batch_manager()
|
||||
server_id = "server-id-after-5xx"
|
||||
|
||||
error_response = MagicMock(status_code=500, text="Internal Server Error")
|
||||
success_response = MagicMock(
|
||||
status_code=201,
|
||||
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
side_effect=[error_response, success_response],
|
||||
) as mock_init,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id == server_id
|
||||
assert mock_init.call_count == 2
|
||||
|
||||
def test_retries_on_exception_then_succeeds(self):
|
||||
"""Retries on ConnectionError, succeeds on second attempt."""
|
||||
bm = self._make_batch_manager()
|
||||
server_id = "server-id-after-exception"
|
||||
|
||||
success_response = MagicMock(
|
||||
status_code=201,
|
||||
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
side_effect=[ConnectionError("network down"), success_response],
|
||||
) as mock_init,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id == server_id
|
||||
assert mock_init.call_count == 2
|
||||
|
||||
def test_no_retry_on_4xx(self):
|
||||
"""Does NOT retry on 422 — client error is not transient."""
|
||||
bm = self._make_batch_manager()
|
||||
|
||||
error_response = MagicMock(status_code=422, text="Unprocessable Entity")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=error_response,
|
||||
) as mock_init,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id is None
|
||||
assert mock_init.call_count == 1
|
||||
mock_sleep.assert_not_called()
|
||||
|
||||
def test_exhausts_retries_then_clears_batch_id(self):
|
||||
"""After all retries fail, trace_batch_id is None."""
|
||||
bm = self._make_batch_manager()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=None,
|
||||
) as mock_init,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id is None
|
||||
assert mock_init.call_count == 3 # initial + 2 retries
|
||||
|
||||
|
||||
class TestFirstTimeHandlerBackendInitGuard:
|
||||
"""Tests: backend_initialized gated on actual batch creation success."""
|
||||
|
||||
def _make_handler_with_manager(self):
|
||||
"""Create a FirstTimeTraceHandler wired to a TraceBatchManager."""
|
||||
with patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
|
||||
return_value="mock_token",
|
||||
):
|
||||
bm = TraceBatchManager()
|
||||
bm.current_batch = TraceBatch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew", "crew_name": "test"},
|
||||
)
|
||||
bm.trace_batch_id = bm.current_batch.batch_id
|
||||
bm.is_current_batch_ephemeral = True
|
||||
|
||||
handler = FirstTimeTraceHandler()
|
||||
handler.is_first_time = True
|
||||
handler.collected_events = True
|
||||
handler.batch_manager = bm
|
||||
return handler, bm
|
||||
|
||||
def test_backend_initialized_true_on_success(self):
|
||||
"""Events are sent when batch creation succeeds, then state is cleaned up."""
|
||||
handler, bm = self._make_handler_with_manager()
|
||||
server_id = "server-id-abc"
|
||||
|
||||
mock_init_response = MagicMock(
|
||||
status_code=201,
|
||||
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
|
||||
)
|
||||
mock_send_response = MagicMock(status_code=200)
|
||||
|
||||
trace_batch_id_during_send = None
|
||||
|
||||
def capture_send(*args, **kwargs):
|
||||
nonlocal trace_batch_id_during_send
|
||||
trace_batch_id_during_send = bm.trace_batch_id
|
||||
return mock_send_response
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=mock_init_response,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"send_ephemeral_trace_events",
|
||||
side_effect=capture_send,
|
||||
),
|
||||
patch.object(bm, "_finalize_backend_batch"),
|
||||
):
|
||||
bm.event_buffer = [MagicMock(to_dict=MagicMock(return_value={}))]
|
||||
handler._initialize_backend_and_send_events()
|
||||
|
||||
# trace_batch_id was set correctly during send
|
||||
assert trace_batch_id_during_send == server_id
|
||||
# State cleaned up after completion (singleton reuse)
|
||||
assert bm.backend_initialized is False
|
||||
assert bm.trace_batch_id is None
|
||||
assert bm.current_batch is None
|
||||
|
||||
def test_backend_initialized_false_on_failure(self):
|
||||
"""backend_initialized stays False and events are NOT sent when batch creation fails."""
|
||||
handler, bm = self._make_handler_with_manager()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=None, # server call fails
|
||||
),
|
||||
patch.object(bm, "_send_events_to_backend") as mock_send,
|
||||
patch.object(bm, "_finalize_backend_batch") as mock_finalize,
|
||||
patch.object(handler, "_gracefully_fail") as mock_fail,
|
||||
):
|
||||
bm.event_buffer = [MagicMock()]
|
||||
handler._initialize_backend_and_send_events()
|
||||
|
||||
assert bm.backend_initialized is False
|
||||
assert bm.trace_batch_id is None
|
||||
mock_send.assert_not_called()
|
||||
mock_finalize.assert_not_called()
|
||||
mock_fail.assert_called_once()
|
||||
|
||||
def test_backend_initialized_false_on_non_2xx(self):
|
||||
"""backend_initialized stays False when server returns non-2xx."""
|
||||
handler, bm = self._make_handler_with_manager()
|
||||
|
||||
mock_response = MagicMock(status_code=500, text="Internal Server Error")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=mock_response,
|
||||
),
|
||||
patch.object(bm, "_send_events_to_backend") as mock_send,
|
||||
patch.object(bm, "_finalize_backend_batch") as mock_finalize,
|
||||
patch.object(handler, "_gracefully_fail") as mock_fail,
|
||||
):
|
||||
bm.event_buffer = [MagicMock()]
|
||||
handler._initialize_backend_and_send_events()
|
||||
|
||||
assert bm.backend_initialized is False
|
||||
assert bm.trace_batch_id is None
|
||||
mock_send.assert_not_called()
|
||||
mock_finalize.assert_not_called()
|
||||
mock_fail.assert_called_once()
|
||||
|
||||
|
||||
class TestFirstTimeHandlerAlwaysEphemeral:
|
||||
"""Tests that first-time handler always uses ephemeral with skip_context_check."""
|
||||
|
||||
def _make_handler_with_manager(self):
|
||||
with patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
|
||||
return_value="mock_token",
|
||||
):
|
||||
bm = TraceBatchManager()
|
||||
bm.current_batch = TraceBatch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew", "crew_name": "test"},
|
||||
)
|
||||
bm.trace_batch_id = bm.current_batch.batch_id
|
||||
bm.is_current_batch_ephemeral = True
|
||||
|
||||
handler = FirstTimeTraceHandler()
|
||||
handler.is_first_time = True
|
||||
handler.collected_events = True
|
||||
handler.batch_manager = bm
|
||||
return handler, bm
|
||||
|
||||
def test_deferred_init_uses_ephemeral_and_skip_context_check(self):
|
||||
"""Deferred backend init always uses ephemeral=True and skip_context_check=True."""
|
||||
handler, bm = self._make_handler_with_manager()
|
||||
|
||||
with (
|
||||
patch.object(bm, "_initialize_backend_batch") as mock_init,
|
||||
patch.object(bm, "_send_events_to_backend"),
|
||||
patch.object(bm, "_finalize_backend_batch"),
|
||||
):
|
||||
mock_init.side_effect = lambda **kwargs: None
|
||||
bm.event_buffer = [MagicMock()]
|
||||
handler._initialize_backend_and_send_events()
|
||||
|
||||
mock_init.assert_called_once()
|
||||
assert mock_init.call_args.kwargs["use_ephemeral"] is True
|
||||
assert mock_init.call_args.kwargs["skip_context_check"] is True
|
||||
|
||||
|
||||
class TestAuthFailbackToEphemeral:
|
||||
"""Tests for ephemeral fallback when server rejects auth (401/403)."""
|
||||
|
||||
def _make_batch_manager(self):
|
||||
"""Create a TraceBatchManager with a pre-set trace_batch_id."""
|
||||
with patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
|
||||
return_value="mock_token",
|
||||
):
|
||||
bm = TraceBatchManager()
|
||||
bm.current_batch = TraceBatch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew", "crew_name": "test"},
|
||||
)
|
||||
bm.trace_batch_id = bm.current_batch.batch_id
|
||||
bm.is_current_batch_ephemeral = False # authenticated path
|
||||
return bm
|
||||
|
||||
def test_401_non_ephemeral_falls_back_to_ephemeral(self):
|
||||
"""A 401 on the non-ephemeral endpoint should retry as ephemeral."""
|
||||
bm = self._make_batch_manager()
|
||||
server_id = "ephemeral-fallback-id"
|
||||
|
||||
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
|
||||
ephemeral_success = MagicMock(
|
||||
status_code=201,
|
||||
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_trace_batch",
|
||||
return_value=auth_rejected,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=ephemeral_success,
|
||||
) as mock_ephemeral,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=False,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id == server_id
|
||||
assert bm.is_current_batch_ephemeral is True
|
||||
mock_ephemeral.assert_called_once()
|
||||
|
||||
def test_403_non_ephemeral_falls_back_to_ephemeral(self):
|
||||
"""A 403 on the non-ephemeral endpoint should also fall back."""
|
||||
bm = self._make_batch_manager()
|
||||
server_id = "ephemeral-fallback-403"
|
||||
|
||||
forbidden = MagicMock(status_code=403, text="Forbidden")
|
||||
ephemeral_success = MagicMock(
|
||||
status_code=201,
|
||||
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_trace_batch",
|
||||
return_value=forbidden,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=ephemeral_success,
|
||||
),
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=False,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id == server_id
|
||||
assert bm.is_current_batch_ephemeral is True
|
||||
|
||||
def test_401_on_ephemeral_does_not_recurse(self):
|
||||
"""A 401 on the ephemeral endpoint should NOT try to fall back again."""
|
||||
bm = self._make_batch_manager()
|
||||
bm.is_current_batch_ephemeral = True
|
||||
|
||||
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=auth_rejected,
|
||||
) as mock_ephemeral,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id is None
|
||||
# Called only once — no recursive fallback
|
||||
mock_ephemeral.assert_called()
|
||||
|
||||
def test_401_fallback_ephemeral_also_fails(self):
|
||||
"""If ephemeral fallback also fails, trace_batch_id is cleared."""
|
||||
bm = self._make_batch_manager()
|
||||
|
||||
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
|
||||
ephemeral_fail = MagicMock(status_code=422, text="Validation failed")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_trace_batch",
|
||||
return_value=auth_rejected,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=ephemeral_fail,
|
||||
),
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=False,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id is None
|
||||
|
||||
Reference in New Issue
Block a user