From 251ae00b8ba07ba8f1aed268c5e54c87b695ce81 Mon Sep 17 00:00:00 2001 From: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com> Date: Fri, 8 Aug 2025 13:42:25 -0700 Subject: [PATCH] Lorenze/tracing-improvements-cleanup (#3291) * feat: add tracing support to Crew and Flow classes - Introduced a new `tracing` optional field in both the `Crew` and `Flow` classes to enable tracing functionality. - Updated the initialization logic to conditionally set up the `TraceCollectionListener` based on the `tracing` flag or the `CREWAI_TRACING_ENABLED` environment variable. - Removed the obsolete `interfaces.py` file related to tracing. - Enhanced the `TraceCollectionListener` to accept a `tracing` parameter and adjusted its internal logic accordingly. - Added tests to verify the correct setup of the trace listener when tracing is enabled. This change improves the observability of the crew execution process and allows for better debugging and performance monitoring. * fix flow name * refactor: replace _send_batch method with finalize_batch calls in TraceCollectionListener - Updated the TraceCollectionListener to use the batch_manager's finalize_batch method instead of the deprecated _send_batch method for handling trace events. - This change improves the clarity of the code and ensures that batch finalization is consistently managed through the batch manager. - Removed the obsolete _send_batch method to streamline the listener's functionality. * removed comments * refactor: enhance tracing functionality by introducing utility for tracing checks - Added a new utility function `is_tracing_enabled` to streamline the logic for checking if tracing is enabled based on the `CREWAI_TRACING_ENABLED` environment variable. - Updated the `Crew` and `Flow` classes to utilize this utility for improved readability and maintainability. - Refactored the `TraceCollectionListener` to simplify tracing checks and ensure consistent behavior across components. - Introduced a new module for tracing utilities to encapsulate related functions, enhancing code organization. * refactor: remove unused imports from crew and flow modules - Removed unnecessary `os` imports from both `crew.py` and `flow.py` files to enhance code cleanliness and maintainability. --- src/crewai/crew.py | 10 +- src/crewai/flow/flow.py | 9 +- .../events/listeners/tracing/__init__.py | 0 .../events/listeners/tracing/interfaces.py | 33 ----- .../listeners/tracing/trace_batch_manager.py | 5 +- .../listeners/tracing/trace_listener.py | 44 +++--- .../events/listeners/tracing/utils.py | 5 + ...ner_setup_correctly_with_tracing_flag.yaml | 125 ++++++++++++++++++ tests/tracing/test_tracing.py | 20 ++- 9 files changed, 176 insertions(+), 75 deletions(-) create mode 100644 src/crewai/utilities/events/listeners/tracing/__init__.py delete mode 100644 src/crewai/utilities/events/listeners/tracing/interfaces.py create mode 100644 src/crewai/utilities/events/listeners/tracing/utils.py create mode 100644 tests/cassettes/TestTraceListenerSetup.test_trace_listener_setup_correctly_with_tracing_flag.yaml diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 3cac711fa..5e654463c 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -1,4 +1,3 @@ -import os import asyncio import json import re @@ -78,6 +77,7 @@ from crewai.utilities.events.listeners.tracing.trace_listener import ( ) +from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled from crewai.utilities.formatter import ( aggregate_raw_outputs_from_task_outputs, aggregate_raw_outputs_from_tasks, @@ -248,6 +248,10 @@ class Crew(FlowTrackable, BaseModel): default=None, description="Metrics for the LLM usage during all tasks execution.", ) + tracing: Optional[bool] = Field( + default=False, + description="Whether to enable tracing for the crew.", + ) @field_validator("id", mode="before") @classmethod @@ -279,8 +283,8 @@ class Crew(FlowTrackable, BaseModel): self._cache_handler = CacheHandler() event_listener = EventListener() - if os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true": - trace_listener = TraceCollectionListener() + if is_tracing_enabled() or self.tracing: + trace_listener = TraceCollectionListener(tracing=self.tracing) trace_listener.setup_listeners(crewai_event_bus) event_listener.verbose = self.verbose event_listener.formatter.verbose = self.verbose diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index 9d75dfe5a..0c145b401 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -2,7 +2,6 @@ import asyncio import copy import inspect import logging -import os from typing import ( Any, Callable, @@ -36,6 +35,7 @@ from crewai.utilities.events.flow_events import ( from crewai.utilities.events.listeners.tracing.trace_listener import ( TraceCollectionListener, ) +from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled from crewai.utilities.printer import Printer logger = logging.getLogger(__name__) @@ -441,6 +441,7 @@ class Flow(Generic[T], metaclass=FlowMeta): _router_paths: Dict[str, List[str]] = {} initial_state: Union[Type[T], T, None] = None name: Optional[str] = None + tracing: Optional[bool] = False def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]: class _FlowGeneric(cls): # type: ignore @@ -452,6 +453,7 @@ class Flow(Generic[T], metaclass=FlowMeta): def __init__( self, persistence: Optional[FlowPersistence] = None, + tracing: Optional[bool] = False, **kwargs: Any, ) -> None: """Initialize a new Flow instance. @@ -469,8 +471,9 @@ class Flow(Generic[T], metaclass=FlowMeta): # Initialize state with initial values self._state = self._create_initial_state() - if os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true": - trace_listener = TraceCollectionListener() + self.tracing = tracing + if is_tracing_enabled() or tracing: + trace_listener = TraceCollectionListener(tracing=tracing) trace_listener.setup_listeners(crewai_event_bus) # Apply any additional kwargs if kwargs: diff --git a/src/crewai/utilities/events/listeners/tracing/__init__.py b/src/crewai/utilities/events/listeners/tracing/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/crewai/utilities/events/listeners/tracing/interfaces.py b/src/crewai/utilities/events/listeners/tracing/interfaces.py deleted file mode 100644 index 4516be7f8..000000000 --- a/src/crewai/utilities/events/listeners/tracing/interfaces.py +++ /dev/null @@ -1,33 +0,0 @@ -import json -from datetime import datetime - -from crewai.cli.plus_api import PlusAPI -from crewai.cli.authentication.token import get_auth_token -from pydantic import BaseModel -from .trace_batch_manager import TraceBatch -from logging import getLogger - -logger = getLogger(__name__) - - -class TraceSender(BaseModel): - """Trace sender for sending trace batches to the backend""" - - def send_batch(self, batch: TraceBatch) -> bool: - """Print trace batch to console""" - try: - payload = batch.to_dict() - - def datetime_handler(obj): - if isinstance(obj, datetime): - return obj.isoformat() - - serialized_payload = json.loads( - json.dumps(payload, default=datetime_handler) - ) - - PlusAPI(api_key=get_auth_token()).send_trace_batch(serialized_payload) - return True - except Exception as e: - logger.error(f"Error sending trace batch: {e}") - return False diff --git a/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py b/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py index d014c65d2..4a1f06621 100644 --- a/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py +++ b/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py @@ -66,6 +66,7 @@ class TraceBatchManager: self, user_context: Dict[str, str], execution_metadata: Dict[str, Any] ): """Send batch initialization to backend""" + if not self.plus_api or not self.current_batch: return @@ -75,8 +76,8 @@ class TraceBatchManager: "execution_type": execution_metadata.get("execution_type", "crew"), "execution_context": { "crew_fingerprint": execution_metadata.get("crew_fingerprint"), - "crew_name": execution_metadata.get("crew_name", "Unknown Crew"), - "flow_name": execution_metadata.get("flow_name", "Unknown Flow"), + "crew_name": execution_metadata.get("crew_name", None), + "flow_name": execution_metadata.get("flow_name", None), "crewai_version": self.current_batch.version, "privacy_level": user_context.get("privacy_level", "standard"), }, diff --git a/src/crewai/utilities/events/listeners/tracing/trace_listener.py b/src/crewai/utilities/events/listeners/tracing/trace_listener.py index de262ff5b..e8d68ccae 100644 --- a/src/crewai/utilities/events/listeners/tracing/trace_listener.py +++ b/src/crewai/utilities/events/listeners/tracing/trace_listener.py @@ -13,6 +13,7 @@ from crewai.utilities.events.agent_events import ( AgentExecutionErrorEvent, ) from crewai.utilities.events.listeners.tracing.types import TraceEvent +from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled from crewai.utilities.events.reasoning_events import ( AgentReasoningStartedEvent, AgentReasoningCompletedEvent, @@ -65,7 +66,7 @@ from crewai.utilities.events.memory_events import ( MemorySaveCompletedEvent, MemorySaveFailedEvent, ) -from .interfaces import TraceSender + from crewai.cli.authentication.token import get_auth_token from crewai.cli.version import get_crewai_version @@ -75,13 +76,13 @@ class TraceCollectionListener(BaseEventListener): Trace collection listener that orchestrates trace collection """ - trace_enabled: bool = False + trace_enabled: Optional[bool] = False complex_events = ["task_started", "llm_call_started", "llm_call_completed"] _instance = None _initialized = False - def __new__(cls, batch_manager=None, trace_sender=None): + def __new__(cls, batch_manager=None, tracing: Optional[bool] = False): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance @@ -89,14 +90,14 @@ class TraceCollectionListener(BaseEventListener): def __init__( self, batch_manager: Optional[TraceBatchManager] = None, - trace_sender: Optional[TraceSender] = None, + tracing: Optional[bool] = False, ): if self._initialized: return super().__init__() self.batch_manager = batch_manager or TraceBatchManager() - self.trace_sender = trace_sender or TraceSender() + self.tracing = tracing or False self.trace_enabled = self._check_trace_enabled() self._initialized = True @@ -106,9 +107,7 @@ class TraceCollectionListener(BaseEventListener): if not auth_token: return False - return os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true" or bool( - os.getenv("CREWAI_USER_TOKEN") - ) + return is_tracing_enabled() or self.tracing def _get_user_context(self) -> Dict[str, str]: """Extract user context for tracing""" @@ -156,7 +155,7 @@ class TraceCollectionListener(BaseEventListener): @event_bus.on(FlowFinishedEvent) def on_flow_finished(source, event): self._handle_trace_event("flow_finished", source, event) - self._send_batch() + self.batch_manager.finalize_batch() @event_bus.on(FlowPlotEvent) def on_flow_plot(source, event): @@ -174,12 +173,12 @@ class TraceCollectionListener(BaseEventListener): @event_bus.on(CrewKickoffCompletedEvent) def on_crew_completed(source, event): self._handle_trace_event("crew_kickoff_completed", source, event) - self._send_batch() + self.batch_manager.finalize_batch() @event_bus.on(CrewKickoffFailedEvent) def on_crew_failed(source, event): self._handle_trace_event("crew_kickoff_failed", source, event) - self._send_batch() + self.batch_manager.finalize_batch() @event_bus.on(TaskStartedEvent) def on_task_started(source, event): @@ -303,7 +302,7 @@ class TraceCollectionListener(BaseEventListener): """Initialize trace batch for Flow execution""" user_context = self._get_user_context() execution_metadata = { - "flow_name": getattr(source, "__class__.__name__", "Unknown Flow"), + "flow_name": getattr(event, "flow_name", "Unknown Flow"), "execution_start": event.timestamp if hasattr(event, "timestamp") else None, "crewai_version": get_crewai_version(), "execution_type": "flow", @@ -332,14 +331,6 @@ class TraceCollectionListener(BaseEventListener): trace_event = self._create_trace_event(event_type, source, event) self.batch_manager.add_event(trace_event) - def _send_batch(self): - """Send finalized batch using the configured sender""" - batch = self.batch_manager.finalize_batch() - if batch: - success = self.trace_sender.send_batch(batch) - if not success: - print("⚠️ Failed to send trace batch") - def _create_trace_event( self, event_type: str, source: Any, event: Any ) -> TraceEvent: @@ -360,20 +351,15 @@ class TraceCollectionListener(BaseEventListener): elif event_type == "task_started": return { "task_description": event.task.description, + "expected_output": event.task.expected_output, "task_name": event.task.name, "context": event.context, "agent": source.agent.role, } elif event_type == "llm_call_started": - return { - **self._safe_serialize_to_dict(event), - "messages": self._truncate_messages(event.messages), - } + return self._safe_serialize_to_dict(event) elif event_type == "llm_call_completed": - return { - **self._safe_serialize_to_dict(event), - "messages": self._truncate_messages(event.messages), - } + return self._safe_serialize_to_dict(event) else: return { "event_type": event_type, @@ -396,7 +382,7 @@ class TraceCollectionListener(BaseEventListener): return {"serialization_error": str(e), "object_type": type(obj).__name__} # TODO: move to utils - def _truncate_messages(self, messages, max_content_length=200, max_messages=5): + def _truncate_messages(self, messages, max_content_length=500, max_messages=5): """Truncate message content and limit number of messages""" if not messages or not isinstance(messages, list): return messages diff --git a/src/crewai/utilities/events/listeners/tracing/utils.py b/src/crewai/utilities/events/listeners/tracing/utils.py new file mode 100644 index 000000000..e498bc4f9 --- /dev/null +++ b/src/crewai/utilities/events/listeners/tracing/utils.py @@ -0,0 +1,5 @@ +import os + + +def is_tracing_enabled() -> bool: + return os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true" diff --git a/tests/cassettes/TestTraceListenerSetup.test_trace_listener_setup_correctly_with_tracing_flag.yaml b/tests/cassettes/TestTraceListenerSetup.test_trace_listener_setup_correctly_with_tracing_flag.yaml new file mode 100644 index 000000000..84f8dfa2a --- /dev/null +++ b/tests/cassettes/TestTraceListenerSetup.test_trace_listener_setup_correctly_with_tracing_flag.yaml @@ -0,0 +1,125 @@ +interactions: +- request: + body: '{"messages": [{"role": "system", "content": "You are Test Agent. Test backstory\nYour + personal goal is: Test goal\nTo give my best complete final answer to the task + respond using the exact following format:\n\nThought: I now can give a great + answer\nFinal Answer: Your final answer must be the great and the most complete + as possible, it must be outcome described.\n\nI MUST use these formats, my job + depends on it!"}, {"role": "user", "content": "\nCurrent Task: Say hello to + the world\n\nThis is the expected criteria for your final answer: hello world\nyou + MUST return the actual complete content as the final answer, not a summary.\n\nBegin! + This is VERY important to you, use the tools available and give your best Final + Answer, your job depends on it!\n\nThought:"}], "model": "gpt-4o", "stop": ["\nObservation:"]}' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate, zstd + connection: + - keep-alive + content-length: + - '820' + content-type: + - application/json + cookie: + - _cfuvid=NaXWifUGChHp6Ap1mvfMrNzmO4HdzddrqXkSR9T.hYo-1754508545647-0.0.1.1-604800000 + host: + - api.openai.com + user-agent: + - OpenAI/Python 1.93.0 + x-stainless-arch: + - arm64 + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - MacOS + x-stainless-package-version: + - 1.93.0 + x-stainless-raw-response: + - 'true' + x-stainless-read-timeout: + - '600.0' + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.12.9 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: !!binary | + H4sIAAAAAAAAAwAAAP//jFJdj9MwEHzPr1j5uUFp6DUhbycE4g6eKEic4BS59ibx4Xgt27mCTv3v + yEmvSfmQeImUnZ3xzO4+JQBMSVYBEx0Porc6fZ3nH2725cc3O1Pevnv/+e5OPFx/2mW7wd5+YavI + oP0DivDMeiGotxqDIjPBwiEPGFXXxdVmm623r7Yj0JNEHWmtDemG0jzLN2lWptn2ROxICfSsgq8J + AMDT+I0WjcQfrIJs9Vzp0XveIqvOTQDMkY4Vxr1XPnAT2GoGBZmAZnR9A4YOILiBVj0icGijY+DG + H9ABfDNvleEarsf/CjrUmuBATsuloMNm8DzmMYPWC4AbQ4HHeYxR7k/I8WxeU2sd7f1vVNYoo3xX + O+SeTDTqA1k2oscE4H4c0nCRm1lHvQ11oO84Pre+KiY9Nq9lgb48gYEC14t6cRrtpV4tMXCl/WLM + THDRoZyp8074IBUtgGSR+k83f9OekivT/o/8DAiBNqCsrUOpxGXiuc1hvNp/tZ2nPBpmHt2jElgH + hS5uQmLDBz0dFPM/fcC+bpRp0VmnpqtqbJ0VZbHGnMuSJcfkFwAAAP//AwBXeOIeXgMAAA== + headers: + CF-RAY: + - 96b9d31b89dc1684-SJC + Connection: + - keep-alive + Content-Encoding: + - gzip + Content-Type: + - application/json + Date: + - Thu, 07 Aug 2025 21:21:37 GMT + Server: + - cloudflare + Set-Cookie: + - __cf_bm=nQuY9yOahy.xg.aHkxwJgC5gyX5c9Xjbhp3Y7GMX4Ek-1754601697-1.0.1.1-_K22zHDSq5PrNEgK7qwpgcjPitPpgoT54GksNiq6j.aSPasbC7UakO3AYT59smUo5j14NY_OrHkDhm.eGIdpUTpnoJZK7MfR7X8Z96FITGs; + path=/; expires=Thu, 07-Aug-25 21:51:37 GMT; domain=.api.openai.com; HttpOnly; + Secure; SameSite=None + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + access-control-expose-headers: + - X-Request-ID + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - crewai-iuxna1 + openai-processing-ms: + - '424' + openai-project: + - proj_xitITlrFeen7zjNSzML82h9x + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '438' + x-ratelimit-limit-project-tokens: + - '30000000' + x-ratelimit-limit-requests: + - '10000' + x-ratelimit-limit-tokens: + - '30000000' + x-ratelimit-remaining-project-tokens: + - '29999828' + x-ratelimit-remaining-requests: + - '9999' + x-ratelimit-remaining-tokens: + - '29999828' + x-ratelimit-reset-project-tokens: + - 0s + x-ratelimit-reset-requests: + - 6ms + x-ratelimit-reset-tokens: + - 0s + x-request-id: + - req_6dbb2a6c9a864480b8d1584ae0f51fad + status: + code: 200 + message: OK +version: 1 diff --git a/tests/tracing/test_tracing.py b/tests/tracing/test_tracing.py index 03fc3ddb5..806eab3fb 100644 --- a/tests/tracing/test_tracing.py +++ b/tests/tracing/test_tracing.py @@ -33,10 +33,6 @@ class TestTraceListenerSetup: "crewai.utilities.events.listeners.tracing.trace_batch_manager.get_auth_token", return_value="mock_token_12345", ), - patch( - "crewai.utilities.events.listeners.tracing.interfaces.get_auth_token", - return_value="mock_token_12345", - ), ): yield @@ -296,7 +292,21 @@ class TestTraceListenerSetup: assert trace_listener.trace_enabled is True assert trace_listener.batch_manager is not None - assert trace_listener.trace_sender is not None + + @pytest.mark.vcr(filter_headers=["authorization"]) + def test_trace_listener_setup_correctly_with_tracing_flag(self): + """Test that trace listener is set up correctly when enabled""" + agent = Agent(role="Test Agent", goal="Test goal", backstory="Test backstory") + task = Task( + description="Say hello to the world", + expected_output="hello world", + agent=agent, + ) + crew = Crew(agents=[agent], tasks=[task], verbose=True, tracing=True) + crew.kickoff() + trace_listener = TraceCollectionListener(tracing=True) + assert trace_listener.trace_enabled is True + assert trace_listener.batch_manager is not None # Helper method to ensure cleanup def teardown_method(self):