From a1b3edd79c4b6f8686e36245101bfeb7797760f9 Mon Sep 17 00:00:00 2001 From: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com> Date: Mon, 18 Aug 2025 14:16:51 -0700 Subject: [PATCH] =?UTF-8?q?Refactor=20tracing=20logic=20to=20consolidate?= =?UTF-8?q?=20conditions=20for=20enabling=20tracing=E2=80=A6=20(#3347)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Refactor tracing logic to consolidate conditions for enabling tracing in Crew class and update TraceBatchManager to handle ephemeral batches more effectively. Added tests for trace listener handling of both ephemeral and authenticated user batches. * drop print * linted * refactor: streamline ephemeral handling in TraceBatchManager This commit removes the ephemeral parameter from the _send_events_to_backend and _finalize_backend_batch methods, replacing it with internal logic that checks the current batch's ephemeral status. This change simplifies the method signatures and enhances the clarity of the code by directly using the is_current_batch_ephemeral attribute for conditional logic. --- src/crewai/crew.py | 8 +- .../listeners/tracing/trace_batch_manager.py | 21 +-- .../listeners/tracing/trace_listener.py | 2 +- ...p.test_trace_listener_ephemeral_batch.yaml | 126 ++++++++++++++++++ ...race_listener_with_authenticated_user.yaml | 123 +++++++++++++++++ tests/tracing/test_tracing.py | 69 ++++++++++ 6 files changed, 336 insertions(+), 13 deletions(-) create mode 100644 tests/cassettes/TestTraceListenerSetup.test_trace_listener_ephemeral_batch.yaml create mode 100644 tests/cassettes/TestTraceListenerSetup.test_trace_listener_with_authenticated_user.yaml diff --git a/src/crewai/crew.py b/src/crewai/crew.py index b930ebde4..6c4ce481e 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -286,10 +286,12 @@ class Crew(FlowTrackable, BaseModel): self._cache_handler = CacheHandler() event_listener = EventListener() - if on_first_execution_tracing_confirmation(): - self.tracing = True - if is_tracing_enabled() or self.tracing: + if ( + on_first_execution_tracing_confirmation() + or is_tracing_enabled() + or self.tracing + ): trace_listener = TraceCollectionListener() trace_listener.setup_listeners(crewai_event_bus) event_listener.verbose = self.verbose diff --git a/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py b/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py index c06d51af7..71d4417dd 100644 --- a/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py +++ b/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py @@ -40,6 +40,8 @@ class TraceBatch: class TraceBatchManager: """Single responsibility: Manage batches and event buffering""" + is_current_batch_ephemeral: bool = False + def __init__(self): try: self.plus_api = PlusAPI(api_key=get_auth_token()) @@ -62,6 +64,7 @@ class TraceBatchManager: user_context=user_context, execution_metadata=execution_metadata ) self.event_buffer.clear() + self.is_current_batch_ephemeral = use_ephemeral self.record_start_time("execution") self._initialize_backend_batch(user_context, execution_metadata, use_ephemeral) @@ -136,7 +139,7 @@ class TraceBatchManager: """Add event to buffer""" self.event_buffer.append(trace_event) - def _send_events_to_backend(self, ephemeral: bool = True): + def _send_events_to_backend(self): """Send buffered events to backend""" if not self.plus_api or not self.trace_batch_id or not self.event_buffer: return @@ -156,7 +159,7 @@ class TraceBatchManager: response = ( self.plus_api.send_ephemeral_trace_events(self.trace_batch_id, payload) - if ephemeral + if self.is_current_batch_ephemeral else self.plus_api.send_trace_events(self.trace_batch_id, payload) ) @@ -170,15 +173,14 @@ class TraceBatchManager: except Exception as e: logger.error(f"❌ Error sending events to backend: {str(e)}") - def finalize_batch(self, ephemeral: bool = True) -> Optional[TraceBatch]: + def finalize_batch(self) -> Optional[TraceBatch]: """Finalize batch and return it for sending""" if not self.current_batch: return None if self.event_buffer: - self._send_events_to_backend(ephemeral) - - self._finalize_backend_batch(ephemeral) + self._send_events_to_backend() + self._finalize_backend_batch() self.current_batch.events = self.event_buffer.copy() @@ -187,12 +189,13 @@ class TraceBatchManager: self.current_batch = None self.event_buffer.clear() self.trace_batch_id = None + self.is_current_batch_ephemeral = False self._cleanup_batch_data() return finalized_batch - def _finalize_backend_batch(self, ephemeral: bool = True): + def _finalize_backend_batch(self): """Send batch finalization to backend""" if not self.plus_api or not self.trace_batch_id: return @@ -210,7 +213,7 @@ class TraceBatchManager: self.plus_api.finalize_ephemeral_trace_batch( self.trace_batch_id, payload ) - if ephemeral + if self.is_current_batch_ephemeral else self.plus_api.finalize_trace_batch(self.trace_batch_id, payload) ) @@ -219,7 +222,7 @@ class TraceBatchManager: console = Console() return_link = ( f"{CREWAI_BASE_URL}/crewai_plus/trace_batches/{self.trace_batch_id}" - if not ephemeral and access_code + if not self.is_current_batch_ephemeral and access_code is None else f"{CREWAI_BASE_URL}/crewai_plus/ephemeral_trace_batches/{self.trace_batch_id}?access_code={access_code}" ) panel = Panel( diff --git a/src/crewai/utilities/events/listeners/tracing/trace_listener.py b/src/crewai/utilities/events/listeners/tracing/trace_listener.py index eaa993c8f..1658a42f7 100644 --- a/src/crewai/utilities/events/listeners/tracing/trace_listener.py +++ b/src/crewai/utilities/events/listeners/tracing/trace_listener.py @@ -166,7 +166,7 @@ class TraceCollectionListener(BaseEventListener): @event_bus.on(CrewKickoffCompletedEvent) def on_crew_completed(source, event): self._handle_trace_event("crew_kickoff_completed", source, event) - self.batch_manager.finalize_batch(ephemeral=True) + self.batch_manager.finalize_batch() @event_bus.on(CrewKickoffFailedEvent) def on_crew_failed(source, event): diff --git a/tests/cassettes/TestTraceListenerSetup.test_trace_listener_ephemeral_batch.yaml b/tests/cassettes/TestTraceListenerSetup.test_trace_listener_ephemeral_batch.yaml new file mode 100644 index 000000000..49b349b5d --- /dev/null +++ b/tests/cassettes/TestTraceListenerSetup.test_trace_listener_ephemeral_batch.yaml @@ -0,0 +1,126 @@ +interactions: +- request: + body: '{"messages": [{"role": "system", "content": "You are Test Agent. Test backstory\nYour + personal goal is: Test goal\nTo give my best complete final answer to the task + respond using the exact following format:\n\nThought: I now can give a great + answer\nFinal Answer: Your final answer must be the great and the most complete + as possible, it must be outcome described.\n\nI MUST use these formats, my job + depends on it!"}, {"role": "user", "content": "\nCurrent Task: Say hello to + the world\n\nThis is the expected criteria for your final answer: hello world\nyou + MUST return the actual complete content as the final answer, not a summary.\n\nBegin! + This is VERY important to you, use the tools available and give your best Final + Answer, your job depends on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop": + ["\nObservation:"]}' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate, zstd + connection: + - keep-alive + content-length: + - '825' + content-type: + - application/json + host: + - api.openai.com + user-agent: + - OpenAI/Python 1.93.0 + x-stainless-arch: + - arm64 + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - MacOS + x-stainless-package-version: + - 1.93.0 + x-stainless-raw-response: + - 'true' + x-stainless-read-timeout: + - '600.0' + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.12.9 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: !!binary | + H4sIAAAAAAAAAwAAAP//jFJdi9swEHz3r1j0HBc7ZydXvx1HC0evhT6UUtrDKNLa1lXWqpJ8aTjy + 34vsXOz0A/pi8M7OaGZ3nxMApiSrgImOB9Fbnd4Why/v79HeePeD37/Zfdx8evf5+rY4fNjdPbJV + ZNDuEUV4Yb0S1FuNQZGZYOGQB4yq+bYsr7J1nhcj0JNEHWmtDWlBaa+MStfZukizbZpfn9gdKYGe + VfA1AQB4Hr/Rp5H4k1WQrV4qPXrPW2TVuQmAOdKxwrj3ygduAlvNoCAT0IzW78DQHgQ30KonBA5t + tA3c+D06gG/mrTJcw834X0GHWhPsyWm5FHTYDJ7HUGbQegFwYyjwOJQxysMJOZ7Na2qto53/jcoa + ZZTvaofck4lGfSDLRvSYADyMQxoucjPrqLehDvQdx+fycjvpsXk3C/TqBAYKXC/q29NoL/VqiYEr + 7RdjZoKLDuVMnXfCB6loASSL1H+6+Zv2lFyZ9n/kZ0AItAFlbR1KJS4Tz20O4+n+q+085dEw8+ie + lMA6KHRxExIbPujpoJg/+IB93SjTorNOTVfV2LrcZLzZYFm+Zskx+QUAAP//AwB1vYZ+YwMAAA== + headers: + CF-RAY: + - 96fc9f29dea3cf1f-SJC + Connection: + - keep-alive + Content-Encoding: + - gzip + Content-Type: + - application/json + Date: + - Fri, 15 Aug 2025 23:55:15 GMT + Server: + - cloudflare + Set-Cookie: + - __cf_bm=oA9oTa3cE0ZaEUDRf0hCpnarSAQKzrVUhl6qDS4j09w-1755302115-1.0.1.1-gUUDl4ZqvBQkg7244DTwOmSiDUT2z_AiQu0P1xUaABjaufSpZuIlI5G0H7OSnW.ldypvpxjj45NGWesJ62M_2U7r20tHz_gMmDFw6D5ZiNc; + path=/; expires=Sat, 16-Aug-25 00:25:15 GMT; domain=.api.openai.com; HttpOnly; + Secure; SameSite=None + - _cfuvid=ICenEGMmOE5jaOjwD30bAOwrF8.XRbSIKTBl1EyWs0o-1755302115700-0.0.1.1-604800000; + path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + access-control-expose-headers: + - X-Request-ID + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - crewai-iuxna1 + openai-processing-ms: + - '735' + openai-project: + - proj_xitITlrFeen7zjNSzML82h9x + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '753' + x-ratelimit-limit-project-tokens: + - '150000000' + x-ratelimit-limit-requests: + - '30000' + x-ratelimit-limit-tokens: + - '150000000' + x-ratelimit-remaining-project-tokens: + - '149999830' + x-ratelimit-remaining-requests: + - '29999' + x-ratelimit-remaining-tokens: + - '149999827' + x-ratelimit-reset-project-tokens: + - 0s + x-ratelimit-reset-requests: + - 2ms + x-ratelimit-reset-tokens: + - 0s + x-request-id: + - req_212fde9d945a462ba0d89ea856131dce + status: + code: 200 + message: OK +version: 1 diff --git a/tests/cassettes/TestTraceListenerSetup.test_trace_listener_with_authenticated_user.yaml b/tests/cassettes/TestTraceListenerSetup.test_trace_listener_with_authenticated_user.yaml new file mode 100644 index 000000000..be0d8782a --- /dev/null +++ b/tests/cassettes/TestTraceListenerSetup.test_trace_listener_with_authenticated_user.yaml @@ -0,0 +1,123 @@ +interactions: +- request: + body: '{"messages": [{"role": "system", "content": "You are Test Agent. Test backstory\nYour + personal goal is: Test goal\nTo give my best complete final answer to the task + respond using the exact following format:\n\nThought: I now can give a great + answer\nFinal Answer: Your final answer must be the great and the most complete + as possible, it must be outcome described.\n\nI MUST use these formats, my job + depends on it!"}, {"role": "user", "content": "\nCurrent Task: Say hello to + the world\n\nThis is the expected criteria for your final answer: hello world\nyou + MUST return the actual complete content as the final answer, not a summary.\n\nBegin! + This is VERY important to you, use the tools available and give your best Final + Answer, your job depends on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop": + ["\nObservation:"]}' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate, zstd + connection: + - keep-alive + content-length: + - '825' + content-type: + - application/json + cookie: + - __cf_bm=oA9oTa3cE0ZaEUDRf0hCpnarSAQKzrVUhl6qDS4j09w-1755302115-1.0.1.1-gUUDl4ZqvBQkg7244DTwOmSiDUT2z_AiQu0P1xUaABjaufSpZuIlI5G0H7OSnW.ldypvpxjj45NGWesJ62M_2U7r20tHz_gMmDFw6D5ZiNc; + _cfuvid=ICenEGMmOE5jaOjwD30bAOwrF8.XRbSIKTBl1EyWs0o-1755302115700-0.0.1.1-604800000 + host: + - api.openai.com + user-agent: + - OpenAI/Python 1.93.0 + x-stainless-arch: + - arm64 + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - MacOS + x-stainless-package-version: + - 1.93.0 + x-stainless-raw-response: + - 'true' + x-stainless-read-timeout: + - '600.0' + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.12.9 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: !!binary | + H4sIAAAAAAAAAwAAAP//jFLLbtswELzrKxY8W4VkW3ajW1GgQA7JJUGLog0EmlxJrCkuQ1J2g8D/ + XlByLLkPoBcB2tkZzuzuawLAlGQlMNHyIDqr04/rl6/3z+6hu8vq/cOX5936cRPE3i8Px88rtogM + 2v1AEd5Y7wR1VmNQZEZYOOQBo2q+LYpVtszzYgA6kqgjrbEhXVPaKaPSZbZcp9k2zd+f2S0pgZ6V + 8C0BAHgdvtGnkfiTlZAt3iodes8bZOWlCYA50rHCuPfKB24CW0ygIBPQDNZvwdARBDfQqAMChyba + Bm78ER3Ad/NJGa7hw/BfQotaExzJaTkXdFj3nsdQptd6BnBjKPA4lCHK0xk5Xcxraqyjnf+Nympl + lG8rh9yTiUZ9IMsG9JQAPA1D6q9yM+uos6EKtMfhubzYjnps2s0MXZ3BQIHrWX17Hu21XiUxcKX9 + bMxMcNGinKjTTngvFc2AZJb6Tzd/0x6TK9P8j/wECIE2oKysQ6nEdeKpzWE83X+1XaY8GGYe3UEJ + rIJCFzchsea9Hg+K+RcfsKtqZRp01qnxqmpbFZuM1xssihuWnJJfAAAA//8DAFSowWRjAwAA + headers: + CF-RAY: + - 96fc9f301bf7cf1f-SJC + Connection: + - keep-alive + Content-Encoding: + - gzip + Content-Type: + - application/json + Date: + - Fri, 15 Aug 2025 23:55:16 GMT + Server: + - cloudflare + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + access-control-expose-headers: + - X-Request-ID + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - crewai-iuxna1 + openai-processing-ms: + - '685' + openai-project: + - proj_xitITlrFeen7zjNSzML82h9x + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '711' + x-ratelimit-limit-project-tokens: + - '150000000' + x-ratelimit-limit-requests: + - '30000' + x-ratelimit-limit-tokens: + - '150000000' + x-ratelimit-remaining-project-tokens: + - '149999827' + x-ratelimit-remaining-requests: + - '29999' + x-ratelimit-remaining-tokens: + - '149999827' + x-ratelimit-reset-project-tokens: + - 0s + x-ratelimit-reset-requests: + - 2ms + x-ratelimit-reset-tokens: + - 0s + x-request-id: + - req_3f0ec42447374a76a22a4cdb9f336279 + status: + code: 200 + message: OK +version: 1 diff --git a/tests/tracing/test_tracing.py b/tests/tracing/test_tracing.py index 62ef56c8b..3a14dc683 100644 --- a/tests/tracing/test_tracing.py +++ b/tests/tracing/test_tracing.py @@ -2,6 +2,7 @@ import os import pytest from unittest.mock import patch, MagicMock + from crewai import Agent, Task, Crew from crewai.flow.flow import Flow, start from crewai.utilities.events.listeners.tracing.trace_listener import ( @@ -321,6 +322,74 @@ class TestTraceListenerSetup: FlowExample() assert mock_listener_setup.call_count >= 1 + @pytest.mark.vcr(filter_headers=["authorization"]) + def test_trace_listener_ephemeral_batch(self): + """Test that trace listener properly handles ephemeral batches""" + with ( + patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "true"}), + patch( + "crewai.utilities.events.listeners.tracing.trace_listener.TraceCollectionListener._check_authenticated", + return_value=False, + ), + ): + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + llm="gpt-4o-mini", + ) + task = Task( + description="Say hello to the world", + expected_output="hello world", + agent=agent, + ) + crew = Crew(agents=[agent], tasks=[task], tracing=True) + + with patch.object(TraceBatchManager, "initialize_batch") as mock_initialize: + crew.kickoff() + + assert mock_initialize.call_count >= 1 + assert mock_initialize.call_args_list[0][1]["use_ephemeral"] is True + + @pytest.mark.vcr(filter_headers=["authorization"]) + def test_trace_listener_with_authenticated_user(self): + """Test that trace listener properly handles authenticated batches""" + with ( + patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "true"}), + patch( + "crewai.utilities.events.listeners.tracing.trace_batch_manager.PlusAPI" + ) as mock_plus_api_class, + ): + mock_plus_api_instance = MagicMock() + mock_plus_api_class.return_value = mock_plus_api_instance + + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + llm="gpt-4o-mini", + ) + task = Task( + description="Say hello to the world", + expected_output="hello world", + agent=agent, + ) + + with ( + patch.object(TraceBatchManager, "initialize_batch") as mock_initialize, + patch.object( + TraceBatchManager, "finalize_batch" + ) as mock_finalize_backend_batch, + ): + crew = Crew(agents=[agent], tasks=[task], tracing=True) + crew.kickoff() + + mock_plus_api_class.assert_called_with(api_key="mock_token_12345") + + assert mock_initialize.call_count >= 1 + mock_finalize_backend_batch.assert_called_with() + assert mock_finalize_backend_batch.call_count >= 1 + # Helper method to ensure cleanup def teardown_method(self): """Cleanup after each test method"""