diff --git a/src/crewai/crew.py b/src/crewai/crew.py index b930ebde4..6c4ce481e 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -286,10 +286,12 @@ class Crew(FlowTrackable, BaseModel): self._cache_handler = CacheHandler() event_listener = EventListener() - if on_first_execution_tracing_confirmation(): - self.tracing = True - if is_tracing_enabled() or self.tracing: + if ( + on_first_execution_tracing_confirmation() + or is_tracing_enabled() + or self.tracing + ): trace_listener = TraceCollectionListener() trace_listener.setup_listeners(crewai_event_bus) event_listener.verbose = self.verbose diff --git a/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py b/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py index c06d51af7..71d4417dd 100644 --- a/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py +++ b/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py @@ -40,6 +40,8 @@ class TraceBatch: class TraceBatchManager: """Single responsibility: Manage batches and event buffering""" + is_current_batch_ephemeral: bool = False + def __init__(self): try: self.plus_api = PlusAPI(api_key=get_auth_token()) @@ -62,6 +64,7 @@ class TraceBatchManager: user_context=user_context, execution_metadata=execution_metadata ) self.event_buffer.clear() + self.is_current_batch_ephemeral = use_ephemeral self.record_start_time("execution") self._initialize_backend_batch(user_context, execution_metadata, use_ephemeral) @@ -136,7 +139,7 @@ class TraceBatchManager: """Add event to buffer""" self.event_buffer.append(trace_event) - def _send_events_to_backend(self, ephemeral: bool = True): + def _send_events_to_backend(self): """Send buffered events to backend""" if not self.plus_api or not self.trace_batch_id or not self.event_buffer: return @@ -156,7 +159,7 @@ class TraceBatchManager: response = ( self.plus_api.send_ephemeral_trace_events(self.trace_batch_id, payload) - if ephemeral + if self.is_current_batch_ephemeral else self.plus_api.send_trace_events(self.trace_batch_id, payload) ) @@ -170,15 +173,14 @@ class TraceBatchManager: except Exception as e: logger.error(f"❌ Error sending events to backend: {str(e)}") - def finalize_batch(self, ephemeral: bool = True) -> Optional[TraceBatch]: + def finalize_batch(self) -> Optional[TraceBatch]: """Finalize batch and return it for sending""" if not self.current_batch: return None if self.event_buffer: - self._send_events_to_backend(ephemeral) - - self._finalize_backend_batch(ephemeral) + self._send_events_to_backend() + self._finalize_backend_batch() self.current_batch.events = self.event_buffer.copy() @@ -187,12 +189,13 @@ class TraceBatchManager: self.current_batch = None self.event_buffer.clear() self.trace_batch_id = None + self.is_current_batch_ephemeral = False self._cleanup_batch_data() return finalized_batch - def _finalize_backend_batch(self, ephemeral: bool = True): + def _finalize_backend_batch(self): """Send batch finalization to backend""" if not self.plus_api or not self.trace_batch_id: return @@ -210,7 +213,7 @@ class TraceBatchManager: self.plus_api.finalize_ephemeral_trace_batch( self.trace_batch_id, payload ) - if ephemeral + if self.is_current_batch_ephemeral else self.plus_api.finalize_trace_batch(self.trace_batch_id, payload) ) @@ -219,7 +222,7 @@ class TraceBatchManager: console = Console() return_link = ( f"{CREWAI_BASE_URL}/crewai_plus/trace_batches/{self.trace_batch_id}" - if not ephemeral and access_code + if not self.is_current_batch_ephemeral and access_code is None else f"{CREWAI_BASE_URL}/crewai_plus/ephemeral_trace_batches/{self.trace_batch_id}?access_code={access_code}" ) panel = Panel( diff --git a/src/crewai/utilities/events/listeners/tracing/trace_listener.py b/src/crewai/utilities/events/listeners/tracing/trace_listener.py index eaa993c8f..1658a42f7 100644 --- a/src/crewai/utilities/events/listeners/tracing/trace_listener.py +++ b/src/crewai/utilities/events/listeners/tracing/trace_listener.py @@ -166,7 +166,7 @@ class TraceCollectionListener(BaseEventListener): @event_bus.on(CrewKickoffCompletedEvent) def on_crew_completed(source, event): self._handle_trace_event("crew_kickoff_completed", source, event) - self.batch_manager.finalize_batch(ephemeral=True) + self.batch_manager.finalize_batch() @event_bus.on(CrewKickoffFailedEvent) def on_crew_failed(source, event): diff --git a/tests/cassettes/TestTraceListenerSetup.test_trace_listener_ephemeral_batch.yaml b/tests/cassettes/TestTraceListenerSetup.test_trace_listener_ephemeral_batch.yaml new file mode 100644 index 000000000..49b349b5d --- /dev/null +++ b/tests/cassettes/TestTraceListenerSetup.test_trace_listener_ephemeral_batch.yaml @@ -0,0 +1,126 @@ +interactions: +- request: + body: '{"messages": [{"role": "system", "content": "You are Test Agent. Test backstory\nYour + personal goal is: Test goal\nTo give my best complete final answer to the task + respond using the exact following format:\n\nThought: I now can give a great + answer\nFinal Answer: Your final answer must be the great and the most complete + as possible, it must be outcome described.\n\nI MUST use these formats, my job + depends on it!"}, {"role": "user", "content": "\nCurrent Task: Say hello to + the world\n\nThis is the expected criteria for your final answer: hello world\nyou + MUST return the actual complete content as the final answer, not a summary.\n\nBegin! + This is VERY important to you, use the tools available and give your best Final + Answer, your job depends on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop": + ["\nObservation:"]}' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate, zstd + connection: + - keep-alive + content-length: + - '825' + content-type: + - application/json + host: + - api.openai.com + user-agent: + - OpenAI/Python 1.93.0 + x-stainless-arch: + - arm64 + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - MacOS + x-stainless-package-version: + - 1.93.0 + x-stainless-raw-response: + - 'true' + x-stainless-read-timeout: + - '600.0' + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.12.9 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: !!binary | + H4sIAAAAAAAAAwAAAP//jFJdi9swEHz3r1j0HBc7ZydXvx1HC0evhT6UUtrDKNLa1lXWqpJ8aTjy + 34vsXOz0A/pi8M7OaGZ3nxMApiSrgImOB9Fbnd4Why/v79HeePeD37/Zfdx8evf5+rY4fNjdPbJV + ZNDuEUV4Yb0S1FuNQZGZYOGQB4yq+bYsr7J1nhcj0JNEHWmtDWlBaa+MStfZukizbZpfn9gdKYGe + VfA1AQB4Hr/Rp5H4k1WQrV4qPXrPW2TVuQmAOdKxwrj3ygduAlvNoCAT0IzW78DQHgQ30KonBA5t + tA3c+D06gG/mrTJcw834X0GHWhPsyWm5FHTYDJ7HUGbQegFwYyjwOJQxysMJOZ7Na2qto53/jcoa + ZZTvaofck4lGfSDLRvSYADyMQxoucjPrqLehDvQdx+fycjvpsXk3C/TqBAYKXC/q29NoL/VqiYEr + 7RdjZoKLDuVMnXfCB6loASSL1H+6+Zv2lFyZ9n/kZ0AItAFlbR1KJS4Tz20O4+n+q+085dEw8+ie + lMA6KHRxExIbPujpoJg/+IB93SjTorNOTVfV2LrcZLzZYFm+Zskx+QUAAP//AwB1vYZ+YwMAAA== + headers: + CF-RAY: + - 96fc9f29dea3cf1f-SJC + Connection: + - keep-alive + Content-Encoding: + - gzip + Content-Type: + - application/json + Date: + - Fri, 15 Aug 2025 23:55:15 GMT + Server: + - cloudflare + Set-Cookie: + - __cf_bm=oA9oTa3cE0ZaEUDRf0hCpnarSAQKzrVUhl6qDS4j09w-1755302115-1.0.1.1-gUUDl4ZqvBQkg7244DTwOmSiDUT2z_AiQu0P1xUaABjaufSpZuIlI5G0H7OSnW.ldypvpxjj45NGWesJ62M_2U7r20tHz_gMmDFw6D5ZiNc; + path=/; expires=Sat, 16-Aug-25 00:25:15 GMT; domain=.api.openai.com; HttpOnly; + Secure; SameSite=None + - _cfuvid=ICenEGMmOE5jaOjwD30bAOwrF8.XRbSIKTBl1EyWs0o-1755302115700-0.0.1.1-604800000; + path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + access-control-expose-headers: + - X-Request-ID + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - crewai-iuxna1 + openai-processing-ms: + - '735' + openai-project: + - proj_xitITlrFeen7zjNSzML82h9x + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '753' + x-ratelimit-limit-project-tokens: + - '150000000' + x-ratelimit-limit-requests: + - '30000' + x-ratelimit-limit-tokens: + - '150000000' + x-ratelimit-remaining-project-tokens: + - '149999830' + x-ratelimit-remaining-requests: + - '29999' + x-ratelimit-remaining-tokens: + - '149999827' + x-ratelimit-reset-project-tokens: + - 0s + x-ratelimit-reset-requests: + - 2ms + x-ratelimit-reset-tokens: + - 0s + x-request-id: + - req_212fde9d945a462ba0d89ea856131dce + status: + code: 200 + message: OK +version: 1 diff --git a/tests/cassettes/TestTraceListenerSetup.test_trace_listener_with_authenticated_user.yaml b/tests/cassettes/TestTraceListenerSetup.test_trace_listener_with_authenticated_user.yaml new file mode 100644 index 000000000..be0d8782a --- /dev/null +++ b/tests/cassettes/TestTraceListenerSetup.test_trace_listener_with_authenticated_user.yaml @@ -0,0 +1,123 @@ +interactions: +- request: + body: '{"messages": [{"role": "system", "content": "You are Test Agent. Test backstory\nYour + personal goal is: Test goal\nTo give my best complete final answer to the task + respond using the exact following format:\n\nThought: I now can give a great + answer\nFinal Answer: Your final answer must be the great and the most complete + as possible, it must be outcome described.\n\nI MUST use these formats, my job + depends on it!"}, {"role": "user", "content": "\nCurrent Task: Say hello to + the world\n\nThis is the expected criteria for your final answer: hello world\nyou + MUST return the actual complete content as the final answer, not a summary.\n\nBegin! + This is VERY important to you, use the tools available and give your best Final + Answer, your job depends on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop": + ["\nObservation:"]}' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate, zstd + connection: + - keep-alive + content-length: + - '825' + content-type: + - application/json + cookie: + - __cf_bm=oA9oTa3cE0ZaEUDRf0hCpnarSAQKzrVUhl6qDS4j09w-1755302115-1.0.1.1-gUUDl4ZqvBQkg7244DTwOmSiDUT2z_AiQu0P1xUaABjaufSpZuIlI5G0H7OSnW.ldypvpxjj45NGWesJ62M_2U7r20tHz_gMmDFw6D5ZiNc; + _cfuvid=ICenEGMmOE5jaOjwD30bAOwrF8.XRbSIKTBl1EyWs0o-1755302115700-0.0.1.1-604800000 + host: + - api.openai.com + user-agent: + - OpenAI/Python 1.93.0 + x-stainless-arch: + - arm64 + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - MacOS + x-stainless-package-version: + - 1.93.0 + x-stainless-raw-response: + - 'true' + x-stainless-read-timeout: + - '600.0' + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.12.9 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: !!binary | + H4sIAAAAAAAAAwAAAP//jFLLbtswELzrKxY8W4VkW3ajW1GgQA7JJUGLog0EmlxJrCkuQ1J2g8D/ + XlByLLkPoBcB2tkZzuzuawLAlGQlMNHyIDqr04/rl6/3z+6hu8vq/cOX5936cRPE3i8Px88rtogM + 2v1AEd5Y7wR1VmNQZEZYOOQBo2q+LYpVtszzYgA6kqgjrbEhXVPaKaPSZbZcp9k2zd+f2S0pgZ6V + 8C0BAHgdvtGnkfiTlZAt3iodes8bZOWlCYA50rHCuPfKB24CW0ygIBPQDNZvwdARBDfQqAMChyba + Bm78ER3Ad/NJGa7hw/BfQotaExzJaTkXdFj3nsdQptd6BnBjKPA4lCHK0xk5Xcxraqyjnf+Nympl + lG8rh9yTiUZ9IMsG9JQAPA1D6q9yM+uos6EKtMfhubzYjnps2s0MXZ3BQIHrWX17Hu21XiUxcKX9 + bMxMcNGinKjTTngvFc2AZJb6Tzd/0x6TK9P8j/wECIE2oKysQ6nEdeKpzWE83X+1XaY8GGYe3UEJ + rIJCFzchsea9Hg+K+RcfsKtqZRp01qnxqmpbFZuM1xssihuWnJJfAAAA//8DAFSowWRjAwAA + headers: + CF-RAY: + - 96fc9f301bf7cf1f-SJC + Connection: + - keep-alive + Content-Encoding: + - gzip + Content-Type: + - application/json + Date: + - Fri, 15 Aug 2025 23:55:16 GMT + Server: + - cloudflare + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + access-control-expose-headers: + - X-Request-ID + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - crewai-iuxna1 + openai-processing-ms: + - '685' + openai-project: + - proj_xitITlrFeen7zjNSzML82h9x + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '711' + x-ratelimit-limit-project-tokens: + - '150000000' + x-ratelimit-limit-requests: + - '30000' + x-ratelimit-limit-tokens: + - '150000000' + x-ratelimit-remaining-project-tokens: + - '149999827' + x-ratelimit-remaining-requests: + - '29999' + x-ratelimit-remaining-tokens: + - '149999827' + x-ratelimit-reset-project-tokens: + - 0s + x-ratelimit-reset-requests: + - 2ms + x-ratelimit-reset-tokens: + - 0s + x-request-id: + - req_3f0ec42447374a76a22a4cdb9f336279 + status: + code: 200 + message: OK +version: 1 diff --git a/tests/tracing/test_tracing.py b/tests/tracing/test_tracing.py index 62ef56c8b..3a14dc683 100644 --- a/tests/tracing/test_tracing.py +++ b/tests/tracing/test_tracing.py @@ -2,6 +2,7 @@ import os import pytest from unittest.mock import patch, MagicMock + from crewai import Agent, Task, Crew from crewai.flow.flow import Flow, start from crewai.utilities.events.listeners.tracing.trace_listener import ( @@ -321,6 +322,74 @@ class TestTraceListenerSetup: FlowExample() assert mock_listener_setup.call_count >= 1 + @pytest.mark.vcr(filter_headers=["authorization"]) + def test_trace_listener_ephemeral_batch(self): + """Test that trace listener properly handles ephemeral batches""" + with ( + patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "true"}), + patch( + "crewai.utilities.events.listeners.tracing.trace_listener.TraceCollectionListener._check_authenticated", + return_value=False, + ), + ): + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + llm="gpt-4o-mini", + ) + task = Task( + description="Say hello to the world", + expected_output="hello world", + agent=agent, + ) + crew = Crew(agents=[agent], tasks=[task], tracing=True) + + with patch.object(TraceBatchManager, "initialize_batch") as mock_initialize: + crew.kickoff() + + assert mock_initialize.call_count >= 1 + assert mock_initialize.call_args_list[0][1]["use_ephemeral"] is True + + @pytest.mark.vcr(filter_headers=["authorization"]) + def test_trace_listener_with_authenticated_user(self): + """Test that trace listener properly handles authenticated batches""" + with ( + patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "true"}), + patch( + "crewai.utilities.events.listeners.tracing.trace_batch_manager.PlusAPI" + ) as mock_plus_api_class, + ): + mock_plus_api_instance = MagicMock() + mock_plus_api_class.return_value = mock_plus_api_instance + + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + llm="gpt-4o-mini", + ) + task = Task( + description="Say hello to the world", + expected_output="hello world", + agent=agent, + ) + + with ( + patch.object(TraceBatchManager, "initialize_batch") as mock_initialize, + patch.object( + TraceBatchManager, "finalize_batch" + ) as mock_finalize_backend_batch, + ): + crew = Crew(agents=[agent], tasks=[task], tracing=True) + crew.kickoff() + + mock_plus_api_class.assert_called_with(api_key="mock_token_12345") + + assert mock_initialize.call_count >= 1 + mock_finalize_backend_batch.assert_called_with() + assert mock_finalize_backend_batch.call_count >= 1 + # Helper method to ensure cleanup def teardown_method(self): """Cleanup after each test method"""