mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-15 15:32:40 +00:00
* perf: reduce framework overhead for NVIDIA benchmarks

  - Lazy initialize event bus thread pool and event loop on first emit() instead of at import time (~200ms savings)
  - Skip trace listener registration (50+ handlers) when tracing disabled
  - Skip trace prompt in non-interactive contexts (isatty check) to avoid 20s timeout in CI/Docker/API servers
  - Skip flush() when no events were emitted (avoids 30s timeout waste)
  - Add _has_pending_events flag to track if any events were emitted
  - Add _executor_initialized flag for lazy init double-checked locking

  All existing behavior preserved when tracing IS enabled. No public APIs changed - only conditional guards added.

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: address PR review comments — tracing override, executor init order, stdin guard, unused import

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* style: fix ruff formatting in trace_listener.py and utils.py

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Co-authored-by: Iris Clawd <iris@crewai.com>
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
1643 lines
62 KiB
Python
1643 lines
62 KiB
Python
import os
|
|
from unittest.mock import MagicMock, Mock, patch
|
|
|
|
import pytest
|
|
from crewai import Agent, Crew, Task
|
|
from crewai.events.listeners.tracing.first_time_trace_handler import (
|
|
FirstTimeTraceHandler,
|
|
)
|
|
from crewai.events.listeners.tracing.trace_batch_manager import (
|
|
TraceBatch,
|
|
TraceBatchManager,
|
|
)
|
|
from crewai.events.listeners.tracing.trace_listener import (
|
|
TraceCollectionListener,
|
|
)
|
|
from crewai.events.listeners.tracing.types import TraceEvent
|
|
from crewai.flow.flow import Flow, start
|
|
from tests.utils import wait_for_event_handlers
|
|
|
|
|
|
class TestTraceListenerSetup:
|
|
"""Test TraceListener is properly setup and collecting events"""
|
|
|
|
@pytest.fixture(autouse=True)
def mock_user_data_file_io(self):
    """Patch the tracing user-data loader so tests never touch stored user data.

    ``_load_user_data`` returns an empty dict for the duration of each test,
    which keeps state on disk from leaking between tests.
    """
    loader_patch = patch(
        "crewai.events.listeners.tracing.utils._load_user_data",
        return_value={},
    )
    with loader_patch:
        yield
|
|
|
|
@pytest.fixture(autouse=True)
def mock_auth_token(self):
    """Return a fixed auth token from every ``get_auth_token`` binding used here.

    The function is patched at its definition site and at the two modules
    that import it by name, because each of those holds its own reference.
    """
    cli_patch = patch(
        "crewai.cli.authentication.token.get_auth_token",
        return_value="mock_token_12345",
    )
    listener_patch = patch(
        "crewai.events.listeners.tracing.trace_listener.get_auth_token",
        return_value="mock_token_12345",
    )
    batch_manager_patch = patch(
        "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
        return_value="mock_token_12345",
    )
    # Entered in the same order as listed; exited in reverse.
    with cli_patch, listener_patch, batch_manager_patch:
        yield
|
|
|
|
@pytest.fixture(autouse=True)
def reset_tracing_singletons(self):
    """Reset tracing singleton state before and after every test.

    Before the test: the tracing-enabled contextvar is set to ``None``, the
    event bus registries are emptied, and the ``TraceCollectionListener`` and
    ``EventListener`` singletons are dropped. After the test the same bus and
    singleton reset runs again (the contextvar is not touched on teardown).
    """
    from crewai.events.event_bus import crewai_event_bus
    from crewai.events.event_listener import EventListener
    from crewai.events.listeners.tracing.utils import _tracing_enabled

    def _reset_bus_and_singletons():
        """Empty the event bus registries and forget both listener singletons."""
        # Clear event bus handlers BEFORE any new singleton can register.
        with crewai_event_bus._rwlock.w_locked():
            crewai_event_bus._sync_handlers = {}
            crewai_event_bus._async_handlers = {}
            crewai_event_bus._handler_dependencies = {}
            crewai_event_bus._execution_plan_cache = {}

        listener = TraceCollectionListener._instance
        if listener is not None:
            # Instance attributes shadow the class-level flags; delete them
            # only when they really live on the instance, otherwise
            # ``delattr`` would raise AttributeError.
            for flag in ("_initialized", "_listeners_setup"):
                if flag in listener.__dict__:
                    delattr(listener, flag)

        # Reset class attributes
        TraceCollectionListener._instance = None
        TraceCollectionListener._initialized = False
        TraceCollectionListener._listeners_setup = False

        # Reset EventListener singleton
        if hasattr(EventListener, "_instance"):
            EventListener._instance = None

    # Reset the tracing enabled contextvar. Deliberately best-effort: a
    # failure here must not prevent the test from running.
    try:
        _tracing_enabled.set(None)
    except (LookupError, AttributeError):
        pass

    _reset_bus_and_singletons()

    yield

    # Clean up after test
    _reset_bus_and_singletons()
|
|
|
|
@pytest.fixture(autouse=True)
def mock_plus_api_calls(self):
    """Replace ``requests`` HTTP verbs and batch cleanup with mocks.

    Yields a dict of the created mocks keyed by ``"post"``, ``"get"``,
    ``"put"``, ``"delete"`` and ``"mark_trace_batch_as_failed"``. The last
    entry is a standalone ``MagicMock``; this fixture does not install it on
    any object.
    """
    with (
        patch("requests.post") as mock_post,
        patch("requests.get") as mock_get,
        patch("requests.put") as mock_put,
        patch("requests.delete") as mock_delete,
        patch.object(TraceBatchManager, "_cleanup_batch_data", return_value=True),
    ):
        ok_response = MagicMock()
        ok_response.status_code = 200
        ok_response.json.return_value = {
            "id": "mock_trace_batch_id",
            "status": "success",
            "message": "Batch created successfully",
        }
        ok_response.raise_for_status.return_value = None

        # Every HTTP verb answers with the same canned success response.
        for http_mock in (mock_post, mock_get, mock_put, mock_delete):
            http_mock.return_value = ok_response

        mock_mark_failed = MagicMock(return_value=ok_response)

        mocks = {
            "post": mock_post,
            "get": mock_get,
            "put": mock_put,
            "delete": mock_delete,
            "mark_trace_batch_as_failed": mock_mark_failed,
        }
        yield mocks
