import os from unittest.mock import MagicMock, Mock, patch import pytest from crewai import Agent, Crew, Task from crewai.events.listeners.tracing.first_time_trace_handler import ( FirstTimeTraceHandler, ) from crewai.events.listeners.tracing.trace_batch_manager import ( TraceBatch, TraceBatchManager, ) from crewai.events.listeners.tracing.trace_listener import ( TraceCollectionListener, ) from crewai.events.listeners.tracing.types import TraceEvent from crewai.flow.flow import Flow, start from tests.utils import wait_for_event_handlers class TestTraceListenerSetup: """Test TraceListener is properly setup and collecting events""" @pytest.fixture(autouse=True) def mock_user_data_file_io(self): """Mock user data file I/O to prevent file system pollution between tests""" with patch( "crewai.events.listeners.tracing.utils._load_user_data", return_value={}, ): yield @pytest.fixture(autouse=True) def mock_auth_token(self): """Mock authentication token for all tests in this class""" # Need to patch all the places where get_auth_token is imported/used with ( patch( "crewai.cli.authentication.token.get_auth_token", return_value="mock_token_12345", ), patch( "crewai.events.listeners.tracing.trace_listener.get_auth_token", return_value="mock_token_12345", ), patch( "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", return_value="mock_token_12345", ), ): yield @pytest.fixture(autouse=True) def reset_tracing_singletons(self): """Reset tracing singleton instances between tests""" from crewai.events.event_bus import crewai_event_bus from crewai.events.event_listener import EventListener from crewai.events.listeners.tracing.utils import _tracing_enabled # Reset the tracing enabled contextvar try: _tracing_enabled.set(None) except (LookupError, AttributeError): pass # Clear event bus handlers BEFORE creating any new singletons with crewai_event_bus._rwlock.w_locked(): crewai_event_bus._sync_handlers = {} crewai_event_bus._async_handlers = {} crewai_event_bus._handler_dependencies = {} crewai_event_bus._execution_plan_cache = {} # Reset TraceCollectionListener singleton - must reset instance attributes too if TraceCollectionListener._instance is not None: # Reset instance attributes that shadow class attributes (only if they exist as instance attrs) instance_dict = TraceCollectionListener._instance.__dict__ if "_initialized" in instance_dict: del TraceCollectionListener._instance._initialized if "_listeners_setup" in instance_dict: del TraceCollectionListener._instance._listeners_setup # Reset class attributes TraceCollectionListener._instance = None TraceCollectionListener._initialized = False TraceCollectionListener._listeners_setup = False # Reset EventListener singleton if hasattr(EventListener, "_instance"): EventListener._instance = None yield # Clean up after test with crewai_event_bus._rwlock.w_locked(): crewai_event_bus._sync_handlers = {} crewai_event_bus._async_handlers = {} crewai_event_bus._handler_dependencies = {} crewai_event_bus._execution_plan_cache = {} # Reset TraceCollectionListener singleton - must reset instance attributes too if TraceCollectionListener._instance is not None: # Reset instance attributes that shadow class attributes (only if they exist as instance attrs) instance_dict = TraceCollectionListener._instance.__dict__ if "_initialized" in instance_dict: del TraceCollectionListener._instance._initialized if "_listeners_setup" in instance_dict: del TraceCollectionListener._instance._listeners_setup # Reset class attributes TraceCollectionListener._instance = None TraceCollectionListener._initialized = False TraceCollectionListener._listeners_setup = False if hasattr(EventListener, "_instance"): EventListener._instance = None @pytest.fixture(autouse=True) def mock_plus_api_calls(self): """Mock all PlusAPI HTTP calls to avoid network requests""" with ( patch("requests.post") as mock_post, patch("requests.get") as mock_get, patch("requests.put") as mock_put, patch("requests.delete") as mock_delete, patch.object(TraceBatchManager, "_cleanup_batch_data", return_value=True), ): mock_response = MagicMock() mock_response.status_code = 200 mock_response.json.return_value = { "id": "mock_trace_batch_id", "status": "success", "message": "Batch created successfully", } mock_response.raise_for_status.return_value = None mock_post.return_value = mock_response mock_get.return_value = mock_response mock_put.return_value = mock_response mock_delete.return_value = mock_response mock_mark_failed = MagicMock() mock_mark_failed.return_value = mock_response yield { "post": mock_post, "get": mock_get, "put": mock_put, "delete": mock_delete, "mark_trace_batch_as_failed": mock_mark_failed, } @pytest.mark.vcr() def test_trace_listener_collects_crew_events(self): """Test that trace listener properly collects events from crew execution""" with patch.dict( os.environ, { "CREWAI_TRACING_ENABLED": "true", "CREWAI_DISABLE_TELEMETRY": "false", "CREWAI_DISABLE_TRACKING": "false", "OTEL_SDK_DISABLED": "false", }, ): agent = Agent( role="Test Agent", goal="Test goal", backstory="Test backstory", llm="gpt-4o-mini", ) task = Task( description="Say hello to the world", expected_output="hello world", agent=agent, ) crew = Crew(agents=[agent], tasks=[task], verbose=True) from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener trace_listener = TraceCollectionListener() crew.kickoff() initialized = trace_listener.batch_manager.wait_for_batch_initialization(timeout=5.0) assert initialized, "Batch should have been initialized" assert trace_listener.batch_manager.is_batch_initialized() assert trace_listener.batch_manager.current_batch is not None @pytest.mark.vcr() def test_batch_manager_finalizes_batch_clears_buffer(self): """Test that batch manager properly finalizes batch and clears buffer""" with patch.dict( os.environ, { "CREWAI_TRACING_ENABLED": "true", "CREWAI_DISABLE_TELEMETRY": "false", "CREWAI_DISABLE_TRACKING": "false", "OTEL_SDK_DISABLED": "false", }, ): agent = Agent( role="Test Agent", goal="Test goal", backstory="Test backstory", llm="gpt-4o-mini", ) task = Task( description="Say hello to the world", expected_output="hello world", agent=agent, ) crew = Crew(agents=[agent], tasks=[task], verbose=True) from crewai.events.event_bus import crewai_event_bus trace_listener = None with crewai_event_bus._rwlock.r_locked(): for handler_set in crewai_event_bus._sync_handlers.values(): for handler in handler_set: if hasattr(handler, "__self__") and isinstance( handler.__self__, TraceCollectionListener ): trace_listener = handler.__self__ break if trace_listener: break if not trace_listener: for handler_set in crewai_event_bus._async_handlers.values(): for handler in handler_set: if hasattr(handler, "__self__") and isinstance( handler.__self__, TraceCollectionListener ): trace_listener = handler.__self__ break if trace_listener: break if not trace_listener: pytest.skip( "No trace listener found - tracing may not be properly enabled" ) with patch.object( trace_listener.batch_manager, "finalize_batch", wraps=trace_listener.batch_manager.finalize_batch, ) as finalize_mock: crew.kickoff() assert finalize_mock.call_count >= 1 @pytest.mark.vcr() def test_events_collection_batch_manager(self, mock_plus_api_calls): """Test that trace listener properly collects events from crew execution""" with patch.dict( os.environ, { "CREWAI_TRACING_ENABLED": "true", "CREWAI_DISABLE_TELEMETRY": "false", "CREWAI_DISABLE_TRACKING": "false", "OTEL_SDK_DISABLED": "false", }, ): agent = Agent( role="Test Agent", goal="Test goal", backstory="Test backstory", llm="gpt-4o-mini", ) task = Task( description="Say hello to the world", expected_output="hello world", agent=agent, ) crew = Crew(agents=[agent], tasks=[task], verbose=True) from crewai.events.event_bus import crewai_event_bus # Create and setup trace listener explicitly trace_listener = TraceCollectionListener() trace_listener.setup_listeners(crewai_event_bus) with patch.object( trace_listener.batch_manager, "add_event", wraps=trace_listener.batch_manager.add_event, ) as add_event_mock: crew.kickoff() wait_for_event_handlers() assert add_event_mock.call_count >= 2 completion_events = [ call.args[0] for call in add_event_mock.call_args_list if call.args[0].type == "crew_kickoff_completed" ] assert len(completion_events) >= 1 # Verify the first completion event has proper structure completion_event = completion_events[0] assert "crew_name" in completion_event.event_data assert completion_event.event_data["crew_name"] == "crew" # Verify all events have proper structure for call in add_event_mock.call_args_list: event = call.args[0] assert isinstance(event, TraceEvent) assert hasattr(event, "event_data") assert hasattr(event, "type") @pytest.mark.vcr() def test_trace_listener_disabled_when_env_false(self): """Test that trace listener doesn't make HTTP calls when tracing is disabled""" with patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "false"}): agent = Agent( role="Test Agent", goal="Test goal", backstory="Test backstory", llm="gpt-4o-mini", ) task = Task( description="Say hello to the world", expected_output="hello world", agent=agent, ) crew = Crew(agents=[agent], tasks=[task], verbose=True) result = crew.kickoff() assert result is not None from crewai.events.event_bus import crewai_event_bus trace_handlers = [] with crewai_event_bus._rwlock.r_locked(): for handlers in crewai_event_bus._sync_handlers.values(): for handler in handlers: if hasattr(handler, "__self__") and isinstance( handler.__self__, TraceCollectionListener ): trace_handlers.append(handler) for handlers in crewai_event_bus._async_handlers.values(): for handler in handlers: if hasattr(handler, "__self__") and isinstance( handler.__self__, TraceCollectionListener ): trace_handlers.append(handler) assert len(trace_handlers) == 0, ( f"Found {len(trace_handlers)} TraceCollectionListener handlers when tracing should be disabled" ) def test_trace_listener_setup_correctly_for_crew(self): """Test that trace listener is set up correctly when enabled""" with patch.dict( os.environ, { "CREWAI_TRACING_ENABLED": "true", "CREWAI_DISABLE_TELEMETRY": "false", "CREWAI_DISABLE_TRACKING": "false", "OTEL_SDK_DISABLED": "false", }, ): agent = Agent( role="Test Agent", goal="Test goal", backstory="Test backstory", llm="gpt-4o-mini", ) task = Task( description="Say hello to the world", expected_output="hello world", agent=agent, ) with patch.object( TraceCollectionListener, "setup_listeners" ) as mock_listener_setup: Crew(agents=[agent], tasks=[task], verbose=True) assert mock_listener_setup.call_count >= 1 @pytest.mark.vcr() def test_trace_listener_setup_correctly_for_flow(self): """Test that trace listener is set up correctly when enabled""" with patch.dict( os.environ, { "CREWAI_TRACING_ENABLED": "true", "CREWAI_DISABLE_TELEMETRY": "false", "CREWAI_DISABLE_TRACKING": "false", "OTEL_SDK_DISABLED": "false", }, ): class FlowExample(Flow): @start() def start(self): pass with patch.object( TraceCollectionListener, "setup_listeners" ) as mock_listener_setup: FlowExample() assert mock_listener_setup.call_count >= 1 @pytest.mark.vcr() def test_trace_listener_ephemeral_batch(self): """Test that trace listener properly handles ephemeral batches""" with ( patch.dict( os.environ, { "CREWAI_TRACING_ENABLED": "true", "CREWAI_DISABLE_TELEMETRY": "false", "CREWAI_DISABLE_TRACKING": "false", "OTEL_SDK_DISABLED": "false", }, ), patch( "crewai.events.listeners.tracing.trace_listener.TraceCollectionListener._check_authenticated", return_value=False, ), ): agent = Agent( role="Test Agent", goal="Test goal", backstory="Test backstory", llm="gpt-4o-mini", ) task = Task( description="Say hello to the world", expected_output="hello world", agent=agent, ) crew = Crew(agents=[agent], tasks=[task], tracing=True) from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener trace_listener = TraceCollectionListener() crew.kickoff() initialized = trace_listener.batch_manager.wait_for_batch_initialization(timeout=5.0) assert initialized, ( "Batch should have been initialized for unauthenticated user" ) wait_for_event_handlers() @pytest.mark.vcr() def test_trace_listener_with_authenticated_user(self): """Test that trace listener properly handles authenticated batches""" with patch.dict( os.environ, { "CREWAI_TRACING_ENABLED": "true", "CREWAI_DISABLE_TELEMETRY": "false", "CREWAI_DISABLE_TRACKING": "false", "OTEL_SDK_DISABLED": "false", }, ): agent = Agent( role="Test Agent", goal="Test goal", backstory="Test backstory", llm="gpt-4o-mini", ) task = Task( description="Say hello to the world", expected_output="hello world", agent=agent, ) from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener trace_listener = TraceCollectionListener() crew = Crew(agents=[agent], tasks=[task], tracing=True) crew.kickoff() initialized = trace_listener.batch_manager.wait_for_batch_initialization(timeout=5.0) assert initialized, ( "Batch should have been initialized for authenticated user" ) wait_for_event_handlers() # Helper method to ensure cleanup def teardown_method(self): """Cleanup after each test method""" from crewai.events.event_bus import crewai_event_bus from crewai.events.event_listener import EventListener with crewai_event_bus._rwlock.w_locked(): crewai_event_bus._sync_handlers = {} crewai_event_bus._async_handlers = {} crewai_event_bus._handler_dependencies = {} crewai_event_bus._execution_plan_cache = {} # Reset EventListener singleton if hasattr(EventListener, "_instance"): EventListener._instance = None @classmethod def teardown_class(cls): """Final cleanup after all tests in this class""" from crewai.events.event_bus import crewai_event_bus from crewai.events.event_listener import EventListener with crewai_event_bus._rwlock.w_locked(): crewai_event_bus._sync_handlers = {} crewai_event_bus._async_handlers = {} crewai_event_bus._handler_dependencies = {} crewai_event_bus._execution_plan_cache = {} # Reset EventListener singleton if hasattr(EventListener, "_instance"): EventListener._instance = None @pytest.mark.vcr() def test_first_time_user_trace_collection_with_timeout(self, mock_plus_api_calls): """Test first-time user trace collection logic with timeout behavior""" with ( patch.dict( os.environ, { "CREWAI_TRACING_ENABLED": "false", "CREWAI_DISABLE_TELEMETRY": "false", "CREWAI_DISABLE_TRACKING": "false", "OTEL_SDK_DISABLED": "false", }, ), patch( "crewai.events.listeners.tracing.utils._is_test_environment", return_value=False, ), patch( "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces", return_value=True, ), patch( "crewai.events.listeners.tracing.utils.is_first_execution", return_value=True, ), patch( "crewai.events.listeners.tracing.first_time_trace_handler.prompt_user_for_trace_viewing", return_value=False, ) as mock_prompt, patch( "crewai.events.listeners.tracing.first_time_trace_handler.mark_first_execution_completed" ) as mock_mark_completed, ): agent = Agent( role="Test Agent", goal="Test goal", backstory="Test backstory", llm="gpt-4o-mini", ) task = Task( description="Say hello to the world", expected_output="hello world", agent=agent, ) crew = Crew(agents=[agent], tasks=[task], verbose=True) from crewai.events.event_bus import crewai_event_bus trace_listener = TraceCollectionListener() trace_listener.setup_listeners(crewai_event_bus) trace_listener.first_time_handler = FirstTimeTraceHandler() if trace_listener.first_time_handler.initialize_for_first_time_user(): trace_listener.first_time_handler.set_batch_manager(trace_listener.batch_manager) assert trace_listener.first_time_handler.is_first_time is True assert trace_listener.first_time_handler.collected_events is False trace_listener.batch_manager.batch_owner_type = "crew" result = crew.kickoff() wait_for_event_handlers() assert result is not None assert trace_listener.first_time_handler.collected_events is True, ( "Events should have been collected" ) mock_prompt.assert_called_once() mock_mark_completed.assert_called_once() @pytest.mark.vcr() def test_first_time_user_trace_collection_user_accepts(self, mock_plus_api_calls): """Test first-time user trace collection when user accepts viewing traces""" with ( patch.dict( os.environ, { "CREWAI_TRACING_ENABLED": "false", "CREWAI_DISABLE_TELEMETRY": "false", "CREWAI_DISABLE_TRACKING": "false", "OTEL_SDK_DISABLED": "false", }, ), patch( "crewai.events.listeners.tracing.utils._is_test_environment", return_value=False, ), patch( "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces", return_value=True, ), patch( "crewai.events.listeners.tracing.utils.is_first_execution", return_value=True, ), patch( "crewai.events.listeners.tracing.first_time_trace_handler.prompt_user_for_trace_viewing", return_value=True, ), patch( "crewai.events.listeners.tracing.first_time_trace_handler.mark_first_execution_completed" ) as mock_mark_completed, ): agent = Agent( role="Test Agent", goal="Test goal", backstory="Test backstory", llm="gpt-4o-mini", ) task = Task( description="Say hello to the world", expected_output="hello world", agent=agent, ) crew = Crew(agents=[agent], tasks=[task], verbose=True) from crewai.events.event_bus import crewai_event_bus trace_listener = TraceCollectionListener() trace_listener.setup_listeners(crewai_event_bus) # Re-initialize first-time handler after patches are applied to ensure clean state trace_listener.first_time_handler = FirstTimeTraceHandler() if trace_listener.first_time_handler.initialize_for_first_time_user(): trace_listener.first_time_handler.set_batch_manager(trace_listener.batch_manager) trace_listener.batch_manager.ephemeral_trace_url = ( "https://crewai.com/trace/mock-id" ) assert trace_listener.first_time_handler.is_first_time is True trace_listener.first_time_handler.collected_events = True mock_batch_response = MagicMock() mock_batch_response.status_code = 201 mock_batch_response.json.return_value = { "trace_id": "mock-trace-id", "ephemeral_trace_id": "mock-ephemeral-trace-id", "access_code": "TRACE-mock", } mock_events_response = MagicMock() mock_events_response.status_code = 200 with ( patch.object( trace_listener.first_time_handler, "_initialize_backend_and_send_events", wraps=trace_listener.first_time_handler._initialize_backend_and_send_events, ) as mock_init_backend, patch.object( trace_listener.first_time_handler, "_display_ephemeral_trace_link" ) as mock_display_link, patch.object( trace_listener.batch_manager.plus_api, "initialize_trace_batch", return_value=mock_batch_response, ), patch.object( trace_listener.batch_manager.plus_api, "initialize_ephemeral_trace_batch", return_value=mock_batch_response, ), patch.object( trace_listener.batch_manager.plus_api, "send_trace_events", return_value=mock_events_response, ), patch.object( trace_listener.batch_manager.plus_api, "send_ephemeral_trace_events", return_value=mock_events_response, ), patch.object( trace_listener.batch_manager.plus_api, "finalize_trace_batch", return_value=mock_events_response, ), patch.object( trace_listener.batch_manager.plus_api, "finalize_ephemeral_trace_batch", return_value=mock_events_response, ), patch.object( trace_listener.batch_manager, "_cleanup_batch_data", ), ): crew.kickoff() wait_for_event_handlers() mock_init_backend.assert_called_once() mock_display_link.assert_called_once() mock_mark_completed.assert_called_once() @pytest.mark.vcr() def test_first_time_user_trace_consolidation_logic(self, mock_plus_api_calls): """Test the consolidation logic for first-time users vs regular tracing""" with ( patch.dict( os.environ, { "CREWAI_TRACING_ENABLED": "", "CREWAI_DISABLE_TELEMETRY": "false", "CREWAI_DISABLE_TRACKING": "false", "OTEL_SDK_DISABLED": "false", }, ), patch( "crewai.events.listeners.tracing.utils._is_test_environment", return_value=False, ), patch( "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces", return_value=True, ), patch( "crewai.events.listeners.tracing.utils.is_first_execution", return_value=True, ), ): from crewai.events.event_bus import crewai_event_bus with crewai_event_bus._rwlock.w_locked(): crewai_event_bus._sync_handlers = {} crewai_event_bus._async_handlers = {} trace_listener = TraceCollectionListener() # Re-initialize first-time handler after patches are applied to ensure clean state # This is necessary because the singleton may have been created before patches were active trace_listener.first_time_handler = FirstTimeTraceHandler() if trace_listener.first_time_handler.initialize_for_first_time_user(): trace_listener.first_time_handler.set_batch_manager(trace_listener.batch_manager) trace_listener.setup_listeners(crewai_event_bus) assert trace_listener.first_time_handler.is_first_time is True agent = Agent( role="Test Agent", goal="Test goal", backstory="Test backstory", llm="gpt-4o-mini", ) task = Task( description="Test task", expected_output="test output", agent=agent ) crew = Crew(agents=[agent], tasks=[task]) result = crew.kickoff() wait_for_event_handlers() assert trace_listener.batch_manager.is_batch_initialized(), ( "Batch should have been initialized for first-time user" ) assert result is not None def test_first_time_handler_timeout_behavior(self): """Test the timeout behavior of the first-time trace prompt""" with ( patch( "crewai.events.listeners.tracing.utils._is_test_environment", return_value=False, ), patch( "crewai.events.listeners.tracing.utils._is_interactive_terminal", return_value=True, ), patch("threading.Thread") as mock_thread, ): from crewai.events.listeners.tracing.utils import ( prompt_user_for_trace_viewing, ) mock_thread_instance = Mock() mock_thread_instance.is_alive.return_value = True mock_thread.return_value = mock_thread_instance result = prompt_user_for_trace_viewing(timeout_seconds=5) assert result is False mock_thread.assert_called_once() call_args = mock_thread.call_args assert call_args[1]["daemon"] is True mock_thread_instance.start.assert_called_once() mock_thread_instance.join.assert_called_once_with(timeout=5) mock_thread_instance.is_alive.assert_called_once() def test_first_time_handler_graceful_error_handling(self): """Test graceful error handling in first-time trace logic""" with ( patch( "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces", return_value=True, ), patch( "crewai.events.listeners.tracing.first_time_trace_handler.prompt_user_for_trace_viewing", side_effect=Exception("Prompt failed"), ), patch( "crewai.events.listeners.tracing.first_time_trace_handler.mark_first_execution_completed" ) as mock_mark_completed, ): handler = FirstTimeTraceHandler() handler.is_first_time = True handler.collected_events = True handler.handle_execution_completion() mock_mark_completed.assert_called_once() def test_trace_batch_marked_as_failed_on_finalize_error(self): """Test that trace batch is marked as failed when finalization returns non-200 status""" # Test the error handling logic directly in TraceBatchManager with patch("crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True): batch_manager = TraceBatchManager() # Initialize a batch batch_manager.current_batch = batch_manager.initialize_batch( user_context={"privacy_level": "standard"}, execution_metadata={ "execution_type": "crew", "crew_name": "test_crew", }, ) batch_manager.trace_batch_id = "test_batch_id_12345" batch_manager.backend_initialized = True # Mock the API responses with ( patch.object( batch_manager.plus_api, "send_trace_events", return_value=MagicMock(status_code=200), ), patch.object( batch_manager.plus_api, "finalize_trace_batch", return_value=MagicMock(status_code=500, text="Internal Server Error"), ), patch.object( batch_manager.plus_api, "mark_trace_batch_as_failed", ) as mock_mark_failed, ): # Call finalize_batch directly batch_manager.finalize_batch() # Verify that mark_trace_batch_as_failed was called with the error message mock_mark_failed.assert_called_once_with( "test_batch_id_12345", "Internal Server Error" ) def test_ephemeral_batch_includes_anon_id(self): """Test that ephemeral batch initialization sends anon_id from get_user_id()""" fake_user_id = "abc123def456" with ( patch( "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True, ), patch( "crewai.events.listeners.tracing.trace_batch_manager.get_user_id", return_value=fake_user_id, ), patch( "crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces", return_value=False, ), ): batch_manager = TraceBatchManager() mock_response = MagicMock( status_code=201, json=MagicMock(return_value={ "ephemeral_trace_id": "test-trace-id", "access_code": "TRACE-abc123", }), ) with patch.object( batch_manager.plus_api, "initialize_ephemeral_trace_batch", return_value=mock_response, ) as mock_init: batch_manager.initialize_batch( user_context={"privacy_level": "standard"}, execution_metadata={ "execution_type": "crew", "crew_name": "test_crew", }, use_ephemeral=True, ) mock_init.assert_called_once() payload = mock_init.call_args[0][0] assert payload["user_identifier"] == fake_user_id assert "ephemeral_trace_id" in payload def test_non_ephemeral_batch_does_not_include_anon_id(self): """Test that non-ephemeral batch initialization does not send anon_id""" with ( patch( "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True, ), patch( "crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces", return_value=False, ), ): batch_manager = TraceBatchManager() mock_response = MagicMock( status_code=201, json=MagicMock(return_value={"trace_id": "test-trace-id"}), ) with patch.object( batch_manager.plus_api, "initialize_trace_batch", return_value=mock_response, ) as mock_init: batch_manager.initialize_batch( user_context={"privacy_level": "standard"}, execution_metadata={ "execution_type": "crew", "crew_name": "test_crew", }, use_ephemeral=False, ) mock_init.assert_called_once() payload = mock_init.call_args[0][0] assert "user_identifier" not in payload class TestTraceBatchIdClearedOnFailure: """Tests: trace_batch_id is cleared when _initialize_backend_batch fails.""" def _make_batch_manager(self): """Create a TraceBatchManager with a pre-set trace_batch_id (simulating first-time user).""" with patch( "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", return_value="mock_token", ): bm = TraceBatchManager() bm.current_batch = TraceBatch( user_context={"privacy_level": "standard"}, execution_metadata={"execution_type": "crew", "crew_name": "test"}, ) bm.trace_batch_id = bm.current_batch.batch_id # simulate line 96 bm.is_current_batch_ephemeral = True return bm def test_trace_batch_id_cleared_on_exception(self): """trace_batch_id must be None when the API call raises an exception.""" bm = self._make_batch_manager() assert bm.trace_batch_id is not None with ( patch( "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True, ), patch.object( bm.plus_api, "initialize_ephemeral_trace_batch", side_effect=ConnectionError("network down"), ), ): bm._initialize_backend_batch( user_context={"privacy_level": "standard"}, execution_metadata={"execution_type": "crew"}, use_ephemeral=True, ) assert bm.trace_batch_id is None def test_trace_batch_id_set_on_success(self): """trace_batch_id must be set from the server response on success.""" bm = self._make_batch_manager() server_id = "server-ephemeral-trace-id-999" mock_response = MagicMock( status_code=201, json=MagicMock(return_value={"ephemeral_trace_id": server_id}), ) with ( patch( "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True, ), patch.object( bm.plus_api, "initialize_ephemeral_trace_batch", return_value=mock_response, ), ): bm._initialize_backend_batch( user_context={"privacy_level": "standard"}, execution_metadata={"execution_type": "crew"}, use_ephemeral=True, ) assert bm.trace_batch_id == server_id def test_send_events_skipped_when_trace_batch_id_none(self): """_send_events_to_backend must return early when trace_batch_id is None.""" bm = self._make_batch_manager() bm.trace_batch_id = None bm.event_buffer = [MagicMock()] # has events with patch.object( bm.plus_api, "send_ephemeral_trace_events" ) as mock_send: result = bm._send_events_to_backend() assert result == 500 mock_send.assert_not_called() class TestInitializeBackendBatchRetry: """Tests for retry logic in _initialize_backend_batch.""" def _make_batch_manager(self): """Create a TraceBatchManager with a pre-set trace_batch_id.""" with patch( "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", return_value="mock_token", ): bm = TraceBatchManager() bm.current_batch = TraceBatch( user_context={"privacy_level": "standard"}, execution_metadata={"execution_type": "crew", "crew_name": "test"}, ) bm.trace_batch_id = bm.current_batch.batch_id bm.is_current_batch_ephemeral = True return bm def test_retries_on_none_response_then_succeeds(self): """Retries when API returns None, succeeds on second attempt.""" bm = self._make_batch_manager() server_id = "server-id-after-retry" success_response = MagicMock( status_code=201, json=MagicMock(return_value={"ephemeral_trace_id": server_id}), ) with ( patch( "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True, ), patch.object( bm.plus_api, "initialize_ephemeral_trace_batch", side_effect=[None, success_response], ) as mock_init, patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep, ): bm._initialize_backend_batch( user_context={"privacy_level": "standard"}, execution_metadata={"execution_type": "crew"}, use_ephemeral=True, ) assert bm.trace_batch_id == server_id assert mock_init.call_count == 2 mock_sleep.assert_called_once_with(0.2) def test_retries_on_5xx_then_succeeds(self): """Retries on 500 server error, succeeds on second attempt.""" bm = self._make_batch_manager() server_id = "server-id-after-5xx" error_response = MagicMock(status_code=500, text="Internal Server Error") success_response = MagicMock( status_code=201, json=MagicMock(return_value={"ephemeral_trace_id": server_id}), ) with ( patch( "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True, ), patch.object( bm.plus_api, "initialize_ephemeral_trace_batch", side_effect=[error_response, success_response], ) as mock_init, patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"), ): bm._initialize_backend_batch( user_context={"privacy_level": "standard"}, execution_metadata={"execution_type": "crew"}, use_ephemeral=True, ) assert bm.trace_batch_id == server_id assert mock_init.call_count == 2 def test_no_retry_on_exception(self): """Exceptions (e.g. timeout, connection error) abort immediately without retry.""" bm = self._make_batch_manager() with ( patch( "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True, ), patch.object( bm.plus_api, "initialize_ephemeral_trace_batch", side_effect=ConnectionError("network down"), ) as mock_init, patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep, ): bm._initialize_backend_batch( user_context={"privacy_level": "standard"}, execution_metadata={"execution_type": "crew"}, use_ephemeral=True, ) assert bm.trace_batch_id is None assert mock_init.call_count == 1 mock_sleep.assert_not_called() def test_no_retry_on_4xx(self): """Does NOT retry on 422 — client error is not transient.""" bm = self._make_batch_manager() error_response = MagicMock(status_code=422, text="Unprocessable Entity") with ( patch( "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True, ), patch.object( bm.plus_api, "initialize_ephemeral_trace_batch", return_value=error_response, ) as mock_init, patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep, ): bm._initialize_backend_batch( user_context={"privacy_level": "standard"}, execution_metadata={"execution_type": "crew"}, use_ephemeral=True, ) assert bm.trace_batch_id is None assert mock_init.call_count == 1 mock_sleep.assert_not_called() def test_exhausts_retries_then_clears_batch_id(self): """After all retries fail, trace_batch_id is None.""" bm = self._make_batch_manager() with ( patch( "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True, ), patch.object( bm.plus_api, "initialize_ephemeral_trace_batch", return_value=None, ) as mock_init, patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"), ): bm._initialize_backend_batch( user_context={"privacy_level": "standard"}, execution_metadata={"execution_type": "crew"}, use_ephemeral=True, ) assert bm.trace_batch_id is None assert mock_init.call_count == 2 # initial + 1 retry class TestFirstTimeHandlerBackendInitGuard: """Tests: backend_initialized gated on actual batch creation success.""" def _make_handler_with_manager(self): """Create a FirstTimeTraceHandler wired to a TraceBatchManager.""" with patch( "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", return_value="mock_token", ): bm = TraceBatchManager() bm.current_batch = TraceBatch( user_context={"privacy_level": "standard"}, execution_metadata={"execution_type": "crew", "crew_name": "test"}, ) bm.trace_batch_id = bm.current_batch.batch_id bm.is_current_batch_ephemeral = True handler = FirstTimeTraceHandler() handler.is_first_time = True handler.collected_events = True handler.batch_manager = bm return handler, bm def test_backend_initialized_true_on_success(self): """Events are sent when batch creation succeeds, then state is cleaned up.""" handler, bm = self._make_handler_with_manager() server_id = "server-id-abc" mock_init_response = MagicMock( status_code=201, json=MagicMock(return_value={"ephemeral_trace_id": server_id}), ) mock_send_response = MagicMock(status_code=200) trace_batch_id_during_send = None def capture_send(*args, **kwargs): nonlocal trace_batch_id_during_send trace_batch_id_during_send = bm.trace_batch_id return mock_send_response with ( patch( "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True, ), patch.object( bm.plus_api, "initialize_ephemeral_trace_batch", return_value=mock_init_response, ), patch.object( bm.plus_api, "send_ephemeral_trace_events", side_effect=capture_send, ), patch.object(bm, "_finalize_backend_batch"), ): bm.event_buffer = [MagicMock(to_dict=MagicMock(return_value={}))] handler._initialize_backend_and_send_events() # trace_batch_id was set correctly during send assert trace_batch_id_during_send == server_id # State cleaned up after completion (singleton reuse) assert bm.backend_initialized is False assert bm.trace_batch_id is None assert bm.current_batch is None def test_backend_initialized_false_on_failure(self): """backend_initialized stays False and events are NOT sent when batch creation fails.""" handler, bm = self._make_handler_with_manager() with ( patch( "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True, ), patch.object( bm.plus_api, "initialize_ephemeral_trace_batch", return_value=None, # server call fails ), patch.object(bm, "_send_events_to_backend") as mock_send, patch.object(bm, "_finalize_backend_batch") as mock_finalize, patch.object(handler, "_gracefully_fail") as mock_fail, ): bm.event_buffer = [MagicMock()] handler._initialize_backend_and_send_events() assert bm.backend_initialized is False assert bm.trace_batch_id is None mock_send.assert_not_called() mock_finalize.assert_not_called() mock_fail.assert_called_once() def test_backend_initialized_false_on_non_2xx(self): """backend_initialized stays False when server returns non-2xx.""" handler, bm = self._make_handler_with_manager() mock_response = MagicMock(status_code=500, text="Internal Server Error") with ( patch( "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True, ), patch.object( bm.plus_api, "initialize_ephemeral_trace_batch", return_value=mock_response, ), patch.object(bm, "_send_events_to_backend") as mock_send, patch.object(bm, "_finalize_backend_batch") as mock_finalize, patch.object(handler, "_gracefully_fail") as mock_fail, ): bm.event_buffer = [MagicMock()] handler._initialize_backend_and_send_events() assert bm.backend_initialized is False assert bm.trace_batch_id is None mock_send.assert_not_called() mock_finalize.assert_not_called() mock_fail.assert_called_once() class TestFirstTimeHandlerAlwaysEphemeral: """Tests that first-time handler always uses ephemeral with skip_context_check.""" def _make_handler_with_manager(self): with patch( "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", return_value="mock_token", ): bm = TraceBatchManager() bm.current_batch = TraceBatch( user_context={"privacy_level": "standard"}, execution_metadata={"execution_type": "crew", "crew_name": "test"}, ) bm.trace_batch_id = bm.current_batch.batch_id bm.is_current_batch_ephemeral = True handler = FirstTimeTraceHandler() handler.is_first_time = True handler.collected_events = True handler.batch_manager = bm return handler, bm def test_deferred_init_uses_ephemeral_and_skip_context_check(self): """Deferred backend init always uses ephemeral=True and skip_context_check=True.""" handler, bm = self._make_handler_with_manager() with ( patch.object(bm, "_initialize_backend_batch") as mock_init, patch.object(bm, "_send_events_to_backend"), patch.object(bm, "_finalize_backend_batch"), ): mock_init.side_effect = lambda **kwargs: None bm.event_buffer = [MagicMock()] handler._initialize_backend_and_send_events() mock_init.assert_called_once() assert mock_init.call_args.kwargs["use_ephemeral"] is True assert mock_init.call_args.kwargs["skip_context_check"] is True class TestAuthFailbackToEphemeral: """Tests for ephemeral fallback when server rejects auth (401/403).""" def _make_batch_manager(self): """Create a TraceBatchManager with a pre-set trace_batch_id.""" with patch( "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", return_value="mock_token", ): bm = TraceBatchManager() bm.current_batch = TraceBatch( user_context={"privacy_level": "standard"}, execution_metadata={"execution_type": "crew", "crew_name": "test"}, ) bm.trace_batch_id = bm.current_batch.batch_id bm.is_current_batch_ephemeral = False # authenticated path return bm def test_401_non_ephemeral_falls_back_to_ephemeral(self): """A 401 on the non-ephemeral endpoint should retry as ephemeral.""" bm = self._make_batch_manager() server_id = "ephemeral-fallback-id" auth_rejected = MagicMock(status_code=401, text="Bad credentials") ephemeral_success = MagicMock( status_code=201, json=MagicMock(return_value={"ephemeral_trace_id": server_id}), ) with ( patch( "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True, ), patch.object( bm.plus_api, "initialize_trace_batch", return_value=auth_rejected, ), patch.object( bm.plus_api, "initialize_ephemeral_trace_batch", return_value=ephemeral_success, ) as mock_ephemeral, patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"), ): bm._initialize_backend_batch( user_context={"privacy_level": "standard"}, execution_metadata={"execution_type": "crew"}, use_ephemeral=False, ) assert bm.trace_batch_id == server_id assert bm.is_current_batch_ephemeral is True mock_ephemeral.assert_called_once() def test_403_non_ephemeral_falls_back_to_ephemeral(self): """A 403 on the non-ephemeral endpoint should also fall back.""" bm = self._make_batch_manager() server_id = "ephemeral-fallback-403" forbidden = MagicMock(status_code=403, text="Forbidden") ephemeral_success = MagicMock( status_code=201, json=MagicMock(return_value={"ephemeral_trace_id": server_id}), ) with ( patch( "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True, ), patch.object( bm.plus_api, "initialize_trace_batch", return_value=forbidden, ), patch.object( bm.plus_api, "initialize_ephemeral_trace_batch", return_value=ephemeral_success, ), patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"), ): bm._initialize_backend_batch( user_context={"privacy_level": "standard"}, execution_metadata={"execution_type": "crew"}, use_ephemeral=False, ) assert bm.trace_batch_id == server_id assert bm.is_current_batch_ephemeral is True def test_401_on_ephemeral_does_not_recurse(self): """A 401 on the ephemeral endpoint should NOT try to fall back again.""" bm = self._make_batch_manager() bm.is_current_batch_ephemeral = True auth_rejected = MagicMock(status_code=401, text="Bad credentials") with ( patch( "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True, ), patch.object( bm.plus_api, "initialize_ephemeral_trace_batch", return_value=auth_rejected, ) as mock_ephemeral, patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"), ): bm._initialize_backend_batch( user_context={"privacy_level": "standard"}, execution_metadata={"execution_type": "crew"}, use_ephemeral=True, ) assert bm.trace_batch_id is None # Called only once — no recursive fallback mock_ephemeral.assert_called() def test_401_fallback_ephemeral_also_fails(self): """If ephemeral fallback also fails, trace_batch_id is cleared.""" bm = self._make_batch_manager() auth_rejected = MagicMock(status_code=401, text="Bad credentials") ephemeral_fail = MagicMock(status_code=422, text="Validation failed") with ( patch( "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True, ), patch.object( bm.plus_api, "initialize_trace_batch", return_value=auth_rejected, ), patch.object( bm.plus_api, "initialize_ephemeral_trace_batch", return_value=ephemeral_fail, ), patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"), ): bm._initialize_backend_batch( user_context={"privacy_level": "standard"}, execution_metadata={"execution_type": "crew"}, use_ephemeral=False, ) assert bm.trace_batch_id is None class TestMarkBatchAsFailedRouting: """Tests: _mark_batch_as_failed routes to the correct endpoint.""" def _make_batch_manager(self, ephemeral: bool = False): with patch( "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", return_value="mock_token", ): bm = TraceBatchManager() bm.is_current_batch_ephemeral = ephemeral return bm def test_routes_to_ephemeral_endpoint_when_ephemeral(self): """Ephemeral batches must use mark_ephemeral_trace_batch_as_failed.""" bm = self._make_batch_manager(ephemeral=True) with patch.object( bm.plus_api, "mark_ephemeral_trace_batch_as_failed" ) as mock_ephemeral, patch.object( bm.plus_api, "mark_trace_batch_as_failed" ) as mock_non_ephemeral: bm._mark_batch_as_failed("batch-123", "some error") mock_ephemeral.assert_called_once_with("batch-123", "some error") mock_non_ephemeral.assert_not_called() def test_routes_to_non_ephemeral_endpoint_when_not_ephemeral(self): """Non-ephemeral batches must use mark_trace_batch_as_failed.""" bm = self._make_batch_manager(ephemeral=False) with patch.object( bm.plus_api, "mark_ephemeral_trace_batch_as_failed" ) as mock_ephemeral, patch.object( bm.plus_api, "mark_trace_batch_as_failed" ) as mock_non_ephemeral: bm._mark_batch_as_failed("batch-456", "another error") mock_non_ephemeral.assert_called_once_with("batch-456", "another error") mock_ephemeral.assert_not_called() class TestBackendInitializedGatedOnSuccess: """Tests: backend_initialized reflects actual init success on non-first-time path.""" def test_backend_initialized_true_on_success(self): """backend_initialized is True when _initialize_backend_batch succeeds.""" with ( patch( "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True, ), patch( "crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces", return_value=False, ), patch( "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", return_value="mock_token", ), ): bm = TraceBatchManager() mock_response = MagicMock( status_code=201, json=MagicMock(return_value={"trace_id": "server-id"}), ) with patch.object( bm.plus_api, "initialize_trace_batch", return_value=mock_response ): bm.initialize_batch( user_context={"privacy_level": "standard"}, execution_metadata={"execution_type": "crew"}, ) assert bm.backend_initialized is True assert bm.trace_batch_id == "server-id" def test_backend_initialized_false_on_failure(self): """backend_initialized is False when _initialize_backend_batch fails.""" with ( patch( "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True, ), patch( "crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces", return_value=False, ), patch( "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token", return_value="mock_token", ), ): bm = TraceBatchManager() with patch.object( bm.plus_api, "initialize_trace_batch", return_value=None ): bm.initialize_batch( user_context={"privacy_level": "standard"}, execution_metadata={"execution_type": "crew"}, ) assert bm.backend_initialized is False assert bm.trace_batch_id is None