diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 3cac711fa..5e654463c 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -1,4 +1,3 @@ -import os import asyncio import json import re @@ -78,6 +77,7 @@ from crewai.utilities.events.listeners.tracing.trace_listener import ( ) +from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled from crewai.utilities.formatter import ( aggregate_raw_outputs_from_task_outputs, aggregate_raw_outputs_from_tasks, @@ -248,6 +248,10 @@ class Crew(FlowTrackable, BaseModel): default=None, description="Metrics for the LLM usage during all tasks execution.", ) + tracing: Optional[bool] = Field( + default=False, + description="Whether to enable tracing for the crew.", + ) @field_validator("id", mode="before") @classmethod @@ -279,8 +283,8 @@ class Crew(FlowTrackable, BaseModel): self._cache_handler = CacheHandler() event_listener = EventListener() - if os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true": - trace_listener = TraceCollectionListener() + if is_tracing_enabled() or self.tracing: + trace_listener = TraceCollectionListener(tracing=self.tracing) trace_listener.setup_listeners(crewai_event_bus) event_listener.verbose = self.verbose event_listener.formatter.verbose = self.verbose diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index 9d75dfe5a..0c145b401 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -2,7 +2,6 @@ import asyncio import copy import inspect import logging -import os from typing import ( Any, Callable, @@ -36,6 +35,7 @@ from crewai.utilities.events.flow_events import ( from crewai.utilities.events.listeners.tracing.trace_listener import ( TraceCollectionListener, ) +from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled from crewai.utilities.printer import Printer logger = logging.getLogger(__name__) @@ -441,6 +441,7 @@ class Flow(Generic[T], metaclass=FlowMeta): _router_paths: Dict[str, List[str]] = {} initial_state: Union[Type[T], T, None] = None name: Optional[str] = None + tracing: Optional[bool] = False def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]: class _FlowGeneric(cls): # type: ignore @@ -452,6 +453,7 @@ class Flow(Generic[T], metaclass=FlowMeta): def __init__( self, persistence: Optional[FlowPersistence] = None, + tracing: Optional[bool] = False, **kwargs: Any, ) -> None: """Initialize a new Flow instance. @@ -469,8 +471,9 @@ class Flow(Generic[T], metaclass=FlowMeta): # Initialize state with initial values self._state = self._create_initial_state() - if os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true": - trace_listener = TraceCollectionListener() + self.tracing = tracing + if is_tracing_enabled() or tracing: + trace_listener = TraceCollectionListener(tracing=tracing) trace_listener.setup_listeners(crewai_event_bus) # Apply any additional kwargs if kwargs: diff --git a/src/crewai/utilities/events/listeners/tracing/__init__.py b/src/crewai/utilities/events/listeners/tracing/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/crewai/utilities/events/listeners/tracing/interfaces.py b/src/crewai/utilities/events/listeners/tracing/interfaces.py deleted file mode 100644 index 4516be7f8..000000000 --- a/src/crewai/utilities/events/listeners/tracing/interfaces.py +++ /dev/null @@ -1,33 +0,0 @@ -import json -from datetime import datetime - -from crewai.cli.plus_api import PlusAPI -from crewai.cli.authentication.token import get_auth_token -from pydantic import BaseModel -from .trace_batch_manager import TraceBatch -from logging import getLogger - -logger = getLogger(__name__) - - -class TraceSender(BaseModel): - """Trace sender for sending trace batches to the backend""" - - def send_batch(self, batch: TraceBatch) -> bool: - """Print trace batch to console""" - try: - payload = batch.to_dict() - - def datetime_handler(obj): - if isinstance(obj, datetime): - return obj.isoformat() - - serialized_payload = json.loads( - json.dumps(payload, default=datetime_handler) - ) - - PlusAPI(api_key=get_auth_token()).send_trace_batch(serialized_payload) - return True - except Exception as e: - logger.error(f"Error sending trace batch: {e}") - return False diff --git a/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py b/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py index d014c65d2..4a1f06621 100644 --- a/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py +++ b/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py @@ -66,6 +66,7 @@ class TraceBatchManager: self, user_context: Dict[str, str], execution_metadata: Dict[str, Any] ): """Send batch initialization to backend""" + if not self.plus_api or not self.current_batch: return @@ -75,8 +76,8 @@ class TraceBatchManager: "execution_type": execution_metadata.get("execution_type", "crew"), "execution_context": { "crew_fingerprint": execution_metadata.get("crew_fingerprint"), - "crew_name": execution_metadata.get("crew_name", "Unknown Crew"), - "flow_name": execution_metadata.get("flow_name", "Unknown Flow"), + "crew_name": execution_metadata.get("crew_name", None), + "flow_name": execution_metadata.get("flow_name", None), "crewai_version": self.current_batch.version, "privacy_level": user_context.get("privacy_level", "standard"), }, diff --git a/src/crewai/utilities/events/listeners/tracing/trace_listener.py b/src/crewai/utilities/events/listeners/tracing/trace_listener.py index de262ff5b..e8d68ccae 100644 --- a/src/crewai/utilities/events/listeners/tracing/trace_listener.py +++ b/src/crewai/utilities/events/listeners/tracing/trace_listener.py @@ -13,6 +13,7 @@ from crewai.utilities.events.agent_events import ( AgentExecutionErrorEvent, ) from crewai.utilities.events.listeners.tracing.types import TraceEvent +from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled from crewai.utilities.events.reasoning_events import ( AgentReasoningStartedEvent, AgentReasoningCompletedEvent, @@ -65,7 +66,7 @@ from crewai.utilities.events.memory_events import ( MemorySaveCompletedEvent, MemorySaveFailedEvent, ) -from .interfaces import TraceSender + from crewai.cli.authentication.token import get_auth_token from crewai.cli.version import get_crewai_version @@ -75,13 +76,13 @@ class TraceCollectionListener(BaseEventListener): Trace collection listener that orchestrates trace collection """ - trace_enabled: bool = False + trace_enabled: Optional[bool] = False complex_events = ["task_started", "llm_call_started", "llm_call_completed"] _instance = None _initialized = False - def __new__(cls, batch_manager=None, trace_sender=None): + def __new__(cls, batch_manager=None, tracing: Optional[bool] = False): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance @@ -89,14 +90,14 @@ class TraceCollectionListener(BaseEventListener): def __init__( self, batch_manager: Optional[TraceBatchManager] = None, - trace_sender: Optional[TraceSender] = None, + tracing: Optional[bool] = False, ): if self._initialized: return super().__init__() self.batch_manager = batch_manager or TraceBatchManager() - self.trace_sender = trace_sender or TraceSender() + self.tracing = tracing or False self.trace_enabled = self._check_trace_enabled() self._initialized = True @@ -106,9 +107,7 @@ class TraceCollectionListener(BaseEventListener): if not auth_token: return False - return os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true" or bool( - os.getenv("CREWAI_USER_TOKEN") - ) + return is_tracing_enabled() or self.tracing def _get_user_context(self) -> Dict[str, str]: """Extract user context for tracing""" @@ -156,7 +155,7 @@ class TraceCollectionListener(BaseEventListener): @event_bus.on(FlowFinishedEvent) def on_flow_finished(source, event): self._handle_trace_event("flow_finished", source, event) - self._send_batch() + self.batch_manager.finalize_batch() @event_bus.on(FlowPlotEvent) def on_flow_plot(source, event): @@ -174,12 +173,12 @@ class TraceCollectionListener(BaseEventListener): @event_bus.on(CrewKickoffCompletedEvent) def on_crew_completed(source, event): self._handle_trace_event("crew_kickoff_completed", source, event) - self._send_batch() + self.batch_manager.finalize_batch() @event_bus.on(CrewKickoffFailedEvent) def on_crew_failed(source, event): self._handle_trace_event("crew_kickoff_failed", source, event) - self._send_batch() + self.batch_manager.finalize_batch() @event_bus.on(TaskStartedEvent) def on_task_started(source, event): @@ -303,7 +302,7 @@ class TraceCollectionListener(BaseEventListener): """Initialize trace batch for Flow execution""" user_context = self._get_user_context() execution_metadata = { - "flow_name": getattr(source, "__class__.__name__", "Unknown Flow"), + "flow_name": getattr(event, "flow_name", "Unknown Flow"), "execution_start": event.timestamp if hasattr(event, "timestamp") else None, "crewai_version": get_crewai_version(), "execution_type": "flow", @@ -332,14 +331,6 @@ class TraceCollectionListener(BaseEventListener): trace_event = self._create_trace_event(event_type, source, event) self.batch_manager.add_event(trace_event) - def _send_batch(self): - """Send finalized batch using the configured sender""" - batch = self.batch_manager.finalize_batch() - if batch: - success = self.trace_sender.send_batch(batch) - if not success: - print("⚠️ Failed to send trace batch") - def _create_trace_event( self, event_type: str, source: Any, event: Any ) -> TraceEvent: @@ -360,20 +351,15 @@ class TraceCollectionListener(BaseEventListener): elif event_type == "task_started": return { "task_description": event.task.description, + "expected_output": event.task.expected_output, "task_name": event.task.name, "context": event.context, "agent": source.agent.role, } elif event_type == "llm_call_started": - return { - **self._safe_serialize_to_dict(event), - "messages": self._truncate_messages(event.messages), - } + return self._safe_serialize_to_dict(event) elif event_type == "llm_call_completed": - return { - **self._safe_serialize_to_dict(event), - "messages": self._truncate_messages(event.messages), - } + return self._safe_serialize_to_dict(event) else: return { "event_type": event_type, @@ -396,7 +382,7 @@ class TraceCollectionListener(BaseEventListener): return {"serialization_error": str(e), "object_type": type(obj).__name__} # TODO: move to utils - def _truncate_messages(self, messages, max_content_length=200, max_messages=5): + def _truncate_messages(self, messages, max_content_length=500, max_messages=5): """Truncate message content and limit number of messages""" if not messages or not isinstance(messages, list): return messages diff --git a/src/crewai/utilities/events/listeners/tracing/utils.py b/src/crewai/utilities/events/listeners/tracing/utils.py new file mode 100644 index 000000000..e498bc4f9 --- /dev/null +++ b/src/crewai/utilities/events/listeners/tracing/utils.py @@ -0,0 +1,5 @@ +import os + + +def is_tracing_enabled() -> bool: + return os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true" diff --git a/tests/cassettes/TestTraceListenerSetup.test_trace_listener_setup_correctly_with_tracing_flag.yaml b/tests/cassettes/TestTraceListenerSetup.test_trace_listener_setup_correctly_with_tracing_flag.yaml new file mode 100644 index 000000000..84f8dfa2a --- /dev/null +++ b/tests/cassettes/TestTraceListenerSetup.test_trace_listener_setup_correctly_with_tracing_flag.yaml @@ -0,0 +1,125 @@ +interactions: +- request: + body: '{"messages": [{"role": "system", "content": "You are Test Agent. Test backstory\nYour + personal goal is: Test goal\nTo give my best complete final answer to the task + respond using the exact following format:\n\nThought: I now can give a great + answer\nFinal Answer: Your final answer must be the great and the most complete + as possible, it must be outcome described.\n\nI MUST use these formats, my job + depends on it!"}, {"role": "user", "content": "\nCurrent Task: Say hello to + the world\n\nThis is the expected criteria for your final answer: hello world\nyou + MUST return the actual complete content as the final answer, not a summary.\n\nBegin! + This is VERY important to you, use the tools available and give your best Final + Answer, your job depends on it!\n\nThought:"}], "model": "gpt-4o", "stop": ["\nObservation:"]}' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate, zstd + connection: + - keep-alive + content-length: + - '820' + content-type: + - application/json + cookie: + - _cfuvid=NaXWifUGChHp6Ap1mvfMrNzmO4HdzddrqXkSR9T.hYo-1754508545647-0.0.1.1-604800000 + host: + - api.openai.com + user-agent: + - OpenAI/Python 1.93.0 + x-stainless-arch: + - arm64 + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - MacOS + x-stainless-package-version: + - 1.93.0 + x-stainless-raw-response: + - 'true' + x-stainless-read-timeout: + - '600.0' + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.12.9 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: !!binary | + H4sIAAAAAAAAAwAAAP//jFJdj9MwEHzPr1j5uUFp6DUhbycE4g6eKEic4BS59ibx4Xgt27mCTv3v + yEmvSfmQeImUnZ3xzO4+JQBMSVYBEx0Porc6fZ3nH2725cc3O1Pevnv/+e5OPFx/2mW7wd5+YavI + oP0DivDMeiGotxqDIjPBwiEPGFXXxdVmm623r7Yj0JNEHWmtDemG0jzLN2lWptn2ROxICfSsgq8J + AMDT+I0WjcQfrIJs9Vzp0XveIqvOTQDMkY4Vxr1XPnAT2GoGBZmAZnR9A4YOILiBVj0icGijY+DG + H9ABfDNvleEarsf/CjrUmuBATsuloMNm8DzmMYPWC4AbQ4HHeYxR7k/I8WxeU2sd7f1vVNYoo3xX + O+SeTDTqA1k2oscE4H4c0nCRm1lHvQ11oO84Pre+KiY9Nq9lgb48gYEC14t6cRrtpV4tMXCl/WLM + THDRoZyp8074IBUtgGSR+k83f9OekivT/o/8DAiBNqCsrUOpxGXiuc1hvNp/tZ2nPBpmHt2jElgH + hS5uQmLDBz0dFPM/fcC+bpRp0VmnpqtqbJ0VZbHGnMuSJcfkFwAAAP//AwBXeOIeXgMAAA== + headers: + CF-RAY: + - 96b9d31b89dc1684-SJC + Connection: + - keep-alive + Content-Encoding: + - gzip + Content-Type: + - application/json + Date: + - Thu, 07 Aug 2025 21:21:37 GMT + Server: + - cloudflare + Set-Cookie: + - __cf_bm=nQuY9yOahy.xg.aHkxwJgC5gyX5c9Xjbhp3Y7GMX4Ek-1754601697-1.0.1.1-_K22zHDSq5PrNEgK7qwpgcjPitPpgoT54GksNiq6j.aSPasbC7UakO3AYT59smUo5j14NY_OrHkDhm.eGIdpUTpnoJZK7MfR7X8Z96FITGs; + path=/; expires=Thu, 07-Aug-25 21:51:37 GMT; domain=.api.openai.com; HttpOnly; + Secure; SameSite=None + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + access-control-expose-headers: + - X-Request-ID + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - crewai-iuxna1 + openai-processing-ms: + - '424' + openai-project: + - proj_xitITlrFeen7zjNSzML82h9x + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '438' + x-ratelimit-limit-project-tokens: + - '30000000' + x-ratelimit-limit-requests: + - '10000' + x-ratelimit-limit-tokens: + - '30000000' + x-ratelimit-remaining-project-tokens: + - '29999828' + x-ratelimit-remaining-requests: + - '9999' + x-ratelimit-remaining-tokens: + - '29999828' + x-ratelimit-reset-project-tokens: + - 0s + x-ratelimit-reset-requests: + - 6ms + x-ratelimit-reset-tokens: + - 0s + x-request-id: + - req_6dbb2a6c9a864480b8d1584ae0f51fad + status: + code: 200 + message: OK +version: 1 diff --git a/tests/tracing/test_tracing.py b/tests/tracing/test_tracing.py index 03fc3ddb5..806eab3fb 100644 --- a/tests/tracing/test_tracing.py +++ b/tests/tracing/test_tracing.py @@ -33,10 +33,6 @@ class TestTraceListenerSetup: "crewai.utilities.events.listeners.tracing.trace_batch_manager.get_auth_token", return_value="mock_token_12345", ), - patch( - "crewai.utilities.events.listeners.tracing.interfaces.get_auth_token", - return_value="mock_token_12345", - ), ): yield @@ -296,7 +292,21 @@ class TestTraceListenerSetup: assert trace_listener.trace_enabled is True assert trace_listener.batch_manager is not None - assert trace_listener.trace_sender is not None + + @pytest.mark.vcr(filter_headers=["authorization"]) + def test_trace_listener_setup_correctly_with_tracing_flag(self): + """Test that trace listener is set up correctly when enabled""" + agent = Agent(role="Test Agent", goal="Test goal", backstory="Test backstory") + task = Task( + description="Say hello to the world", + expected_output="hello world", + agent=agent, + ) + crew = Crew(agents=[agent], tasks=[task], verbose=True, tracing=True) + crew.kickoff() + trace_listener = TraceCollectionListener(tracing=True) + assert trace_listener.trace_enabled is True + assert trace_listener.batch_manager is not None # Helper method to ensure cleanup def teardown_method(self):