|
|
|
|
@pytest.mark.vcr()
def test_trace_listener_collects_crew_events(self):
    """A crew kickoff with tracing enabled initializes a trace batch."""
    tracing_env = {
        "CREWAI_TRACING_ENABLED": "true",
        "CREWAI_DISABLE_TELEMETRY": "false",
        "CREWAI_DISABLE_TRACKING": "false",
        "OTEL_SDK_DISABLED": "false",
    }

    with patch.dict(os.environ, tracing_env):
        agent = Agent(
            role="Test Agent",
            goal="Test goal",
            backstory="Test backstory",
            llm="gpt-4o-mini",
        )
        task = Task(
            description="Say hello to the world",
            expected_output="hello world",
            agent=agent,
        )
        crew = Crew(agents=[agent], tasks=[task], verbose=True)

        from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener

        trace_listener = TraceCollectionListener()

        crew.kickoff()

        # Give the batch up to five seconds to report initialization.
        batch_ready = trace_listener.batch_manager.wait_for_batch_initialization(
            timeout=5.0
        )

        assert batch_ready, "Batch should have been initialized"
        assert trace_listener.batch_manager.is_batch_initialized()
        assert trace_listener.batch_manager.current_batch is not None
|
|
|
|
@pytest.mark.vcr()
def test_batch_manager_finalizes_batch_clears_buffer(self):
    """``finalize_batch`` is invoked at least once during a traced kickoff.

    The listener is located by scanning the event bus registries for a bound
    handler owned by a ``TraceCollectionListener``; the test is skipped when
    none is registered.
    """
    tracing_env = {
        "CREWAI_TRACING_ENABLED": "true",
        "CREWAI_DISABLE_TELEMETRY": "false",
        "CREWAI_DISABLE_TRACKING": "false",
        "OTEL_SDK_DISABLED": "false",
    }

    with patch.dict(os.environ, tracing_env):
        agent = Agent(
            role="Test Agent",
            goal="Test goal",
            backstory="Test backstory",
            llm="gpt-4o-mini",
        )

        task = Task(
            description="Say hello to the world",
            expected_output="hello world",
            agent=agent,
        )

        crew = Crew(agents=[agent], tasks=[task], verbose=True)

        from crewai.events.event_bus import crewai_event_bus

        def _owner_in(registry):
            """Return the first TraceCollectionListener owning a handler, else None."""
            for handler_set in registry.values():
                for handler in handler_set:
                    if hasattr(handler, "__self__") and isinstance(
                        handler.__self__, TraceCollectionListener
                    ):
                        return handler.__self__
            return None

        # Sync handlers take precedence; async handlers are the fallback.
        with crewai_event_bus._rwlock.r_locked():
            trace_listener = _owner_in(crewai_event_bus._sync_handlers)
            if not trace_listener:
                trace_listener = _owner_in(crewai_event_bus._async_handlers)

        if not trace_listener:
            pytest.skip(
                "No trace listener found - tracing may not be properly enabled"
            )

        batch_manager = trace_listener.batch_manager
        with patch.object(
            batch_manager,
            "finalize_batch",
            wraps=batch_manager.finalize_batch,
        ) as finalize_mock:
            crew.kickoff()

            assert finalize_mock.call_count >= 1
|
|
|
|
@pytest.mark.vcr()
def test_events_collection_batch_manager(self, mock_plus_api_calls):
    """Events emitted by a crew run reach ``batch_manager.add_event``.

    Checks that at least two events are added, that a
    ``crew_kickoff_completed`` event carries ``crew_name == "crew"``, and
    that every added object is a ``TraceEvent``.
    """
    tracing_env = {
        "CREWAI_TRACING_ENABLED": "true",
        "CREWAI_DISABLE_TELEMETRY": "false",
        "CREWAI_DISABLE_TRACKING": "false",
        "OTEL_SDK_DISABLED": "false",
    }

    with patch.dict(os.environ, tracing_env):
        agent = Agent(
            role="Test Agent",
            goal="Test goal",
            backstory="Test backstory",
            llm="gpt-4o-mini",
        )
        task = Task(
            description="Say hello to the world",
            expected_output="hello world",
            agent=agent,
        )
        crew = Crew(agents=[agent], tasks=[task], verbose=True)

        from crewai.events.event_bus import crewai_event_bus

        # Create and setup trace listener explicitly
        trace_listener = TraceCollectionListener()
        trace_listener.setup_listeners(crewai_event_bus)

        with patch.object(
            trace_listener.batch_manager,
            "add_event",
            wraps=trace_listener.batch_manager.add_event,
        ) as add_event_mock:
            crew.kickoff()
            wait_for_event_handlers()

            assert add_event_mock.call_count >= 2

            recorded_events = [
                call.args[0] for call in add_event_mock.call_args_list
            ]
            completion_events = [
                event
                for event in recorded_events
                if event.type == "crew_kickoff_completed"
            ]
            assert len(completion_events) >= 1

            # Verify the first completion event has proper structure
            first_completion = completion_events[0]
            assert "crew_name" in first_completion.event_data
            assert first_completion.event_data["crew_name"] == "crew"

            # Verify all events have proper structure
            for event in recorded_events:
                assert isinstance(event, TraceEvent)
                assert hasattr(event, "event_data")
                assert hasattr(event, "type")
|
|
|
|
@pytest.mark.vcr()
def test_trace_listener_disabled_when_env_false(self):
    """No ``TraceCollectionListener`` handler is registered when tracing is off.

    Runs a crew with ``CREWAI_TRACING_ENABLED=false`` and then scans both
    event bus registries for handlers bound to a ``TraceCollectionListener``.
    """
    with patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "false"}):
        agent = Agent(
            role="Test Agent",
            goal="Test goal",
            backstory="Test backstory",
            llm="gpt-4o-mini",
        )
        task = Task(
            description="Say hello to the world",
            expected_output="hello world",
            agent=agent,
        )

        crew = Crew(agents=[agent], tasks=[task], verbose=True)
        result = crew.kickoff()
        assert result is not None

        from crewai.events.event_bus import crewai_event_bus

        trace_handlers = []
        with crewai_event_bus._rwlock.r_locked():
            # Sync handlers are collected first, then async ones.
            for registry in (
                crewai_event_bus._sync_handlers,
                crewai_event_bus._async_handlers,
            ):
                for handlers in registry.values():
                    trace_handlers.extend(
                        handler
                        for handler in handlers
                        if hasattr(handler, "__self__")
                        and isinstance(handler.__self__, TraceCollectionListener)
                    )

        assert len(trace_handlers) == 0, (
            f"Found {len(trace_handlers)} TraceCollectionListener handlers when tracing should be disabled"
        )
