Lorenze/tracing v1 (#3279)

* initial setup

* feat: enhance CrewKickoffCompletedEvent to include total token usage

- Added total_tokens attribute to CrewKickoffCompletedEvent for better tracking of token usage during crew execution.
- Updated Crew class to emit total token usage upon kickoff completion.
- Removed obsolete context handler and execution context tracker files to streamline event handling.

* cleanup

* remove print statements for loggers

* feat: add CrewAI base URL and improve logging in tracing

- Introduced `CREWAI_BASE_URL` constant for easy access to the CrewAI application URL.
- Replaced print statements with logging in the `TraceSender` class for better error tracking.
- Enhanced the `TraceBatchManager` to provide default values for flow names and removed unnecessary comments.
- Implemented singleton pattern in `TraceCollectionListener` to ensure a single instance is used.
- Added a new test case to verify that the trace listener correctly collects events during crew execution.

* clear

* fix: update datetime serialization in tracing interfaces

- Removed the 'Z' suffix from datetime serialization in TraceSender and TraceEvent to ensure consistent ISO format.
- Added new test cases to validate the functionality of the TraceBatchManager and event collection during crew execution.
- Introduced fixtures to clear event bus listeners before each test to maintain isolation.

* test: enhance tracing tests with mock authentication token

- Added a mock authentication token to the tracing tests to ensure proper setup and event collection.
- Updated test methods to include the mock token, improving isolation and reliability of tests related to the TraceListener and BatchManager.
- Ensured that the tests validate the correct behavior of event collection during crew execution.

* test: refactor tracing tests to improve mock usage

- Moved the mock authentication token patching inside the test class to enhance readability and maintainability.
- Updated test methods to remove unnecessary mock parameters, streamlining the test signatures.
- Ensured that the tests continue to validate the correct behavior of event collection during crew execution while improving isolation.

* test: refactor tracing tests for improved mock usage and consistency

- Moved mock authentication token patching into individual test methods for better clarity and maintainability.
- Corrected the backstory string in the `Agent` instantiation to fix a typo.
- Ensured that all tests validate the correct behavior of event collection during crew execution while enhancing isolation and readability.

* test: add new tracing test for disabled trace listener

- Introduced a new test case to verify that the trace listener does not make HTTP calls when tracing is disabled via environment variables.
- Enhanced existing tests by mocking PlusAPI HTTP calls to avoid authentication and network requests, improving test isolation and reliability.
- Updated the test setup to ensure proper initialization of the trace listener and its components during crew execution.

* refactor: update LLM class to utilize new completion function and improve cost calculation

- Replaced direct calls to `litellm.completion` with a new import for better clarity and maintainability.
- Introduced a new optional attribute `completion_cost` in the LLM class to track the cost of completions.
- Updated the handling of completion responses to ensure accurate cost calculations and improved error handling.
- Removed outdated test cassettes for gemini models to streamline test suite and avoid redundancy.
- Enhanced existing tests to reflect changes in the LLM class and ensure proper functionality.

* test: enhance tracing tests with additional request and response scenarios

- Added new test cases to validate the behavior of the trace listener and batch manager when handling 404 responses from the tracing API.
- Updated existing test cassettes to include detailed request and response structures, ensuring comprehensive coverage of edge cases.
- Improved mock setup to avoid unnecessary network calls and enhance test reliability.
- Ensured that the tests validate the correct behavior of event collection during crew execution, particularly in scenarios where the tracing service is unavailable.

* feat: enable conditional tracing based on environment variable

- Added support for enabling or disabling the trace listener based on the `CREWAI_TRACING_ENABLED` environment variable.
- Updated the `Crew` class to conditionally set up the trace listener only when tracing is enabled, improving performance and resource management.
- Refactored test cases to ensure proper cleanup of event bus listeners before and after each test, enhancing test reliability and isolation.
- Improved mock setup in tracing tests to validate the behavior of the trace listener when tracing is disabled.

* fix: downgrade litellm version from 1.74.9 to 1.74.3

- Updated the `pyproject.toml` and `uv.lock` files to reflect the change in the `litellm` dependency version.
- This downgrade addresses compatibility issues and ensures stability in the project environment.

* refactor: improve tracing test setup by moving mock authentication token patching

- Removed the module-level patch for the authentication token and implemented a fixture to mock the token for all tests in the class, enhancing test isolation and readability.
- Updated the event bus clearing logic to ensure original handlers are restored after tests, improving reliability of the test environment.
- This refactor streamlines the test setup and ensures consistent behavior across tracing tests.

* test: enhance tracing test setup with comprehensive mock authentication

- Expanded the mock authentication token patching to cover all instances where `get_auth_token` is used across different modules, ensuring consistent behavior in tests.
- Introduced a new fixture to reset tracing singleton instances between tests, improving test isolation and reliability.
- This update enhances the overall robustness of the tracing tests by ensuring that all necessary components are properly mocked and reset, leading to more reliable test outcomes.

* just drop the test for now

* refactor: comment out completion-related code in LLM and LLM event classes

- Commented out the `completion` and `completion_cost` imports and their usage in the `LLM` class to prevent potential issues during execution.
- Updated the `LLMCallCompletedEvent` class to comment out the `response_cost` attribute, ensuring consistency with the changes in the LLM class.
- This refactor aims to streamline the code and prepare for future updates without affecting current functionality.

* refactor: update LLM response handling in LiteAgent

- Commented out the `response_cost` attribute in the LLM response handling to align with recent refactoring in the LLM class.
- This change aims to maintain consistency in the codebase and prepare for future updates without affecting current functionality.

* refactor: remove commented-out response cost attributes in LLM and LiteAgent

- Commented out the `response_cost` attribute in both the `LiteAgent` and `LLM` classes to maintain consistency with recent refactoring efforts.
- This change aligns with previous updates aimed at streamlining the codebase and preparing for future enhancements without impacting current functionality.

* bring back litellm upgrade version
This commit is contained in:
Lorenze Jay
2025-08-06 14:05:14 -07:00
committed by GitHub
parent 7dc86dc79a
commit 8f4a6cc61c
22 changed files with 3035 additions and 231 deletions

View File

@@ -17,6 +17,7 @@ class PlusAPI:
ORGANIZATIONS_RESOURCE = "/crewai_plus/api/v1/me/organizations"
CREWS_RESOURCE = "/crewai_plus/api/v1/crews"
AGENTS_RESOURCE = "/crewai_plus/api/v1/agents"
TRACING_RESOURCE = "/crewai_plus/api/v1/tracing"
def __init__(self, api_key: str) -> None:
self.api_key = api_key
@@ -114,3 +115,25 @@ class PlusAPI:
def get_organizations(self) -> requests.Response:
return self._make_request("GET", self.ORGANIZATIONS_RESOURCE)
def send_trace_batch(self, payload) -> requests.Response:
return self._make_request("POST", self.TRACING_RESOURCE, json=payload)
def initialize_trace_batch(self, payload) -> requests.Response:
return self._make_request(
"POST", f"{self.TRACING_RESOURCE}/batches", json=payload
)
def send_trace_events(self, trace_batch_id: str, payload) -> requests.Response:
return self._make_request(
"POST",
f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}/events",
json=payload,
)
def finalize_trace_batch(self, trace_batch_id: str, payload) -> requests.Response:
return self._make_request(
"PATCH",
f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}/finalize",
json=payload,
)

