Lorenze/tracing-improvements-cleanup (#3291)

* feat: add tracing support to Crew and Flow classes

- Introduced a new optional `tracing` field in both the `Crew` and `Flow` classes to enable tracing functionality.
- Updated the initialization logic to conditionally set up the `TraceCollectionListener` based on the `tracing` flag or the `CREWAI_TRACING_ENABLED` environment variable.
- Removed the obsolete `interfaces.py` file related to tracing.
- Enhanced the `TraceCollectionListener` to accept a `tracing` parameter and adjusted its internal logic accordingly.
- Added tests to verify the correct setup of the trace listener when tracing is enabled.

This change improves the observability of crew and flow execution and allows for better debugging and performance monitoring.
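
A minimal usage sketch of the new surface, mirroring the test added in this PR (the agent and task values are placeholders, and an LLM call is still required for the kickoff to complete):

import os

from crewai import Agent, Crew, Task

# Enable tracing globally; the environment variable is read during Crew initialization.
# os.environ["CREWAI_TRACING_ENABLED"] = "true"

agent = Agent(role="Test Agent", goal="Test goal", backstory="Test backstory")
task = Task(
    description="Say hello to the world",
    expected_output="hello world",
    agent=agent,
)

# Or enable tracing for this crew only via the new `tracing` field.
crew = Crew(agents=[agent], tasks=[task], tracing=True)
crew.kickoff()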

* fix flow name

* refactor: replace _send_batch method with finalize_batch calls in TraceCollectionListener

- Updated the TraceCollectionListener to use the batch_manager's finalize_batch method instead of the deprecated _send_batch method for handling trace events.
- This change improves the clarity of the code and ensures that batch finalization is consistently managed through the batch manager.
- Removed the obsolete _send_batch method to streamline the listener's functionality.
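
Conceptually, the end of a terminal event handler now looks like this (a sketch based on the hunks below; the batch manager is presumed to own delivery after finalization):

@event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event):
    self._handle_trace_event("crew_kickoff_completed", source, event)
    # finalize_batch() closes the batch; sending is handled by the batch manager,
    # so the listener no longer needs its own _send_batch helper.
    self.batch_manager.finalize_batch()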

* removed comments

* refactor: enhance tracing functionality by introducing utility for tracing checks

- Added a new utility function `is_tracing_enabled` to streamline the logic for checking if tracing is enabled based on the `CREWAI_TRACING_ENABLED` environment variable.
- Updated the `Crew` and `Flow` classes to utilize this utility for improved readability and maintainability.
- Refactored the `TraceCollectionListener` to simplify tracing checks and ensure consistent behavior across components.
- Introduced a new module for tracing utilities to encapsulate related functions, enhancing code organization.

* refactor: remove unused imports from crew and flow modules

- Removed the unnecessary `os` imports from both `crew.py` and `flow.py` to enhance code cleanliness and maintainability.
Lorenze Jay committed 2025-08-08 13:42:25 -07:00 (committed by GitHub)
parent a221295394 · commit 251ae00b8b
9 changed files with 176 additions and 75 deletions

View File

@@ -1,4 +1,3 @@
-import os
import asyncio
import json
import re
@@ -78,6 +77,7 @@ from crewai.utilities.events.listeners.tracing.trace_listener import (
)
from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks,
@@ -248,6 +248,10 @@ class Crew(FlowTrackable, BaseModel):
default=None,
description="Metrics for the LLM usage during all tasks execution.",
)
tracing: Optional[bool] = Field(
default=False,
description="Whether to enable tracing for the crew.",
)
@field_validator("id", mode="before")
@classmethod
@@ -279,8 +283,8 @@ class Crew(FlowTrackable, BaseModel):
self._cache_handler = CacheHandler()
event_listener = EventListener()
-if os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true":
-trace_listener = TraceCollectionListener()
+if is_tracing_enabled() or self.tracing:
+trace_listener = TraceCollectionListener(tracing=self.tracing)
trace_listener.setup_listeners(crewai_event_bus)
event_listener.verbose = self.verbose
event_listener.formatter.verbose = self.verbose

View File

@@ -2,7 +2,6 @@ import asyncio
import copy
import inspect
import logging
-import os
from typing import (
Any,
Callable,
@@ -36,6 +35,7 @@ from crewai.utilities.events.flow_events import (
from crewai.utilities.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled
from crewai.utilities.printer import Printer
logger = logging.getLogger(__name__)
@@ -441,6 +441,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
_router_paths: Dict[str, List[str]] = {}
initial_state: Union[Type[T], T, None] = None
name: Optional[str] = None
tracing: Optional[bool] = False
def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]:
class _FlowGeneric(cls): # type: ignore
@@ -452,6 +453,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
def __init__(
self,
persistence: Optional[FlowPersistence] = None,
tracing: Optional[bool] = False,
**kwargs: Any,
) -> None:
"""Initialize a new Flow instance.
@@ -469,8 +471,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Initialize state with initial values
self._state = self._create_initial_state()
-if os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true":
-trace_listener = TraceCollectionListener()
+self.tracing = tracing
+if is_tracing_enabled() or tracing:
+trace_listener = TraceCollectionListener(tracing=tracing)
trace_listener.setup_listeners(crewai_event_bus)
# Apply any additional kwargs
if kwargs:
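
A rough sketch of the new constructor flag in use (the ExampleFlow class below is hypothetical, and the import path is assumed to be `crewai.flow.flow`):

from crewai.flow.flow import Flow, start


class ExampleFlow(Flow):
    @start()
    def begin(self):
        return "hello"


# Passing tracing=True stores the flag on the instance and registers the
# TraceCollectionListener on the event bus during __init__.
flow = ExampleFlow(tracing=True)
flow.kickoff()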

View File

@@ -1,33 +0,0 @@
import json
from datetime import datetime
from crewai.cli.plus_api import PlusAPI
from crewai.cli.authentication.token import get_auth_token
from pydantic import BaseModel
from .trace_batch_manager import TraceBatch
from logging import getLogger
logger = getLogger(__name__)
class TraceSender(BaseModel):
"""Trace sender for sending trace batches to the backend"""
def send_batch(self, batch: TraceBatch) -> bool:
"""Print trace batch to console"""
try:
payload = batch.to_dict()
def datetime_handler(obj):
if isinstance(obj, datetime):
return obj.isoformat()
serialized_payload = json.loads(
json.dumps(payload, default=datetime_handler)
)
PlusAPI(api_key=get_auth_token()).send_trace_batch(serialized_payload)
return True
except Exception as e:
logger.error(f"Error sending trace batch: {e}")
return False

View File

@@ -66,6 +66,7 @@ class TraceBatchManager:
self, user_context: Dict[str, str], execution_metadata: Dict[str, Any]
):
"""Send batch initialization to backend"""
if not self.plus_api or not self.current_batch:
return
@@ -75,8 +76,8 @@ class TraceBatchManager:
"execution_type": execution_metadata.get("execution_type", "crew"),
"execution_context": {
"crew_fingerprint": execution_metadata.get("crew_fingerprint"),
"crew_name": execution_metadata.get("crew_name", "Unknown Crew"),
"flow_name": execution_metadata.get("flow_name", "Unknown Flow"),
"crew_name": execution_metadata.get("crew_name", None),
"flow_name": execution_metadata.get("flow_name", None),
"crewai_version": self.current_batch.version,
"privacy_level": user_context.get("privacy_level", "standard"),
},

View File

@@ -13,6 +13,7 @@ from crewai.utilities.events.agent_events import (
AgentExecutionErrorEvent,
)
from crewai.utilities.events.listeners.tracing.types import TraceEvent
from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled
from crewai.utilities.events.reasoning_events import (
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
@@ -65,7 +66,7 @@ from crewai.utilities.events.memory_events import (
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
)
-from .interfaces import TraceSender
from crewai.cli.authentication.token import get_auth_token
from crewai.cli.version import get_crewai_version
@@ -75,13 +76,13 @@ class TraceCollectionListener(BaseEventListener):
Trace collection listener that orchestrates trace collection
"""
-trace_enabled: bool = False
+trace_enabled: Optional[bool] = False
complex_events = ["task_started", "llm_call_started", "llm_call_completed"]
_instance = None
_initialized = False
-def __new__(cls, batch_manager=None, trace_sender=None):
+def __new__(cls, batch_manager=None, tracing: Optional[bool] = False):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
@@ -89,14 +90,14 @@ class TraceCollectionListener(BaseEventListener):
def __init__(
self,
batch_manager: Optional[TraceBatchManager] = None,
-trace_sender: Optional[TraceSender] = None,
+tracing: Optional[bool] = False,
):
if self._initialized:
return
super().__init__()
self.batch_manager = batch_manager or TraceBatchManager()
-self.trace_sender = trace_sender or TraceSender()
+self.tracing = tracing or False
self.trace_enabled = self._check_trace_enabled()
self._initialized = True
@@ -106,9 +107,7 @@ class TraceCollectionListener(BaseEventListener):
if not auth_token:
return False
-return os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true" or bool(
-os.getenv("CREWAI_USER_TOKEN")
-)
+return is_tracing_enabled() or self.tracing
def _get_user_context(self) -> Dict[str, str]:
"""Extract user context for tracing"""
@@ -156,7 +155,7 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event):
self._handle_trace_event("flow_finished", source, event)
-self._send_batch()
+self.batch_manager.finalize_batch()
@event_bus.on(FlowPlotEvent)
def on_flow_plot(source, event):
@@ -174,12 +173,12 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event):
self._handle_trace_event("crew_kickoff_completed", source, event)
-self._send_batch()
+self.batch_manager.finalize_batch()
@event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event):
self._handle_trace_event("crew_kickoff_failed", source, event)
-self._send_batch()
+self.batch_manager.finalize_batch()
@event_bus.on(TaskStartedEvent)
def on_task_started(source, event):
@@ -303,7 +302,7 @@ class TraceCollectionListener(BaseEventListener):
"""Initialize trace batch for Flow execution"""
user_context = self._get_user_context()
execution_metadata = {
"flow_name": getattr(source, "__class__.__name__", "Unknown Flow"),
"flow_name": getattr(event, "flow_name", "Unknown Flow"),
"execution_start": event.timestamp if hasattr(event, "timestamp") else None,
"crewai_version": get_crewai_version(),
"execution_type": "flow",
@@ -332,14 +331,6 @@ class TraceCollectionListener(BaseEventListener):
trace_event = self._create_trace_event(event_type, source, event)
self.batch_manager.add_event(trace_event)
-def _send_batch(self):
-"""Send finalized batch using the configured sender"""
-batch = self.batch_manager.finalize_batch()
-if batch:
-success = self.trace_sender.send_batch(batch)
-if not success:
-print("⚠️ Failed to send trace batch")
def _create_trace_event(
self, event_type: str, source: Any, event: Any
) -> TraceEvent:
@@ -360,20 +351,15 @@ class TraceCollectionListener(BaseEventListener):
elif event_type == "task_started":
return {
"task_description": event.task.description,
"expected_output": event.task.expected_output,
"task_name": event.task.name,
"context": event.context,
"agent": source.agent.role,
}
elif event_type == "llm_call_started":
return {
**self._safe_serialize_to_dict(event),
"messages": self._truncate_messages(event.messages),
}
return self._safe_serialize_to_dict(event)
elif event_type == "llm_call_completed":
return {
**self._safe_serialize_to_dict(event),
"messages": self._truncate_messages(event.messages),
}
return self._safe_serialize_to_dict(event)
else:
return {
"event_type": event_type,
@@ -396,7 +382,7 @@ class TraceCollectionListener(BaseEventListener):
return {"serialization_error": str(e), "object_type": type(obj).__name__}
# TODO: move to utils
-def _truncate_messages(self, messages, max_content_length=200, max_messages=5):
+def _truncate_messages(self, messages, max_content_length=500, max_messages=5):
"""Truncate message content and limit number of messages"""
if not messages or not isinstance(messages, list):
return messages
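
The body of the helper is cut off in this hunk; a minimal sketch of what such truncation typically looks like, assuming OpenAI-style message dicts (illustrative only, not the actual implementation):

def _truncate_messages(self, messages, max_content_length=500, max_messages=5):
    """Truncate message content and limit the number of messages."""
    if not messages or not isinstance(messages, list):
        return messages
    truncated = []
    for message in messages[:max_messages]:  # keep at most max_messages entries
        message = dict(message)  # avoid mutating the caller's dicts
        content = message.get("content")
        if isinstance(content, str) and len(content) > max_content_length:
            message["content"] = content[:max_content_length] + "..."
        truncated.append(message)
    return truncated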

View File

@@ -0,0 +1,5 @@
import os
def is_tracing_enabled() -> bool:
return os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true"
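
A quick hypothetical check of the helper using pytest's monkeypatch fixture (not part of this diff):

from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled


def test_is_tracing_enabled(monkeypatch):
    monkeypatch.setenv("CREWAI_TRACING_ENABLED", "TRUE")
    assert is_tracing_enabled() is True  # comparison is case-insensitive

    monkeypatch.delenv("CREWAI_TRACING_ENABLED", raising=False)
    assert is_tracing_enabled() is False  # unset falls back to "false"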

View File

@@ -0,0 +1,125 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are Test Agent. Test backstory\nYour
personal goal is: Test goal\nTo give my best complete final answer to the task
respond using the exact following format:\n\nThought: I now can give a great
answer\nFinal Answer: Your final answer must be the great and the most complete
as possible, it must be outcome described.\n\nI MUST use these formats, my job
depends on it!"}, {"role": "user", "content": "\nCurrent Task: Say hello to
the world\n\nThis is the expected criteria for your final answer: hello world\nyou
MUST return the actual complete content as the final answer, not a summary.\n\nBegin!
This is VERY important to you, use the tools available and give your best Final
Answer, your job depends on it!\n\nThought:"}], "model": "gpt-4o", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '820'
content-type:
- application/json
cookie:
- _cfuvid=NaXWifUGChHp6Ap1mvfMrNzmO4HdzddrqXkSR9T.hYo-1754508545647-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.93.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.93.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.9
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFJdj9MwEHzPr1j5uUFp6DUhbycE4g6eKEic4BS59ibx4Xgt27mCTv3v
yEmvSfmQeImUnZ3xzO4+JQBMSVYBEx0Porc6fZ3nH2725cc3O1Pevnv/+e5OPFx/2mW7wd5+YavI
oP0DivDMeiGotxqDIjPBwiEPGFXXxdVmm623r7Yj0JNEHWmtDemG0jzLN2lWptn2ROxICfSsgq8J
AMDT+I0WjcQfrIJs9Vzp0XveIqvOTQDMkY4Vxr1XPnAT2GoGBZmAZnR9A4YOILiBVj0icGijY+DG
H9ABfDNvleEarsf/CjrUmuBATsuloMNm8DzmMYPWC4AbQ4HHeYxR7k/I8WxeU2sd7f1vVNYoo3xX
O+SeTDTqA1k2oscE4H4c0nCRm1lHvQ11oO84Pre+KiY9Nq9lgb48gYEC14t6cRrtpV4tMXCl/WLM
THDRoZyp8074IBUtgGSR+k83f9OekivT/o/8DAiBNqCsrUOpxGXiuc1hvNp/tZ2nPBpmHt2jElgH
hS5uQmLDBz0dFPM/fcC+bpRp0VmnpqtqbJ0VZbHGnMuSJcfkFwAAAP//AwBXeOIeXgMAAA==
headers:
CF-RAY:
- 96b9d31b89dc1684-SJC
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Thu, 07 Aug 2025 21:21:37 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=nQuY9yOahy.xg.aHkxwJgC5gyX5c9Xjbhp3Y7GMX4Ek-1754601697-1.0.1.1-_K22zHDSq5PrNEgK7qwpgcjPitPpgoT54GksNiq6j.aSPasbC7UakO3AYT59smUo5j14NY_OrHkDhm.eGIdpUTpnoJZK7MfR7X8Z96FITGs;
path=/; expires=Thu, 07-Aug-25 21:51:37 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
Strict-Transport-Security:
- max-age=31536000; includeSubDomains; preload
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '424'
openai-project:
- proj_xitITlrFeen7zjNSzML82h9x
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '438'
x-ratelimit-limit-project-tokens:
- '30000000'
x-ratelimit-limit-requests:
- '10000'
x-ratelimit-limit-tokens:
- '30000000'
x-ratelimit-remaining-project-tokens:
- '29999828'
x-ratelimit-remaining-requests:
- '9999'
x-ratelimit-remaining-tokens:
- '29999828'
x-ratelimit-reset-project-tokens:
- 0s
x-ratelimit-reset-requests:
- 6ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_6dbb2a6c9a864480b8d1584ae0f51fad
status:
code: 200
message: OK
version: 1

View File

@@ -33,10 +33,6 @@ class TestTraceListenerSetup:
"crewai.utilities.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token_12345",
),
-patch(
-"crewai.utilities.events.listeners.tracing.interfaces.get_auth_token",
-return_value="mock_token_12345",
-),
):
yield
@@ -296,7 +292,21 @@ class TestTraceListenerSetup:
assert trace_listener.trace_enabled is True
assert trace_listener.batch_manager is not None
assert trace_listener.trace_sender is not None
@pytest.mark.vcr(filter_headers=["authorization"])
def test_trace_listener_setup_correctly_with_tracing_flag(self):
"""Test that trace listener is set up correctly when enabled"""
agent = Agent(role="Test Agent", goal="Test goal", backstory="Test backstory")
task = Task(
description="Say hello to the world",
expected_output="hello world",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], verbose=True, tracing=True)
crew.kickoff()
trace_listener = TraceCollectionListener(tracing=True)
assert trace_listener.trace_enabled is True
assert trace_listener.batch_manager is not None
# Helper method to ensure cleanup
def teardown_method(self):