|
|
|
|
def test_trace_listener_setup_correctly_for_crew(self):
    """Constructing a ``Crew`` with tracing enabled calls ``setup_listeners``."""
    tracing_env = {
        "CREWAI_TRACING_ENABLED": "true",
        "CREWAI_DISABLE_TELEMETRY": "false",
        "CREWAI_DISABLE_TRACKING": "false",
        "OTEL_SDK_DISABLED": "false",
    }

    with patch.dict(os.environ, tracing_env):
        agent = Agent(
            role="Test Agent",
            goal="Test goal",
            backstory="Test backstory",
            llm="gpt-4o-mini",
        )
        task = Task(
            description="Say hello to the world",
            expected_output="hello world",
            agent=agent,
        )
        setup_patch = patch.object(TraceCollectionListener, "setup_listeners")
        with setup_patch as mock_listener_setup:
            # Only construction matters here; the crew is never kicked off.
            Crew(agents=[agent], tasks=[task], verbose=True)
            assert mock_listener_setup.call_count >= 1
|
|
|
|
@pytest.mark.vcr()
def test_trace_listener_setup_correctly_for_flow(self):
    """Instantiating a ``Flow`` with tracing enabled calls ``setup_listeners``."""
    tracing_env = {
        "CREWAI_TRACING_ENABLED": "true",
        "CREWAI_DISABLE_TELEMETRY": "false",
        "CREWAI_DISABLE_TRACKING": "false",
        "OTEL_SDK_DISABLED": "false",
    }

    with patch.dict(os.environ, tracing_env):

        class FlowExample(Flow):
            @start()
            def start(self):
                pass

        setup_patch = patch.object(TraceCollectionListener, "setup_listeners")
        with setup_patch as mock_listener_setup:
            # Only instantiation matters here; the flow is never run.
            FlowExample()
            assert mock_listener_setup.call_count >= 1
|
|
|
|
@pytest.mark.vcr()
def test_trace_listener_ephemeral_batch(self):
    """A batch is still initialized when ``_check_authenticated`` returns False."""
    tracing_env = {
        "CREWAI_TRACING_ENABLED": "true",
        "CREWAI_DISABLE_TELEMETRY": "false",
        "CREWAI_DISABLE_TRACKING": "false",
        "OTEL_SDK_DISABLED": "false",
    }
    unauthenticated = patch(
        "crewai.events.listeners.tracing.trace_listener.TraceCollectionListener._check_authenticated",
        return_value=False,
    )

    with patch.dict(os.environ, tracing_env), unauthenticated:
        agent = Agent(
            role="Test Agent",
            goal="Test goal",
            backstory="Test backstory",
            llm="gpt-4o-mini",
        )
        task = Task(
            description="Say hello to the world",
            expected_output="hello world",
            agent=agent,
        )
        crew = Crew(agents=[agent], tasks=[task], tracing=True)

        from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener

        trace_listener = TraceCollectionListener()

        crew.kickoff()

        batch_ready = trace_listener.batch_manager.wait_for_batch_initialization(
            timeout=5.0
        )
        assert batch_ready, (
            "Batch should have been initialized for unauthenticated user"
        )

        wait_for_event_handlers()
|
|
|
|
@pytest.mark.vcr()
def test_trace_listener_with_authenticated_user(self):
    """A batch is initialized when a crew with ``tracing=True`` is kicked off.

    Authentication comes from the class-wide token fixture; this test does
    not override ``_check_authenticated``.
    """
    tracing_env = {
        "CREWAI_TRACING_ENABLED": "true",
        "CREWAI_DISABLE_TELEMETRY": "false",
        "CREWAI_DISABLE_TRACKING": "false",
        "OTEL_SDK_DISABLED": "false",
    }

    with patch.dict(os.environ, tracing_env):
        agent = Agent(
            role="Test Agent",
            goal="Test goal",
            backstory="Test backstory",
            llm="gpt-4o-mini",
        )
        task = Task(
            description="Say hello to the world",
            expected_output="hello world",
            agent=agent,
        )

        from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener

        # The listener is created before the crew in this test.
        trace_listener = TraceCollectionListener()

        crew = Crew(agents=[agent], tasks=[task], tracing=True)
        crew.kickoff()

        batch_ready = trace_listener.batch_manager.wait_for_batch_initialization(
            timeout=5.0
        )
        assert batch_ready, (
            "Batch should have been initialized for authenticated user"
        )

        wait_for_event_handlers()
|
|
|
|
# Per-test cleanup hook
def teardown_method(self):
    """Empty the event bus registries and drop the ``EventListener`` singleton."""
    from crewai.events.event_bus import crewai_event_bus
    from crewai.events.event_listener import EventListener

    bus = crewai_event_bus
    with bus._rwlock.w_locked():
        bus._sync_handlers = {}
        bus._async_handlers = {}
        bus._handler_dependencies = {}
        bus._execution_plan_cache = {}

    # Only reset the singleton slot if the class actually defines one.
    if hasattr(EventListener, "_instance"):
        EventListener._instance = None
|
|
|
|
@classmethod
def teardown_class(cls):
    """Run the bus and ``EventListener`` reset once more after the whole class."""
    from crewai.events.event_bus import crewai_event_bus
    from crewai.events.event_listener import EventListener

    bus = crewai_event_bus
    with bus._rwlock.w_locked():
        bus._sync_handlers = {}
        bus._async_handlers = {}
        bus._handler_dependencies = {}
        bus._execution_plan_cache = {}

    # Only reset the singleton slot if the class actually defines one.
    if hasattr(EventListener, "_instance"):
        EventListener._instance = None
|
|
|
|
@pytest.mark.vcr()
def test_first_time_user_trace_collection_with_timeout(self, mock_plus_api_calls):
    """First-time flow when the viewing prompt returns False.

    With tracing disabled by env but first-time auto-collection forced on,
    events are collected during the run, the user is prompted exactly once,
    and the first execution is marked completed exactly once.
    """
    first_time_env = {
        "CREWAI_TRACING_ENABLED": "false",
        "CREWAI_DISABLE_TELEMETRY": "false",
        "CREWAI_DISABLE_TRACKING": "false",
        "OTEL_SDK_DISABLED": "false",
    }

    with (
        patch.dict(os.environ, first_time_env),
        patch(
            "crewai.events.listeners.tracing.utils._is_test_environment",
            return_value=False,
        ),
        patch(
            "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces",
            return_value=True,
        ),
        patch(
            "crewai.events.listeners.tracing.utils.is_first_execution",
            return_value=True,
        ),
        patch(
            "crewai.events.listeners.tracing.first_time_trace_handler.prompt_user_for_trace_viewing",
            return_value=False,
        ) as mock_prompt,
        patch(
            "crewai.events.listeners.tracing.first_time_trace_handler.mark_first_execution_completed"
        ) as mock_mark_completed,
    ):
        agent = Agent(
            role="Test Agent",
            goal="Test goal",
            backstory="Test backstory",
            llm="gpt-4o-mini",
        )
        task = Task(
            description="Say hello to the world",
            expected_output="hello world",
            agent=agent,
        )
        crew = Crew(agents=[agent], tasks=[task], verbose=True)

        from crewai.events.event_bus import crewai_event_bus

        trace_listener = TraceCollectionListener()
        trace_listener.setup_listeners(crewai_event_bus)

        # Fresh handler created while the patches above are active.
        first_time = FirstTimeTraceHandler()
        trace_listener.first_time_handler = first_time
        if first_time.initialize_for_first_time_user():
            first_time.set_batch_manager(trace_listener.batch_manager)

        assert trace_listener.first_time_handler.is_first_time is True
        assert trace_listener.first_time_handler.collected_events is False

        trace_listener.batch_manager.batch_owner_type = "crew"

        result = crew.kickoff()
        wait_for_event_handlers()
        assert result is not None

        assert trace_listener.first_time_handler.collected_events is True, (
            "Events should have been collected"
        )

        mock_prompt.assert_called_once()

        mock_mark_completed.assert_called_once()