View File

@@ -1,3 +1,4 @@
import os
import asyncio
import json
import re
@@ -72,6 +73,11 @@ from crewai.utilities.events.crew_events import (
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.event_listener import EventListener
from crewai.utilities.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks,
@@ -238,6 +244,10 @@ class Crew(FlowTrackable, BaseModel):
default_factory=SecurityConfig,
description="Security configuration for the crew, including fingerprinting.",
)
token_usage: Optional[UsageMetrics] = Field(
default=None,
description="Metrics for the LLM usage during all tasks execution.",
)
@field_validator("id", mode="before")
@classmethod
@@ -269,6 +279,9 @@ class Crew(FlowTrackable, BaseModel):
self._cache_handler = CacheHandler()
event_listener = EventListener()
if os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true":
trace_listener = TraceCollectionListener()
trace_listener.setup_listeners(crewai_event_bus)
event_listener.verbose = self.verbose
event_listener.formatter.verbose = self.verbose
self._logger = Logger(verbose=self.verbose)
@@ -1044,11 +1057,13 @@ class Crew(FlowTrackable, BaseModel):
final_string_output = final_task_output.raw
self._finish_execution(final_string_output)
token_usage = self.calculate_usage_metrics()
self.token_usage = self.calculate_usage_metrics()
crewai_event_bus.emit(
self,
CrewKickoffCompletedEvent(
crew_name=self.name, output=final_task_output
crew_name=self.name,
output=final_task_output,
total_tokens=self.token_usage.total_tokens,
),
)
return CrewOutput(
@@ -1056,7 +1071,7 @@ class Crew(FlowTrackable, BaseModel):
pydantic=final_task_output.pydantic,
json_dict=final_task_output.json_dict,
tasks_output=task_outputs,
token_usage=token_usage,
token_usage=self.token_usage,
)
def _process_async_tasks(
@@ -1226,7 +1241,6 @@ class Crew(FlowTrackable, BaseModel):
if self.external_memory:
copied_data["external_memory"] = self.external_memory.model_copy(deep=True)
copied_data.pop("agents", None)
copied_data.pop("tasks", None)

View File

@@ -2,6 +2,7 @@ import asyncio
import copy
import inspect
import logging
import os
from typing import (
Any,
Callable,
@@ -32,6 +33,9 @@ from crewai.utilities.events.flow_events import (
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.utilities.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.utilities.printer import Printer
logger = logging.getLogger(__name__)
@@ -465,7 +469,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Initialize state with initial values
self._state = self._create_initial_state()
if os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true":
trace_listener = TraceCollectionListener()
trace_listener.setup_listeners(crewai_event_bus)
# Apply any additional kwargs
if kwargs:
self._initialize_state(kwargs)

View File

@@ -28,7 +28,7 @@ from pydantic import (
InstanceOf,
PrivateAttr,
model_validator,
field_validator
field_validator,
)
from crewai.agents.agent_builder.base_agent import BaseAgent
@@ -210,7 +210,9 @@ class LiteAgent(FlowTrackable, BaseModel):
"""Set up the LLM and other components after initialization."""
self.llm = create_llm(self.llm)
if not isinstance(self.llm, BaseLLM):
raise ValueError(f"Expected LLM instance of type BaseLLM, got {type(self.llm).__name__}")
raise ValueError(
f"Expected LLM instance of type BaseLLM, got {type(self.llm).__name__}"
)
# Initialize callbacks
token_callback = TokenCalcHandler(token_cost_process=self._token_process)
@@ -233,7 +235,9 @@ class LiteAgent(FlowTrackable, BaseModel):
from crewai.tasks.llm_guardrail import LLMGuardrail
if not isinstance(self.llm, BaseLLM):
raise TypeError(f"Guardrail requires LLM instance of type BaseLLM, got {type(self.llm).__name__}")
raise TypeError(
f"Guardrail requires LLM instance of type BaseLLM, got {type(self.llm).__name__}"
)
self._guardrail = LLMGuardrail(description=self.guardrail, llm=self.llm)
@@ -515,7 +519,8 @@ class LiteAgent(FlowTrackable, BaseModel):
enforce_rpm_limit(self.request_within_rpm_limit)
# Emit LLM call started event
llm = cast(LLM, self.llm)
model = llm.model if hasattr(llm, "model") else "unknown"
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
@@ -523,6 +528,7 @@ class LiteAgent(FlowTrackable, BaseModel):
tools=None,
callbacks=self._callbacks,
from_agent=self,
model=model,
),
)
@@ -543,6 +549,7 @@ class LiteAgent(FlowTrackable, BaseModel):
response=answer,
call_type=LLMCallType.LLM_CALL,
from_agent=self,
model=model,
),
)
except Exception as e:

View File

@@ -61,6 +61,7 @@ load_dotenv()
litellm.suppress_debug_info = True
class FilteredStream(io.TextIOBase):
_lock = None
@@ -78,7 +79,8 @@ class FilteredStream(io.TextIOBase):
# Skip common noisy LiteLLM banners and any other lines that contain "litellm"
if (
"litellm.info:" in lower_s
or "Consider using a smaller input or implementing a text splitting strategy" in lower_s
or "Consider using a smaller input or implementing a text splitting strategy"
in lower_s
):
return 0
@@ -286,6 +288,8 @@ class AccumulatedToolArgs(BaseModel):
class LLM(BaseLLM):
completion_cost: Optional[float] = None
def __init__(
self,
model: str,
@@ -532,7 +536,11 @@ class LLM(BaseLLM):
assert hasattr(crewai_event_bus, "emit")
crewai_event_bus.emit(
self,
event=LLMStreamChunkEvent(chunk=chunk_content, from_task=from_task, from_agent=from_agent),
event=LLMStreamChunkEvent(
chunk=chunk_content,
from_task=from_task,
from_agent=from_agent,
),
)
# --- 4) Fallback to non-streaming if no content received
if not full_response.strip() and chunk_count == 0:
@@ -545,7 +553,11 @@ class LLM(BaseLLM):
"stream_options", None
) # Remove stream_options for non-streaming call
return self._handle_non_streaming_response(
non_streaming_params, callbacks, available_functions, from_task, from_agent
non_streaming_params,
callbacks,
available_functions,
from_task,
from_agent,
)
# --- 5) Handle empty response with chunks
@@ -630,7 +642,13 @@ class LLM(BaseLLM):
# Log token usage if available in streaming mode
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
# Emit completion event and return response
self._handle_emit_call_events(response=full_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
self._handle_emit_call_events(
response=full_response,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return full_response
# --- 9) Handle tool calls if present
@@ -642,7 +660,13 @@ class LLM(BaseLLM):
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
# --- 11) Emit completion event and return response
self._handle_emit_call_events(response=full_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
self._handle_emit_call_events(
response=full_response,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return full_response
except ContextWindowExceededError as e:
@@ -654,14 +678,22 @@ class LLM(BaseLLM):
logging.error(f"Error in streaming response: {str(e)}")
if full_response.strip():
logging.warning(f"Returning partial response despite error: {str(e)}")
self._handle_emit_call_events(response=full_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
self._handle_emit_call_events(
response=full_response,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return full_response
# Emit failed event and re-raise the exception
assert hasattr(crewai_event_bus, "emit")
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(error=str(e), from_task=from_task, from_agent=from_agent),
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
)
raise Exception(f"Failed to get streaming response: {str(e)}")
@@ -779,6 +811,7 @@ class LLM(BaseLLM):
# across the codebase. This allows CrewAgentExecutor to handle context
# length issues appropriately.
response = litellm.completion(**params)
except ContextWindowExceededError as e:
# Convert litellm's context window error to our own exception type
# for consistent handling in the rest of the codebase
@@ -805,7 +838,13 @@ class LLM(BaseLLM):
# --- 5) If no tool calls or no available functions, return the text response directly as long as there is a text response
if (not tool_calls or not available_functions) and text_response:
self._handle_emit_call_events(response=text_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
self._handle_emit_call_events(
response=text_response,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return text_response
# --- 6) If there is no text response, no available functions, but there are tool calls, return the tool calls
elif tool_calls and not available_functions and not text_response:
@@ -816,7 +855,13 @@ class LLM(BaseLLM):
if tool_result is not None:
return tool_result
# --- 8) If tool call handling didn't return a result, emit completion event and return text response
self._handle_emit_call_events(response=text_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
self._handle_emit_call_events(
response=text_response,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return text_response
def _handle_tool_call(
@@ -873,7 +918,9 @@ class LLM(BaseLLM):
)
# --- 3.3) Emit success event
self._handle_emit_call_events(response=result, call_type=LLMCallType.TOOL_CALL)
self._handle_emit_call_events(
response=result, call_type=LLMCallType.TOOL_CALL
)
return result
except Exception as e:
# --- 3.4) Handle execution errors
@@ -891,7 +938,7 @@ class LLM(BaseLLM):
event=ToolUsageErrorEvent(
tool_name=function_name,
tool_args=function_args,
error=f"Tool execution error: {str(e)}"
error=f"Tool execution error: {str(e)}",
),
)
return None
@@ -941,6 +988,7 @@ class LLM(BaseLLM):
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
),
)
@@ -978,17 +1026,22 @@ class LLM(BaseLLM):
# whether to summarize the content or abort based on the respect_context_window flag
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(e) and "'stop'" in str(e)
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if "additional_drop_params" in self.additional_params and isinstance(self.additional_params["additional_drop_params"], list):
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append("stop")
else:
self.additional_params = {"additional_drop_params": ["stop"]}
logging.info(
"Retrying LLM call without the unsupported 'stop'"
)
logging.info("Retrying LLM call without the unsupported 'stop'")
return self.call(
messages,
@@ -1002,11 +1055,20 @@ class LLM(BaseLLM):
assert hasattr(crewai_event_bus, "emit")
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(error=str(e), from_task=from_task, from_agent=from_agent),
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
)
raise
def _handle_emit_call_events(self, response: Any, call_type: LLMCallType, from_task: Optional[Any] = None, from_agent: Optional[Any] = None, messages: str | list[dict[str, Any]] | None = None):
def _handle_emit_call_events(
self,
response: Any,
call_type: LLMCallType,
from_task: Optional[Any] = None,
from_agent: Optional[Any] = None,
messages: str | list[dict[str, Any]] | None = None,
):
"""Handle the events for the LLM call.
Args:
@@ -1019,7 +1081,14 @@ class LLM(BaseLLM):
assert hasattr(crewai_event_bus, "emit")
crewai_event_bus.emit(
self,
event=LLMCallCompletedEvent(messages=messages, response=response, call_type=call_type, from_task=from_task, from_agent=from_agent),
event=LLMCallCompletedEvent(
messages=messages,
response=response,
call_type=call_type,
from_task=from_task,
from_agent=from_agent,
model=self.model,
),
)
def _format_messages_for_provider(
@@ -1074,11 +1143,13 @@ class LLM(BaseLLM):
# TODO: Remove this code after merging PR https://github.com/BerriAI/litellm/pull/10917
# Ollama doesn't supports last message to be 'assistant'
if "ollama" in self.model.lower() and messages and messages[-1]["role"] == "assistant":
if (
"ollama" in self.model.lower()
and messages
and messages[-1]["role"] == "assistant"
):
messages = messages.copy()
messages.append(
{"role": "user", "content": ""}
)
messages.append({"role": "user", "content": ""})
return messages
# Handle Anthropic models

View File

@@ -16,3 +16,4 @@ class _NotSpecified:
# Unlike `None`, which might be a valid value from the user, `NOT_SPECIFIED` allows
# us to distinguish between "not passed at all" and "explicitly passed None" or "[]".
NOT_SPECIFIED = _NotSpecified()
CREWAI_BASE_URL = "https://app.crewai.com/"

View File

@@ -47,6 +47,7 @@ class CrewKickoffCompletedEvent(CrewBaseEvent):
output: Any
type: str = "crew_kickoff_completed"
total_tokens: int = 0
class CrewKickoffFailedEvent(CrewBaseEvent):

View File

@@ -0,0 +1,33 @@
import json
from datetime import datetime
from crewai.cli.plus_api import PlusAPI
from crewai.cli.authentication.token import get_auth_token
from pydantic import BaseModel
from .trace_batch_manager import TraceBatch
from logging import getLogger
logger = getLogger(__name__)
class TraceSender(BaseModel):
"""Trace sender for sending trace batches to the backend"""
def send_batch(self, batch: TraceBatch) -> bool:
"""Print trace batch to console"""
try:
payload = batch.to_dict()
def datetime_handler(obj):
if isinstance(obj, datetime):
return obj.isoformat()
serialized_payload = json.loads(
json.dumps(payload, default=datetime_handler)
)
PlusAPI(api_key=get_auth_token()).send_trace_batch(serialized_payload)
return True
except Exception as e:
logger.error(f"Error sending trace batch: {e}")
return False

View File

@@ -0,0 +1,252 @@
import uuid
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from crewai.utilities.constants import CREWAI_BASE_URL
from crewai.cli.authentication.token import get_auth_token
from crewai.cli.version import get_crewai_version
from crewai.cli.plus_api import PlusAPI
from rich.console import Console
from rich.panel import Panel
from crewai.utilities.events.listeners.tracing.types import TraceEvent
from logging import getLogger
logger = getLogger(__name__)
@dataclass
class TraceBatch:
"""Batch of events to send to backend"""
version: str = field(default_factory=get_crewai_version)
batch_id: str = field(default_factory=lambda: str(uuid.uuid4()))
user_context: Dict[str, str] = field(default_factory=dict)
execution_metadata: Dict[str, Any] = field(default_factory=dict)
events: List[TraceEvent] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
return {
"version": self.version,
"batch_id": self.batch_id,
"user_context": self.user_context,
"execution_metadata": self.execution_metadata,
"events": [event.to_dict() for event in self.events],
}
class TraceBatchManager:
"""Single responsibility: Manage batches and event buffering"""
def __init__(self):
self.plus_api = PlusAPI(api_key=get_auth_token())
self.trace_batch_id: Optional[str] = None # Backend ID
self.current_batch: Optional[TraceBatch] = None
self.event_buffer: List[TraceEvent] = []
self.execution_start_times: Dict[str, datetime] = {}
def initialize_batch(
self, user_context: Dict[str, str], execution_metadata: Dict[str, Any]
) -> TraceBatch:
"""Initialize a new trace batch"""
self.current_batch = TraceBatch(
user_context=user_context, execution_metadata=execution_metadata
)
self.event_buffer.clear()
self.record_start_time("execution")
self._initialize_backend_batch(user_context, execution_metadata)
return self.current_batch
def _initialize_backend_batch(
self, user_context: Dict[str, str], execution_metadata: Dict[str, Any]
):
"""Send batch initialization to backend"""
if not self.plus_api or not self.current_batch:
return
try:
payload = {
"trace_id": self.current_batch.batch_id,
"execution_type": execution_metadata.get("execution_type", "crew"),
"execution_context": {
"crew_fingerprint": execution_metadata.get("crew_fingerprint"),
"crew_name": execution_metadata.get("crew_name", "Unknown Crew"),
"flow_name": execution_metadata.get("flow_name", "Unknown Flow"),
"crewai_version": self.current_batch.version,
"privacy_level": user_context.get("privacy_level", "standard"),
},
"execution_metadata": {
"expected_duration_estimate": execution_metadata.get(
"expected_duration_estimate", 300
),
"agent_count": execution_metadata.get("agent_count", 0),
"task_count": execution_metadata.get("task_count", 0),
"flow_method_count": execution_metadata.get("flow_method_count", 0),
"execution_started_at": datetime.now(timezone.utc).isoformat(),
},
}
response = self.plus_api.initialize_trace_batch(payload)
if response.status_code == 201 or response.status_code == 200:
response_data = response.json()
self.trace_batch_id = response_data["trace_id"]
console = Console()
panel = Panel(
f"✅ Trace batch initialized with session ID: {self.trace_batch_id}",
title="Trace Batch Initialization",
border_style="green",
)
console.print(panel)
else:
logger.error(
f"❌ Failed to initialize trace batch: {response.status_code} - {response.text}"
)
except Exception as e:
logger.error(f"❌ Error initializing trace batch: {str(e)}")
def add_event(self, trace_event: TraceEvent):
"""Add event to buffer"""
self.event_buffer.append(trace_event)
def _send_events_to_backend(self):
"""Send buffered events to backend"""
if not self.plus_api or not self.trace_batch_id or not self.event_buffer:
return
try:
payload = {
"events": [event.to_dict() for event in self.event_buffer],
"batch_metadata": {
"events_count": len(self.event_buffer),
"batch_sequence": 1,
"is_final_batch": False,
},
}
if not self.trace_batch_id:
raise Exception("❌ Trace batch ID not found")
response = self.plus_api.send_trace_events(self.trace_batch_id, payload)
if response.status_code == 200 or response.status_code == 201:
self.event_buffer.clear()
else:
logger.error(
f"❌ Failed to send events: {response.status_code} - {response.text}"
)
except Exception as e:
logger.error(f"❌ Error sending events to backend: {str(e)}")
def finalize_batch(self) -> Optional[TraceBatch]:
"""Finalize batch and return it for sending"""
if not self.current_batch:
return None
if self.event_buffer:
self._send_events_to_backend()
self._finalize_backend_batch()
self.current_batch.events = self.event_buffer.copy()
finalized_batch = self.current_batch
self.current_batch = None
self.event_buffer.clear()
self.trace_batch_id = None
self._cleanup_batch_data()
return finalized_batch
def _finalize_backend_batch(self):
"""Send batch finalization to backend"""
if not self.plus_api or not self.trace_batch_id:
return
try:
total_events = len(self.current_batch.events) if self.current_batch else 0
payload = {
"status": "completed",
"duration_ms": self.calculate_duration("execution"),
"final_event_count": total_events,
}
response = self.plus_api.finalize_trace_batch(self.trace_batch_id, payload)
if response.status_code == 200:
console = Console()
panel = Panel(
f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {CREWAI_BASE_URL}/crewai_plus/trace_batches/{self.trace_batch_id}",
title="Trace Batch Finalization",
border_style="green",
)
console.print(panel)
else:
logger.error(
f"❌ Failed to finalize trace batch: {response.status_code} - {response.text}"
)
except Exception as e:
logger.error(f"❌ Error finalizing trace batch: {str(e)}")
# TODO: send error to app
def _cleanup_batch_data(self):
"""Clean up batch data after successful finalization to free memory"""
try:
if hasattr(self, "event_buffer") and self.event_buffer:
self.event_buffer.clear()
if hasattr(self, "current_batch") and self.current_batch:
if hasattr(self.current_batch, "events") and self.current_batch.events:
self.current_batch.events.clear()
self.current_batch = None
if hasattr(self, "batch_sequence"):
self.batch_sequence = 0
except Exception as e:
logger.error(f"Warning: Error during cleanup: {str(e)}")
def has_events(self) -> bool:
"""Check if there are events in the buffer"""
return len(self.event_buffer) > 0
def get_event_count(self) -> int:
"""Get number of events in buffer"""
return len(self.event_buffer)
def is_batch_initialized(self) -> bool:
"""Check if batch is initialized"""
return self.current_batch is not None
def record_start_time(self, key: str):
"""Record start time for duration calculation"""
self.execution_start_times[key] = datetime.now(timezone.utc)
def calculate_duration(self, key: str) -> int:
"""Calculate duration in milliseconds from recorded start time"""
start_time = self.execution_start_times.get(key)
if start_time:
duration_ms = int(
(datetime.now(timezone.utc) - start_time).total_seconds() * 1000
)
del self.execution_start_times[key]
return duration_ms
return 0
def get_trace_id(self) -> Optional[str]:
"""Get current trace ID"""
if self.current_batch:
return self.current_batch.user_context.get("trace_id")
return None

View File

@@ -0,0 +1,414 @@
import os
import uuid
from typing import Dict, Any, Optional
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionStartedEvent,
LiteAgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionErrorEvent,
AgentExecutionErrorEvent,
)
from crewai.utilities.events.listeners.tracing.types import TraceEvent
from crewai.utilities.events.reasoning_events import (
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
)
from crewai.utilities.events.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
)
from crewai.utilities.events.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from crewai.utilities.events.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.events.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
)
from crewai.utilities.events.flow_events import (
FlowCreatedEvent,
FlowStartedEvent,
FlowFinishedEvent,
MethodExecutionStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionFailedEvent,
FlowPlotEvent,
)
from crewai.utilities.events.llm_guardrail_events import (
LLMGuardrailStartedEvent,
LLMGuardrailCompletedEvent,
)
from crewai.utilities.serialization import to_serializable
from .trace_batch_manager import TraceBatchManager
from crewai.utilities.events.memory_events import (
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
)
from .interfaces import TraceSender
from crewai.cli.authentication.token import get_auth_token
from crewai.cli.version import get_crewai_version
class TraceCollectionListener(BaseEventListener):
"""
Trace collection listener that orchestrates trace collection
"""
trace_enabled: bool = False
complex_events = ["task_started", "llm_call_started", "llm_call_completed"]
_instance = None
_initialized = False
def __new__(cls, batch_manager=None, trace_sender=None):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(
self,
batch_manager: Optional[TraceBatchManager] = None,
trace_sender: Optional[TraceSender] = None,
):
if self._initialized:
return
super().__init__()
self.batch_manager = batch_manager or TraceBatchManager()
self.trace_sender = trace_sender or TraceSender()
self.trace_enabled = self._check_trace_enabled()
self._initialized = True
def _check_trace_enabled(self) -> bool:
"""Check if tracing should be enabled"""
auth_token = get_auth_token()
if not auth_token:
return False
return os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true" or bool(
os.getenv("CREWAI_USER_TOKEN")
)
def _get_user_context(self) -> Dict[str, str]:
"""Extract user context for tracing"""
return {
"user_id": os.getenv("CREWAI_USER_ID", "anonymous"),
"organization_id": os.getenv("CREWAI_ORG_ID", ""),
"session_id": str(uuid.uuid4()),
"trace_id": str(uuid.uuid4()),
}
def setup_listeners(self, crewai_event_bus):
"""Setup event listeners - delegates to specific handlers"""
if not self.trace_enabled:
return
self._register_flow_event_handlers(crewai_event_bus)
self._register_context_event_handlers(crewai_event_bus)
self._register_action_event_handlers(crewai_event_bus)
def _register_flow_event_handlers(self, event_bus):
"""Register handlers for flow events"""
@event_bus.on(FlowCreatedEvent)
def on_flow_created(source, event):
pass
@event_bus.on(FlowStartedEvent)
def on_flow_started(source, event):
if not self.batch_manager.is_batch_initialized():
self._initialize_flow_batch(source, event)
self._handle_trace_event("flow_started", source, event)
@event_bus.on(MethodExecutionStartedEvent)
def on_method_started(source, event):
self._handle_trace_event("method_execution_started", source, event)
@event_bus.on(MethodExecutionFinishedEvent)
def on_method_finished(source, event):
self._handle_trace_event("method_execution_finished", source, event)
@event_bus.on(MethodExecutionFailedEvent)
def on_method_failed(source, event):
self._handle_trace_event("method_execution_failed", source, event)
@event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event):
self._handle_trace_event("flow_finished", source, event)
self._send_batch()
@event_bus.on(FlowPlotEvent)
def on_flow_plot(source, event):
self._handle_action_event("flow_plot", source, event)
def _register_context_event_handlers(self, event_bus):
"""Register handlers for context events (start/end)"""
@event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event):
if not self.batch_manager.is_batch_initialized():
self._initialize_batch(source, event)
self._handle_trace_event("crew_kickoff_started", source, event)
@event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event):
self._handle_trace_event("crew_kickoff_completed", source, event)
self._send_batch()
@event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event):
self._handle_trace_event("crew_kickoff_failed", source, event)
self._send_batch()
@event_bus.on(TaskStartedEvent)
def on_task_started(source, event):
self._handle_trace_event("task_started", source, event)
@event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event):
self._handle_trace_event("task_completed", source, event)
@event_bus.on(TaskFailedEvent)
def on_task_failed(source, event):
self._handle_trace_event("task_failed", source, event)
@event_bus.on(AgentExecutionStartedEvent)
def on_agent_started(source, event):
self._handle_trace_event("agent_execution_started", source, event)
@event_bus.on(AgentExecutionCompletedEvent)
def on_agent_completed(source, event):
self._handle_trace_event("agent_execution_completed", source, event)
@event_bus.on(LiteAgentExecutionStartedEvent)
def on_lite_agent_started(source, event):
self._handle_trace_event("lite_agent_execution_started", source, event)
@event_bus.on(LiteAgentExecutionCompletedEvent)
def on_lite_agent_completed(source, event):
self._handle_trace_event("lite_agent_execution_completed", source, event)
@event_bus.on(LiteAgentExecutionErrorEvent)
def on_lite_agent_error(source, event):
self._handle_trace_event("lite_agent_execution_error", source, event)
@event_bus.on(AgentExecutionErrorEvent)
def on_agent_error(source, event):
self._handle_trace_event("agent_execution_error", source, event)
@event_bus.on(LLMGuardrailStartedEvent)
def on_guardrail_started(source, event):
self._handle_trace_event("llm_guardrail_started", source, event)
@event_bus.on(LLMGuardrailCompletedEvent)
def on_guardrail_completed(source, event):
self._handle_trace_event("llm_guardrail_completed", source, event)
def _register_action_event_handlers(self, event_bus):
"""Register handlers for action events (LLM calls, tool usage, memory)"""
@event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event):
self._handle_action_event("llm_call_started", source, event)
@event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(source, event):
self._handle_action_event("llm_call_completed", source, event)
@event_bus.on(LLMCallFailedEvent)
def on_llm_call_failed(source, event):
self._handle_action_event("llm_call_failed", source, event)
@event_bus.on(ToolUsageStartedEvent)
def on_tool_started(source, event):
self._handle_action_event("tool_usage_started", source, event)
@event_bus.on(ToolUsageFinishedEvent)
def on_tool_finished(source, event):
self._handle_action_event("tool_usage_finished", source, event)
@event_bus.on(ToolUsageErrorEvent)
def on_tool_error(source, event):
self._handle_action_event("tool_usage_error", source, event)
@event_bus.on(MemoryQueryStartedEvent)
def on_memory_query_started(source, event):
self._handle_action_event("memory_query_started", source, event)
@event_bus.on(MemoryQueryCompletedEvent)
def on_memory_query_completed(source, event):
self._handle_action_event("memory_query_completed", source, event)
@event_bus.on(MemoryQueryFailedEvent)
def on_memory_query_failed(source, event):
self._handle_action_event("memory_query_failed", source, event)
@event_bus.on(MemorySaveStartedEvent)
def on_memory_save_started(source, event):
self._handle_action_event("memory_save_started", source, event)
@event_bus.on(MemorySaveCompletedEvent)
def on_memory_save_completed(source, event):
self._handle_action_event("memory_save_completed", source, event)
@event_bus.on(MemorySaveFailedEvent)
def on_memory_save_failed(source, event):
self._handle_action_event("memory_save_failed", source, event)
@event_bus.on(AgentReasoningStartedEvent)
def on_agent_reasoning_started(source, event):
self._handle_action_event("agent_reasoning_started", source, event)
@event_bus.on(AgentReasoningCompletedEvent)
def on_agent_reasoning_completed(source, event):
self._handle_action_event("agent_reasoning_completed", source, event)
@event_bus.on(AgentReasoningFailedEvent)
def on_agent_reasoning_failed(source, event):
self._handle_action_event("agent_reasoning_failed", source, event)
def _initialize_batch(self, source: Any, event: Any):
"""Initialize trace batch"""
user_context = self._get_user_context()
execution_metadata = {
"crew_name": getattr(event, "crew_name", "Unknown Crew"),
"execution_start": event.timestamp if hasattr(event, "timestamp") else None,
"crewai_version": get_crewai_version(),
}
self.batch_manager.initialize_batch(user_context, execution_metadata)
def _initialize_flow_batch(self, source: Any, event: Any):
"""Initialize trace batch for Flow execution"""
user_context = self._get_user_context()
execution_metadata = {
"flow_name": getattr(source, "__class__.__name__", "Unknown Flow"),
"execution_start": event.timestamp if hasattr(event, "timestamp") else None,
"crewai_version": get_crewai_version(),
"execution_type": "flow",
}
self.batch_manager.initialize_batch(user_context, execution_metadata)
def _handle_trace_event(self, event_type: str, source: Any, event: Any):
"""Generic handler for context end events"""
trace_event = self._create_trace_event(event_type, source, event)
self.batch_manager.add_event(trace_event)
def _handle_action_event(self, event_type: str, source: Any, event: Any):
"""Generic handler for action events (LLM calls, tool usage)"""
if not self.batch_manager.is_batch_initialized():
user_context = self._get_user_context()
execution_metadata = {
"crew_name": getattr(source, "name", "Unknown Crew"),
"crewai_version": get_crewai_version(),
}
self.batch_manager.initialize_batch(user_context, execution_metadata)
trace_event = self._create_trace_event(event_type, source, event)
self.batch_manager.add_event(trace_event)
def _send_batch(self):
"""Send finalized batch using the configured sender"""
batch = self.batch_manager.finalize_batch()
if batch:
success = self.trace_sender.send_batch(batch)
if not success:
print("⚠️ Failed to send trace batch")
def _create_trace_event(
self, event_type: str, source: Any, event: Any
) -> TraceEvent:
"""Create a trace event"""
trace_event = TraceEvent(
type=event_type,
)
trace_event.event_data = self._build_event_data(event_type, event, source)
return trace_event
def _build_event_data(
self, event_type: str, event: Any, source: Any
) -> Dict[str, Any]:
"""Build event data"""
if event_type not in self.complex_events:
return self._safe_serialize_to_dict(event)
elif event_type == "task_started":
return {
"task_description": event.task.description,
"task_name": event.task.name,
"context": event.context,
"agent": source.agent.role,
}
elif event_type == "llm_call_started":
return {
**self._safe_serialize_to_dict(event),
"messages": self._truncate_messages(event.messages),
}
elif event_type == "llm_call_completed":
return {
**self._safe_serialize_to_dict(event),
"messages": self._truncate_messages(event.messages),
}
else:
return {
"event_type": event_type,
"event": self._safe_serialize_to_dict(event),
"source": source,
}
# TODO: move to utils
def _safe_serialize_to_dict(
self, obj, exclude: set[str] | None = None
) -> Dict[str, Any]:
"""Safely serialize an object to a dictionary for event data."""
try:
serialized = to_serializable(obj, exclude)
if isinstance(serialized, dict):
return serialized
else:
return {"serialized_data": serialized}
except Exception as e:
return {"serialization_error": str(e), "object_type": type(obj).__name__}
# TODO: move to utils
def _truncate_messages(self, messages, max_content_length=200, max_messages=5):
"""Truncate message content and limit number of messages"""
if not messages or not isinstance(messages, list):
return messages
# Limit number of messages
limited_messages = messages[:max_messages]
# Truncate each message content
for msg in limited_messages:
if isinstance(msg, dict) and "content" in msg:
content = msg["content"]
if len(content) > max_content_length:
msg["content"] = content[:max_content_length] + "..."
return limited_messages

View File

@@ -0,0 +1,19 @@
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Dict, Any
import uuid
@dataclass
class TraceEvent:
"""Individual trace event payload"""
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: str = field(
default_factory=lambda: datetime.now(timezone.utc).isoformat()
)
type: str = ""
event_data: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return asdict(self)

View File

@@ -5,6 +5,7 @@ from pydantic import BaseModel
from crewai.utilities.events.base_events import BaseEvent
class LLMEventBase(BaseEvent):
task_name: Optional[str] = None
task_id: Optional[str] = None
@@ -32,6 +33,7 @@ class LLMEventBase(BaseEvent):
self.task_id = task.id
self.task_name = task.name
class LLMCallType(Enum):
"""Type of LLM call being made"""
@@ -48,6 +50,7 @@ class LLMCallStartedEvent(LLMEventBase):
"""
type: str = "llm_call_started"
model: Optional[str] = None
messages: Optional[Union[str, List[Dict[str, Any]]]] = None
tools: Optional[List[dict[str, Any]]] = None
callbacks: Optional[List[Any]] = None
@@ -61,6 +64,8 @@ class LLMCallCompletedEvent(LLMEventBase):
messages: str | list[dict[str, Any]] | None = None
response: Any
call_type: LLMCallType
model: Optional[str] = None
class LLMCallFailedEvent(LLMEventBase):
"""Event emitted when a LLM call fails"""