|
|
|
|
@pytest.mark.vcr()
def test_first_time_user_trace_collection_user_accepts(self, mock_plus_api_calls):
    """First-time flow when the viewing prompt returns True.

    With every PlusAPI batch call stubbed, the handler initializes the
    backend once, displays the ephemeral trace link once, and marks the
    first execution completed once.
    """
    first_time_env = {
        "CREWAI_TRACING_ENABLED": "false",
        "CREWAI_DISABLE_TELEMETRY": "false",
        "CREWAI_DISABLE_TRACKING": "false",
        "OTEL_SDK_DISABLED": "false",
    }

    with (
        patch.dict(os.environ, first_time_env),
        patch(
            "crewai.events.listeners.tracing.utils._is_test_environment",
            return_value=False,
        ),
        patch(
            "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces",
            return_value=True,
        ),
        patch(
            "crewai.events.listeners.tracing.utils.is_first_execution",
            return_value=True,
        ),
        patch(
            "crewai.events.listeners.tracing.first_time_trace_handler.prompt_user_for_trace_viewing",
            return_value=True,
        ),
        patch(
            "crewai.events.listeners.tracing.first_time_trace_handler.mark_first_execution_completed"
        ) as mock_mark_completed,
    ):
        agent = Agent(
            role="Test Agent",
            goal="Test goal",
            backstory="Test backstory",
            llm="gpt-4o-mini",
        )
        task = Task(
            description="Say hello to the world",
            expected_output="hello world",
            agent=agent,
        )
        crew = Crew(agents=[agent], tasks=[task], verbose=True)

        from crewai.events.event_bus import crewai_event_bus

        trace_listener = TraceCollectionListener()
        trace_listener.setup_listeners(crewai_event_bus)

        # Re-initialize first-time handler after patches are applied to ensure clean state
        trace_listener.first_time_handler = FirstTimeTraceHandler()
        if trace_listener.first_time_handler.initialize_for_first_time_user():
            trace_listener.first_time_handler.set_batch_manager(
                trace_listener.batch_manager
            )

        trace_listener.batch_manager.ephemeral_trace_url = (
            "https://crewai.com/trace/mock-id"
        )

        assert trace_listener.first_time_handler.is_first_time is True

        trace_listener.first_time_handler.collected_events = True

        batch_created = MagicMock()
        batch_created.status_code = 201
        batch_created.json.return_value = {
            "trace_id": "mock-trace-id",
            "ephemeral_trace_id": "mock-ephemeral-trace-id",
            "access_code": "TRACE-mock",
        }
        events_ok = MagicMock()
        events_ok.status_code = 200

        handler = trace_listener.first_time_handler
        plus_api = trace_listener.batch_manager.plus_api

        with (
            patch.object(
                handler,
                "_initialize_backend_and_send_events",
                wraps=handler._initialize_backend_and_send_events,
            ) as mock_init_backend,
            patch.object(handler, "_display_ephemeral_trace_link") as mock_display_link,
            patch.object(
                plus_api,
                "initialize_trace_batch",
                return_value=batch_created,
            ),
            patch.object(
                plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=batch_created,
            ),
            patch.object(
                plus_api,
                "send_trace_events",
                return_value=events_ok,
            ),
            patch.object(
                plus_api,
                "send_ephemeral_trace_events",
                return_value=events_ok,
            ),
            patch.object(
                plus_api,
                "finalize_trace_batch",
                return_value=events_ok,
            ),
            patch.object(
                plus_api,
                "finalize_ephemeral_trace_batch",
                return_value=events_ok,
            ),
            patch.object(
                trace_listener.batch_manager,
                "_cleanup_batch_data",
            ),
        ):
            crew.kickoff()
            wait_for_event_handlers()

            mock_init_backend.assert_called_once()

            mock_display_link.assert_called_once()

            mock_mark_completed.assert_called_once()
|
|
|
|
@pytest.mark.vcr()
def test_first_time_user_trace_consolidation_logic(self, mock_plus_api_calls):
    """A first-time user run initializes a batch with ``CREWAI_TRACING_ENABLED`` empty."""
    first_time_env = {
        "CREWAI_TRACING_ENABLED": "",
        "CREWAI_DISABLE_TELEMETRY": "false",
        "CREWAI_DISABLE_TRACKING": "false",
        "OTEL_SDK_DISABLED": "false",
    }

    with (
        patch.dict(os.environ, first_time_env),
        patch(
            "crewai.events.listeners.tracing.utils._is_test_environment",
            return_value=False,
        ),
        patch(
            "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces",
            return_value=True,
        ),
        patch(
            "crewai.events.listeners.tracing.utils.is_first_execution",
            return_value=True,
        ),
    ):
        from crewai.events.event_bus import crewai_event_bus

        # Start from empty handler registries for this test.
        with crewai_event_bus._rwlock.w_locked():
            crewai_event_bus._sync_handlers = {}
            crewai_event_bus._async_handlers = {}

        trace_listener = TraceCollectionListener()

        # Re-initialize first-time handler after patches are applied to ensure clean state
        # This is necessary because the singleton may have been created before patches were active
        first_time = FirstTimeTraceHandler()
        trace_listener.first_time_handler = first_time
        if first_time.initialize_for_first_time_user():
            first_time.set_batch_manager(trace_listener.batch_manager)

        trace_listener.setup_listeners(crewai_event_bus)

        assert trace_listener.first_time_handler.is_first_time is True

        agent = Agent(
            role="Test Agent",
            goal="Test goal",
            backstory="Test backstory",
            llm="gpt-4o-mini",
        )
        task = Task(
            description="Test task",
            expected_output="test output",
            agent=agent,
        )
        crew = Crew(agents=[agent], tasks=[task])

        result = crew.kickoff()

        wait_for_event_handlers()

        assert trace_listener.batch_manager.is_batch_initialized(), (
            "Batch should have been initialized for first-time user"
        )
        assert result is not None
|
|
|
|
def test_first_time_handler_timeout_behavior(self):
    """``prompt_user_for_trace_viewing`` returns False when the prompt thread is still alive.

    ``threading.Thread`` is replaced by a mock whose ``is_alive`` always
    reports True, which stands in for a prompt that never got an answer.
    """
    with (
        patch(
            "crewai.events.listeners.tracing.utils._is_test_environment",
            return_value=False,
        ),
        patch(
            "crewai.events.listeners.tracing.utils._is_interactive_terminal",
            return_value=True,
        ),
        patch("threading.Thread") as thread_cls,
    ):
        from crewai.events.listeners.tracing.utils import (
            prompt_user_for_trace_viewing,
        )

        prompt_thread = Mock()
        prompt_thread.is_alive.return_value = True
        thread_cls.return_value = prompt_thread

        answer = prompt_user_for_trace_viewing(timeout_seconds=5)

        assert answer is False
        thread_cls.assert_called_once()
        # The thread must be created as a daemon (passed as a keyword).
        assert thread_cls.call_args.kwargs["daemon"] is True

        prompt_thread.start.assert_called_once()
        prompt_thread.join.assert_called_once_with(timeout=5)
        prompt_thread.is_alive.assert_called_once()
|
|
|
|
def test_first_time_handler_graceful_error_handling(self):
    """A failing prompt still results in the first execution being marked completed."""
    auto_collect = patch(
        "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces",
        return_value=True,
    )
    failing_prompt = patch(
        "crewai.events.listeners.tracing.first_time_trace_handler.prompt_user_for_trace_viewing",
        side_effect=Exception("Prompt failed"),
    )
    mark_completed = patch(
        "crewai.events.listeners.tracing.first_time_trace_handler.mark_first_execution_completed"
    )

    with auto_collect, failing_prompt, mark_completed as mock_mark_completed:
        handler = FirstTimeTraceHandler()
        # Put the handler directly into the "first run, events collected" state.
        handler.is_first_time = True
        handler.collected_events = True

        handler.handle_execution_completion()

        mock_mark_completed.assert_called_once()
|
|
|
|
def test_trace_batch_marked_as_failed_on_finalize_error(self):
    """A 500 from ``finalize_trace_batch`` makes the manager mark the batch failed.

    Exercises ``TraceBatchManager.finalize_batch`` directly and expects
    ``mark_trace_batch_as_failed`` to receive the batch id and response text.
    """
    tracing_on = patch(
        "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
        return_value=True,
    )
    with tracing_on:
        batch_manager = TraceBatchManager()

        # Initialize a batch
        batch_manager.current_batch = batch_manager.initialize_batch(
            user_context={"privacy_level": "standard"},
            execution_metadata={
                "execution_type": "crew",
                "crew_name": "test_crew",
            },
        )
        batch_manager.trace_batch_id = "test_batch_id_12345"
        batch_manager.backend_initialized = True

        # Mock the API responses
        plus_api = batch_manager.plus_api
        send_ok = MagicMock(status_code=200)
        finalize_failed = MagicMock(status_code=500, text="Internal Server Error")

        with (
            patch.object(plus_api, "send_trace_events", return_value=send_ok),
            patch.object(
                plus_api, "finalize_trace_batch", return_value=finalize_failed
            ),
            patch.object(plus_api, "mark_trace_batch_as_failed") as mock_mark_failed,
        ):
            # Call finalize_batch directly
            batch_manager.finalize_batch()

            # Verify that mark_trace_batch_as_failed was called with the error message
            mock_mark_failed.assert_called_once_with(
                "test_batch_id_12345", "Internal Server Error"
            )
|
|
|
|
def test_ephemeral_batch_includes_anon_id(self):
    """The ephemeral init payload carries ``user_identifier`` from ``get_user_id()``."""
    fake_user_id = "abc123def456"

    with (
        patch(
            "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
            return_value=True,
        ),
        patch(
            "crewai.events.listeners.tracing.trace_batch_manager.get_user_id",
            return_value=fake_user_id,
        ),
        patch(
            "crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces",
            return_value=False,
        ),
    ):
        batch_manager = TraceBatchManager()

        created = MagicMock(
            status_code=201,
            json=MagicMock(
                return_value={
                    "ephemeral_trace_id": "test-trace-id",
                    "access_code": "TRACE-abc123",
                }
            ),
        )

        with patch.object(
            batch_manager.plus_api,
            "initialize_ephemeral_trace_batch",
            return_value=created,
        ) as mock_init:
            batch_manager.initialize_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={
                    "execution_type": "crew",
                    "crew_name": "test_crew",
                },
                use_ephemeral=True,
            )

            mock_init.assert_called_once()
            # The payload is the first positional argument of the API call.
            sent_payload = mock_init.call_args.args[0]
            assert sent_payload["user_identifier"] == fake_user_id
            assert "ephemeral_trace_id" in sent_payload
|
|
|
|
def test_non_ephemeral_batch_does_not_include_anon_id(self):
    """The non-ephemeral init payload has no ``user_identifier`` key."""
    with (
        patch(
            "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
            return_value=True,
        ),
        patch(
            "crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces",
            return_value=False,
        ),
    ):
        batch_manager = TraceBatchManager()

        created = MagicMock(
            status_code=201,
            json=MagicMock(return_value={"trace_id": "test-trace-id"}),
        )

        with patch.object(
            batch_manager.plus_api,
            "initialize_trace_batch",
            return_value=created,
        ) as mock_init:
            batch_manager.initialize_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={
                    "execution_type": "crew",
                    "crew_name": "test_crew",
                },
                use_ephemeral=False,
            )

            mock_init.assert_called_once()
            # The payload is the first positional argument of the API call.
            sent_payload = mock_init.call_args.args[0]
            assert "user_identifier" not in sent_payload
|
|
|
|
|
|
class TestTraceBatchIdClearedOnFailure:
|
|
"""Tests: trace_batch_id is cleared when _initialize_backend_batch fails."""
|
|
|
|
def _make_batch_manager(self):
    """Build a ``TraceBatchManager`` whose ``trace_batch_id`` is already set.

    The manager gets a fresh ``TraceBatch`` as its current batch, reuses that
    batch's ``batch_id`` as ``trace_batch_id``, and is flagged as ephemeral
    (simulating a first-time user).
    """
    token_patch = patch(
        "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
        return_value="mock_token",
    )
    with token_patch:
        manager = TraceBatchManager()

    manager.current_batch = TraceBatch(
        user_context={"privacy_level": "standard"},
        execution_metadata={"execution_type": "crew", "crew_name": "test"},
    )
    # Original note said "simulate line 96" - presumably a line in
    # trace_batch_manager.py that pre-assigns the local id; TODO confirm.
    manager.trace_batch_id = manager.current_batch.batch_id
    manager.is_current_batch_ephemeral = True
    return manager
|
|
|
|
def test_trace_batch_id_cleared_on_exception(self):
|
|
"""trace_batch_id must be None when the API call raises an exception."""
|
|
bm = self._make_batch_manager()
|
|
assert bm.trace_batch_id is not None
|
|
|
|
with (
|
|
patch(
|
|
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
|
return_value=True,
|
|
),
|
|
patch.object(
|
|
bm.plus_api,
|
|
"initialize_ephemeral_trace_batch",
|
|
side_effect=ConnectionError("network down"),
|
|
),
|
|
):
|
|
bm._initialize_backend_batch(
|
|
user_context={"privacy_level": "standard"},
|
|
execution_metadata={"execution_type": "crew"},
|
|
use_ephemeral=True,
|
|
)
|
|
|
|
assert bm.trace_batch_id is None
|
|
|
|
def test_trace_batch_id_set_on_success(self):
|
|
"""trace_batch_id must be set from the server response on success."""
|
|
bm = self._make_batch_manager()
|
|
server_id = "server-ephemeral-trace-id-999"
|
|
|
|
mock_response = MagicMock(
|
|
status_code=201,
|
|
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
|
|
)
|
|
|
|
with (
|
|
patch(
|
|
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
|
return_value=True,
|
|
),
|
|
patch.object(
|
|
bm.plus_api,
|
|
"initialize_ephemeral_trace_batch",
|
|
return_value=mock_response,
|
|
),
|
|
):
|
|
bm._initialize_backend_batch(
|
|
user_context={"privacy_level": "standard"},
|
|
execution_metadata={"execution_type": "crew"},
|
|
use_ephemeral=True,
|
|
)
|
|
|
|
assert bm.trace_batch_id == server_id
|
|
|
|
def test_send_events_skipped_when_trace_batch_id_none(self):
|
|
"""_send_events_to_backend must return early when trace_batch_id is None."""
|
|
bm = self._make_batch_manager()
|
|
bm.trace_batch_id = None
|
|
bm.event_buffer = [MagicMock()] # has events
|
|
|
|
with patch.object(
|
|
bm.plus_api, "send_ephemeral_trace_events"
|
|
) as mock_send:
|
|
result = bm._send_events_to_backend()
|
|
|
|
assert result == 500
|
|
mock_send.assert_not_called()
|
|
|
|
|
|
class TestInitializeBackendBatchRetry:
    """Exercise the retry behaviour of ``_initialize_backend_batch``.

    Every test patches the ephemeral init endpoint and ``time.sleep`` and
    then inspects the resulting ``trace_batch_id``, the number of API calls
    and, where relevant, whether a sleep happened.
    """

    _AUTH_TOKEN = "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token"
    _TRACING_ENABLED = (
        "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context"
    )
    _SLEEP = "crewai.events.listeners.tracing.trace_batch_manager.time.sleep"

    def _make_batch_manager(self):
        """Create a TraceBatchManager with a pre-set trace_batch_id."""
        with patch(self._AUTH_TOKEN, return_value="mock_token"):
            manager = TraceBatchManager()
            manager.current_batch = TraceBatch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew", "crew_name": "test"},
            )
            manager.trace_batch_id = manager.current_batch.batch_id
            manager.is_current_batch_ephemeral = True
            return manager

    def _attempt_init(self, manager, **api_mock_kwargs):
        """Run the ephemeral init with the API and ``time.sleep`` mocked.

        ``api_mock_kwargs`` (``return_value=...`` or ``side_effect=...``)
        configure the mocked ``initialize_ephemeral_trace_batch``.

        Returns:
            ``(init_call, sleep_call)`` - the two mocks, for assertions.
        """
        with (
            patch(self._TRACING_ENABLED, return_value=True),
            patch.object(
                manager.plus_api,
                "initialize_ephemeral_trace_batch",
                **api_mock_kwargs,
            ) as init_call,
            patch(self._SLEEP) as sleep_call,
        ):
            manager._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=True,
            )
        return init_call, sleep_call

    @staticmethod
    def _created_response(trace_id):
        """Build a mocked 201 response carrying ``trace_id``."""
        return MagicMock(
            status_code=201,
            json=MagicMock(return_value={"ephemeral_trace_id": trace_id}),
        )

    def test_retries_on_none_response_then_succeeds(self):
        """Retries when API returns None, succeeds on second attempt."""
        manager = self._make_batch_manager()
        expected_id = "server-id-after-retry"

        init_call, sleep_call = self._attempt_init(
            manager,
            side_effect=[None, self._created_response(expected_id)],
        )

        assert manager.trace_batch_id == expected_id
        assert init_call.call_count == 2
        sleep_call.assert_called_once_with(0.2)

    def test_retries_on_5xx_then_succeeds(self):
        """Retries on 500 server error, succeeds on second attempt."""
        manager = self._make_batch_manager()
        expected_id = "server-id-after-5xx"
        server_error = MagicMock(status_code=500, text="Internal Server Error")

        init_call, _ = self._attempt_init(
            manager,
            side_effect=[server_error, self._created_response(expected_id)],
        )

        assert manager.trace_batch_id == expected_id
        assert init_call.call_count == 2

    def test_no_retry_on_exception(self):
        """Exceptions (e.g. timeout, connection error) abort immediately without retry."""
        manager = self._make_batch_manager()

        init_call, sleep_call = self._attempt_init(
            manager,
            side_effect=ConnectionError("network down"),
        )

        assert manager.trace_batch_id is None
        assert init_call.call_count == 1
        sleep_call.assert_not_called()

    def test_no_retry_on_4xx(self):
        """Does NOT retry on 422 — client error is not transient."""
        manager = self._make_batch_manager()
        client_error = MagicMock(status_code=422, text="Unprocessable Entity")

        init_call, sleep_call = self._attempt_init(
            manager,
            return_value=client_error,
        )

        assert manager.trace_batch_id is None
        assert init_call.call_count == 1
        sleep_call.assert_not_called()

    def test_exhausts_retries_then_clears_batch_id(self):
        """After all retries fail, trace_batch_id is None."""
        manager = self._make_batch_manager()

        init_call, _ = self._attempt_init(manager, return_value=None)

        assert manager.trace_batch_id is None
        assert init_call.call_count == 2  # initial + 1 retry
|
|
|
|
|
|
class TestFirstTimeHandlerBackendInitGuard:
    """Check that the first-time handler only sends events after a successful init.

    On success the events are sent with the server-issued id and the manager
    state is reset afterwards; on failure nothing is sent or finalized and
    ``_gracefully_fail`` is invoked.
    """

    _AUTH_TOKEN = "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token"
    _TRACING_ENABLED = (
        "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context"
    )

    def _make_handler_with_manager(self):
        """Create a FirstTimeTraceHandler wired to a TraceBatchManager."""
        with patch(self._AUTH_TOKEN, return_value="mock_token"):
            manager = TraceBatchManager()
            manager.current_batch = TraceBatch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew", "crew_name": "test"},
            )
            manager.trace_batch_id = manager.current_batch.batch_id
            manager.is_current_batch_ephemeral = True

            first_time_handler = FirstTimeTraceHandler()
            first_time_handler.is_first_time = True
            first_time_handler.collected_events = True
            first_time_handler.batch_manager = manager
            return first_time_handler, manager

    def _assert_failed_init_is_handled(self, init_result):
        """Run the deferred init with ``init_result`` as the API answer.

        Asserts that the manager is left uninitialized, that neither send nor
        finalize ran, and that the handler's ``_gracefully_fail`` ran once.
        """
        handler, manager = self._make_handler_with_manager()

        with (
            patch(self._TRACING_ENABLED, return_value=True),
            patch.object(
                manager.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=init_result,
            ),
            patch.object(manager, "_send_events_to_backend") as send_call,
            patch.object(manager, "_finalize_backend_batch") as finalize_call,
            patch.object(handler, "_gracefully_fail") as fail_call,
        ):
            manager.event_buffer = [MagicMock()]
            handler._initialize_backend_and_send_events()

        assert manager.backend_initialized is False
        assert manager.trace_batch_id is None
        send_call.assert_not_called()
        finalize_call.assert_not_called()
        fail_call.assert_called_once()

    def test_backend_initialized_true_on_success(self):
        """Events are sent when batch creation succeeds, then state is cleaned up."""
        handler, manager = self._make_handler_with_manager()
        expected_id = "server-id-abc"

        init_ok = MagicMock(
            status_code=201,
            json=MagicMock(return_value={"ephemeral_trace_id": expected_id}),
        )
        send_ok = MagicMock(status_code=200)

        # Snapshot of the manager's id at the moment events are sent; stays
        # None if the send endpoint is never reached.
        observed = {"trace_batch_id": None}

        def record_send(*args, **kwargs):
            observed["trace_batch_id"] = manager.trace_batch_id
            return send_ok

        with (
            patch(self._TRACING_ENABLED, return_value=True),
            patch.object(
                manager.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=init_ok,
            ),
            patch.object(
                manager.plus_api,
                "send_ephemeral_trace_events",
                side_effect=record_send,
            ),
            patch.object(manager, "_finalize_backend_batch"),
        ):
            manager.event_buffer = [MagicMock(to_dict=MagicMock(return_value={}))]
            handler._initialize_backend_and_send_events()

        # trace_batch_id was set correctly during send
        assert observed["trace_batch_id"] == expected_id
        # State cleaned up after completion (singleton reuse)
        assert manager.backend_initialized is False
        assert manager.trace_batch_id is None
        assert manager.current_batch is None

    def test_backend_initialized_false_on_failure(self):
        """backend_initialized stays False and events are NOT sent when batch creation fails."""
        # A None result stands for a failed server call.
        self._assert_failed_init_is_handled(None)

    def test_backend_initialized_false_on_non_2xx(self):
        """backend_initialized stays False when server returns non-2xx."""
        self._assert_failed_init_is_handled(
            MagicMock(status_code=500, text="Internal Server Error")
        )
|
|
|
|
|
|
class TestFirstTimeHandlerAlwaysEphemeral:
    """Check the keyword arguments the first-time handler passes to backend init."""

    _AUTH_TOKEN = "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token"

    def _make_handler_with_manager(self):
        """Return ``(handler, manager)`` with the handler bound to the manager."""
        with patch(self._AUTH_TOKEN, return_value="mock_token"):
            manager = TraceBatchManager()
            manager.current_batch = TraceBatch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew", "crew_name": "test"},
            )
            manager.trace_batch_id = manager.current_batch.batch_id
            manager.is_current_batch_ephemeral = True

            first_time_handler = FirstTimeTraceHandler()
            first_time_handler.is_first_time = True
            first_time_handler.collected_events = True
            first_time_handler.batch_manager = manager
            return first_time_handler, manager

    def test_deferred_init_uses_ephemeral_and_skip_context_check(self):
        """Deferred backend init always uses ephemeral=True and skip_context_check=True."""
        handler, manager = self._make_handler_with_manager()

        with (
            # The stand-in accepts keyword arguments only and returns None.
            patch.object(
                manager,
                "_initialize_backend_batch",
                side_effect=lambda **kwargs: None,
            ) as init_call,
            patch.object(manager, "_send_events_to_backend"),
            patch.object(manager, "_finalize_backend_batch"),
        ):
            manager.event_buffer = [MagicMock()]
            handler._initialize_backend_and_send_events()

        init_call.assert_called_once()
        forwarded = init_call.call_args.kwargs
        assert forwarded["use_ephemeral"] is True
        assert forwarded["skip_context_check"] is True
|
|
|
|
|
|
class TestAuthFailbackToEphemeral:
    """Tests for ephemeral fallback when server rejects auth (401/403).

    Each test mocks the non-ephemeral and/or ephemeral init endpoints on
    ``plus_api`` plus ``time.sleep``, calls ``_initialize_backend_batch`` and
    checks the resulting ``trace_batch_id`` / ``is_current_batch_ephemeral``.
    """

    _AUTH_TOKEN = "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token"
    _TRACING_ENABLED = (
        "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context"
    )
    _SLEEP = "crewai.events.listeners.tracing.trace_batch_manager.time.sleep"

    def _make_batch_manager(self):
        """Create a TraceBatchManager with a pre-set trace_batch_id."""
        with patch(self._AUTH_TOKEN, return_value="mock_token"):
            bm = TraceBatchManager()
            bm.current_batch = TraceBatch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew", "crew_name": "test"},
            )
            bm.trace_batch_id = bm.current_batch.batch_id
            bm.is_current_batch_ephemeral = False  # authenticated path
            return bm

    def test_401_non_ephemeral_falls_back_to_ephemeral(self):
        """A 401 on the non-ephemeral endpoint should retry as ephemeral."""
        bm = self._make_batch_manager()
        server_id = "ephemeral-fallback-id"

        auth_rejected = MagicMock(status_code=401, text="Bad credentials")
        ephemeral_success = MagicMock(
            status_code=201,
            json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
        )

        with (
            patch(self._TRACING_ENABLED, return_value=True),
            patch.object(
                bm.plus_api,
                "initialize_trace_batch",
                return_value=auth_rejected,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=ephemeral_success,
            ) as mock_ephemeral,
            patch(self._SLEEP),
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=False,
            )

        assert bm.trace_batch_id == server_id
        assert bm.is_current_batch_ephemeral is True
        mock_ephemeral.assert_called_once()

    def test_403_non_ephemeral_falls_back_to_ephemeral(self):
        """A 403 on the non-ephemeral endpoint should also fall back."""
        bm = self._make_batch_manager()
        server_id = "ephemeral-fallback-403"

        forbidden = MagicMock(status_code=403, text="Forbidden")
        ephemeral_success = MagicMock(
            status_code=201,
            json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
        )

        with (
            patch(self._TRACING_ENABLED, return_value=True),
            patch.object(
                bm.plus_api,
                "initialize_trace_batch",
                return_value=forbidden,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=ephemeral_success,
            ),
            patch(self._SLEEP),
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=False,
            )

        assert bm.trace_batch_id == server_id
        assert bm.is_current_batch_ephemeral is True

    def test_401_on_ephemeral_does_not_recurse(self):
        """A 401 on the ephemeral endpoint should NOT try to fall back again."""
        bm = self._make_batch_manager()
        bm.is_current_batch_ephemeral = True

        auth_rejected = MagicMock(status_code=401, text="Bad credentials")

        with (
            patch(self._TRACING_ENABLED, return_value=True),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=auth_rejected,
            ) as mock_ephemeral,
            patch(self._SLEEP),
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=True,
            )

        assert bm.trace_batch_id is None
        # Called only once — no recursive fallback. assert_called() would also
        # pass for two or more calls, so the exact-count assertion is required
        # for this test to detect a second (recursive) attempt.
        mock_ephemeral.assert_called_once()

    def test_401_fallback_ephemeral_also_fails(self):
        """If ephemeral fallback also fails, trace_batch_id is cleared."""
        bm = self._make_batch_manager()

        auth_rejected = MagicMock(status_code=401, text="Bad credentials")
        ephemeral_fail = MagicMock(status_code=422, text="Validation failed")

        with (
            patch(self._TRACING_ENABLED, return_value=True),
            patch.object(
                bm.plus_api,
                "initialize_trace_batch",
                return_value=auth_rejected,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=ephemeral_fail,
            ),
            patch(self._SLEEP),
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=False,
            )

        assert bm.trace_batch_id is None
|
|
|
|
|
|
class TestMarkBatchAsFailedRouting:
    """Check which ``plus_api`` endpoint ``_mark_batch_as_failed`` calls.

    The choice is driven by the manager's ``is_current_batch_ephemeral`` flag.
    """

    _AUTH_TOKEN = "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token"

    def _make_batch_manager(self, ephemeral: bool = False):
        """Return a manager whose ``is_current_batch_ephemeral`` is ``ephemeral``."""
        with patch(self._AUTH_TOKEN, return_value="mock_token"):
            manager = TraceBatchManager()
            manager.is_current_batch_ephemeral = ephemeral
            return manager

    @staticmethod
    def _mark_failed(manager, batch_id, reason):
        """Call ``_mark_batch_as_failed`` with both endpoints mocked.

        Returns:
            ``(ephemeral_call, standard_call)`` - the mocks standing in for
            the ephemeral and non-ephemeral endpoints.
        """
        api = manager.plus_api
        with (
            patch.object(api, "mark_ephemeral_trace_batch_as_failed") as ephemeral_call,
            patch.object(api, "mark_trace_batch_as_failed") as standard_call,
        ):
            manager._mark_batch_as_failed(batch_id, reason)
        return ephemeral_call, standard_call

    def test_routes_to_ephemeral_endpoint_when_ephemeral(self):
        """Ephemeral batches must use mark_ephemeral_trace_batch_as_failed."""
        manager = self._make_batch_manager(ephemeral=True)

        ephemeral_call, standard_call = self._mark_failed(
            manager, "batch-123", "some error"
        )

        ephemeral_call.assert_called_once_with("batch-123", "some error")
        standard_call.assert_not_called()

    def test_routes_to_non_ephemeral_endpoint_when_not_ephemeral(self):
        """Non-ephemeral batches must use mark_trace_batch_as_failed."""
        manager = self._make_batch_manager(ephemeral=False)

        ephemeral_call, standard_call = self._mark_failed(
            manager, "batch-456", "another error"
        )

        standard_call.assert_called_once_with("batch-456", "another error")
        ephemeral_call.assert_not_called()
|
|
|
|
|
|
class TestBackendInitializedGatedOnSuccess:
    """Check that ``backend_initialized`` mirrors the outcome of ``initialize_batch``.

    Both tests run with tracing enabled, first-time auto-collection disabled
    and a mocked auth token, and differ only in what the init endpoint returns.
    """

    _TRACING_ENABLED = (
        "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context"
    )
    _FIRST_TIME = (
        "crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces"
    )
    _AUTH_TOKEN = "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token"

    def _initialize_with_api_result(self, api_result):
        """Build a manager and run ``initialize_batch`` against a mocked API.

        ``api_result`` is what the mocked ``initialize_trace_batch`` returns.
        The manager is returned so the caller can inspect its state.
        """
        with (
            patch(self._TRACING_ENABLED, return_value=True),
            patch(self._FIRST_TIME, return_value=False),
            patch(self._AUTH_TOKEN, return_value="mock_token"),
        ):
            manager = TraceBatchManager()
            with patch.object(
                manager.plus_api, "initialize_trace_batch", return_value=api_result
            ):
                manager.initialize_batch(
                    user_context={"privacy_level": "standard"},
                    execution_metadata={"execution_type": "crew"},
                )
        return manager

    def test_backend_initialized_true_on_success(self):
        """backend_initialized is True when _initialize_backend_batch succeeds."""
        created = MagicMock(
            status_code=201,
            json=MagicMock(return_value={"trace_id": "server-id"}),
        )

        manager = self._initialize_with_api_result(created)

        assert manager.backend_initialized is True
        assert manager.trace_batch_id == "server-id"

    def test_backend_initialized_false_on_failure(self):
        """backend_initialized is False when _initialize_backend_batch fails."""
        manager = self._initialize_with_api_result(None)

        assert manager.backend_initialized is False
        assert manager.trace_batch_id is None
|