fix: reduce trace event serialization bloat

Use context-aware field serializers so event models control their own
trace representation. Heavy nested objects become lightweight refs.
This commit is contained in:
Greyson LaLonde
2026-03-31 15:40:25 +08:00
parent dfc0f9a317
commit 6da1c5f964
9 changed files with 856 additions and 31 deletions

View File

@@ -5,11 +5,43 @@ import itertools
from typing import Any
import uuid
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, SerializationInfo
from crewai.utilities.serialization import Serializable, to_serializable
def _is_trace_context(info: SerializationInfo) -> bool:
"""Check if serialization is happening in trace context."""
return bool(info.context and info.context.get("trace"))
def _trace_agent_ref(agent: Any) -> dict[str, Any] | None:
"""Return a lightweight agent reference for trace serialization."""
if agent is None:
return None
return {
"id": str(getattr(agent, "id", "")),
"role": getattr(agent, "role", ""),
}
def _trace_task_ref(task: Any) -> dict[str, Any] | None:
"""Return a lightweight task reference for trace serialization."""
if task is None:
return None
return {
"id": str(getattr(task, "id", "")),
"name": getattr(task, "name", None) or getattr(task, "description", ""),
}
def _trace_tool_names(tools: Any) -> list[str] | None:
"""Return a list of tool names for trace serialization."""
if not tools:
return None
return [getattr(t, "name", str(t)) for t in tools]
_emission_counter: contextvars.ContextVar[Iterator[int]] = contextvars.ContextVar(
"_emission_counter"
)

View File

@@ -1,7 +1,7 @@
"""Trace collection listener for orchestrating trace collection."""
import os
from typing import Any, ClassVar
from typing import Any
import uuid
from typing_extensions import Self
@@ -126,18 +126,13 @@ from crewai.events.types.tool_usage_events import (
from crewai.events.utils.console_formatter import ConsoleFormatter
_TRACE_CONTEXT: dict[str, bool] = {"trace": True}
"""Serialization context that triggers lightweight field serializers on event models."""
class TraceCollectionListener(BaseEventListener):
"""Trace collection listener that orchestrates trace collection."""
complex_events: ClassVar[list[str]] = [
"task_started",
"task_completed",
"llm_call_started",
"llm_call_completed",
"agent_execution_started",
"agent_execution_completed",
]
_instance: Self | None = None
_initialized: bool = False
_listeners_setup: bool = False
@@ -810,9 +805,20 @@ class TraceCollectionListener(BaseEventListener):
def _build_event_data(
self, event_type: str, event: Any, source: Any
) -> dict[str, Any]:
"""Build event data"""
if event_type not in self.complex_events:
return safe_serialize_to_dict(event)
"""Build event data with context-based serialization to reduce trace bloat.
Field serializers on event models check for context={"trace": True} and
return lightweight references instead of full nested objects. This replaces
the old denylist approach with Pydantic v2's native context mechanism.
Only crew_kickoff_started gets a full crew structure (built separately).
Complex events (task_started, etc.) use custom projections for specific shapes.
All other events get context-aware serialization automatically.
"""
if event_type == "crew_kickoff_started":
return self._build_crew_started_data(event)
# Complex events with custom projections (specific field shapes for AMP)
if event_type == "task_started":
task_name = event.task.name or event.task.description
task_display_name = (
@@ -853,19 +859,78 @@ class TraceCollectionListener(BaseEventListener):
"agent_backstory": event.agent.backstory,
}
if event_type == "llm_call_started":
event_data = safe_serialize_to_dict(event)
event_data = safe_serialize_to_dict(event, context=_TRACE_CONTEXT)
event_data["task_name"] = event.task_name or getattr(
event, "task_description", None
)
return event_data
if event_type == "llm_call_completed":
return safe_serialize_to_dict(event)
return safe_serialize_to_dict(event, context=_TRACE_CONTEXT)
return {
"event_type": event_type,
"event": safe_serialize_to_dict(event),
"source": source,
}
# All other events: field serializers handle the heavy lifting
return safe_serialize_to_dict(event, context=_TRACE_CONTEXT)
def _build_crew_started_data(self, event: Any) -> dict[str, Any]:
"""Build comprehensive crew structure for crew_kickoff_started event.
This is the ONE place where we serialize the full crew structure.
Subsequent events use lightweight references via field serializers.
"""
event_data = safe_serialize_to_dict(event, context=_TRACE_CONTEXT)
crew = getattr(event, "crew", None)
if crew is not None:
agents_data = []
for agent in getattr(crew, "agents", []) or []:
agent_data = {
"id": str(getattr(agent, "id", "")),
"role": getattr(agent, "role", ""),
"goal": getattr(agent, "goal", ""),
"backstory": getattr(agent, "backstory", ""),
"verbose": getattr(agent, "verbose", False),
"allow_delegation": getattr(agent, "allow_delegation", False),
"max_iter": getattr(agent, "max_iter", None),
"max_rpm": getattr(agent, "max_rpm", None),
}
tools = getattr(agent, "tools", None)
if tools:
agent_data["tool_names"] = [
getattr(t, "name", str(t)) for t in tools
]
agents_data.append(agent_data)
tasks_data = []
for task in getattr(crew, "tasks", []) or []:
task_data = {
"id": str(getattr(task, "id", "")),
"name": getattr(task, "name", None),
"description": getattr(task, "description", ""),
"expected_output": getattr(task, "expected_output", ""),
"async_execution": getattr(task, "async_execution", False),
"human_input": getattr(task, "human_input", False),
}
task_agent = getattr(task, "agent", None)
if task_agent:
task_data["agent_ref"] = {
"id": str(getattr(task_agent, "id", "")),
"role": getattr(task_agent, "role", ""),
}
context_tasks = getattr(task, "context", None)
if context_tasks:
task_data["context_task_ids"] = [
str(getattr(ct, "id", "")) for ct in context_tasks
]
tasks_data.append(task_data)
event_data["crew_structure"] = {
"agents": agents_data,
"tasks": tasks_data,
"process": str(getattr(crew, "process", "")),
"verbose": getattr(crew, "verbose", False),
"memory": getattr(crew, "memory", False),
}
return event_data
def _show_tracing_disabled_message(self) -> None:
"""Show a message when tracing is disabled."""

View File

@@ -429,10 +429,22 @@ def mark_first_execution_done(user_consented: bool = False) -> None:
p.write_text(json.dumps(data, indent=2))
def safe_serialize_to_dict(obj: Any, exclude: set[str] | None = None) -> dict[str, Any]:
"""Safely serialize an object to a dictionary for event data."""
def safe_serialize_to_dict(
obj: Any,
exclude: set[str] | None = None,
context: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Safely serialize an object to a dictionary for event data.
Args:
obj: Object to serialize.
exclude: Set of keys to exclude from the result.
context: Optional context dict passed through to Pydantic's model_dump().
Field serializers can inspect this to customize output
(e.g. context={"trace": True} for lightweight trace serialization).
"""
try:
serialized = to_serializable(obj, exclude)
serialized = to_serializable(obj, exclude, context=context)
if isinstance(serialized, dict):
return serialized
return {"serialized_data": serialized}

View File

@@ -5,11 +5,17 @@ from __future__ import annotations
from collections.abc import Sequence
from typing import Any
from pydantic import ConfigDict, model_validator
from pydantic import ConfigDict, SerializationInfo, field_serializer, model_validator
from typing_extensions import Self
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.events.base_events import BaseEvent
from crewai.events.base_events import (
BaseEvent,
_is_trace_context,
_trace_agent_ref,
_trace_task_ref,
_trace_tool_names,
)
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
@@ -31,6 +37,21 @@ class AgentExecutionStartedEvent(BaseEvent):
_set_agent_fingerprint(self, self.agent)
return self
@field_serializer("agent")
@classmethod
def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_agent_ref(v) if _is_trace_context(info) else v
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v
@field_serializer("tools")
@classmethod
def _serialize_tools(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_tool_names(v) if _is_trace_context(info) else v
class AgentExecutionCompletedEvent(BaseEvent):
"""Event emitted when an agent completes executing a task"""
@@ -48,6 +69,16 @@ class AgentExecutionCompletedEvent(BaseEvent):
_set_agent_fingerprint(self, self.agent)
return self
@field_serializer("agent")
@classmethod
def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_agent_ref(v) if _is_trace_context(info) else v
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v
class AgentExecutionErrorEvent(BaseEvent):
"""Event emitted when an agent encounters an error during execution"""
@@ -65,6 +96,16 @@ class AgentExecutionErrorEvent(BaseEvent):
_set_agent_fingerprint(self, self.agent)
return self
@field_serializer("agent")
@classmethod
def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_agent_ref(v) if _is_trace_context(info) else v
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v
# New event classes for LiteAgent
class LiteAgentExecutionStartedEvent(BaseEvent):
@@ -77,6 +118,11 @@ class LiteAgentExecutionStartedEvent(BaseEvent):
model_config = ConfigDict(arbitrary_types_allowed=True)
@field_serializer("tools")
@classmethod
def _serialize_tools(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_tool_names(v) if _is_trace_context(info) else v
class LiteAgentExecutionCompletedEvent(BaseEvent):
"""Event emitted when a LiteAgent completes execution"""

View File

@@ -1,6 +1,8 @@
from typing import TYPE_CHECKING, Any
from crewai.events.base_events import BaseEvent
from pydantic import SerializationInfo, field_serializer
from crewai.events.base_events import BaseEvent, _is_trace_context
if TYPE_CHECKING:
@@ -26,6 +28,14 @@ class CrewBaseEvent(BaseEvent):
if self.crew.fingerprint.metadata:
self.fingerprint_metadata = self.crew.fingerprint.metadata
@field_serializer("crew")
@classmethod
def _serialize_crew(cls, v: Any, info: SerializationInfo) -> Any:
"""Exclude crew in trace context — crew_kickoff_started builds structure separately."""
if _is_trace_context(info):
return None
return v
def to_json(self, exclude: set[str] | None = None) -> Any:
if exclude is None:
exclude = set()

View File

@@ -1,9 +1,9 @@
from enum import Enum
from typing import Any
from pydantic import BaseModel
from pydantic import BaseModel, SerializationInfo, field_serializer
from crewai.events.base_events import BaseEvent
from crewai.events.base_events import BaseEvent, _is_trace_context
class LLMEventBase(BaseEvent):
@@ -49,6 +49,16 @@ class LLMCallStartedEvent(LLMEventBase):
callbacks: list[Any] | None = None
available_functions: dict[str, Any] | None = None
@field_serializer("callbacks")
@classmethod
def _serialize_callbacks(cls, v: Any, info: SerializationInfo) -> Any:
return None if _is_trace_context(info) else v
@field_serializer("available_functions")
@classmethod
def _serialize_available_functions(cls, v: Any, info: SerializationInfo) -> Any:
return None if _is_trace_context(info) else v
class LLMCallCompletedEvent(LLMEventBase):
"""Event emitted when a LLM call completes"""

View File

@@ -1,6 +1,8 @@
from typing import Any
from crewai.events.base_events import BaseEvent
from pydantic import SerializationInfo, field_serializer
from crewai.events.base_events import BaseEvent, _is_trace_context, _trace_task_ref
from crewai.tasks.task_output import TaskOutput
@@ -24,6 +26,11 @@ class TaskStartedEvent(BaseEvent):
super().__init__(**data)
_set_task_fingerprint(self, self.task)
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v
class TaskCompletedEvent(BaseEvent):
"""Event emitted when a task completes"""
@@ -36,6 +43,11 @@ class TaskCompletedEvent(BaseEvent):
super().__init__(**data)
_set_task_fingerprint(self, self.task)
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v
class TaskFailedEvent(BaseEvent):
"""Event emitted when a task fails"""
@@ -48,6 +60,11 @@ class TaskFailedEvent(BaseEvent):
super().__init__(**data)
_set_task_fingerprint(self, self.task)
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v
class TaskEvaluationEvent(BaseEvent):
"""Event emitted when a task evaluation is completed"""
@@ -59,3 +76,8 @@ class TaskEvaluationEvent(BaseEvent):
def __init__(self, **data: Any) -> None:
super().__init__(**data)
_set_task_fingerprint(self, self.task)
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v

View File

@@ -20,6 +20,7 @@ def to_serializable(
max_depth: int = 5,
_current_depth: int = 0,
_ancestors: set[int] | None = None,
context: dict[str, Any] | None = None,
) -> Serializable:
"""Converts a Python object into a JSON-compatible representation.
@@ -33,6 +34,9 @@ def to_serializable(
max_depth: Maximum recursion depth. Defaults to 5.
_current_depth: Current recursion depth (for internal use).
_ancestors: Set of ancestor object ids for cycle detection (for internal use).
context: Optional context dict passed to Pydantic's model_dump(context=...).
Field serializers on the model can inspect this to customize output
(e.g. context={"trace": True} for lightweight trace serialization).
Returns:
Serializable: A JSON-compatible structure.
@@ -66,6 +70,7 @@ def to_serializable(
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
context=context,
)
for item in obj
]
@@ -77,17 +82,24 @@ def to_serializable(
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
context=context,
)
for key, value in obj.items()
if key not in exclude
}
if isinstance(obj, BaseModel):
try:
dump_kwargs: dict[str, Any] = {}
if exclude:
dump_kwargs["exclude"] = exclude
if context is not None:
dump_kwargs["context"] = context
return to_serializable(
obj=obj.model_dump(exclude=exclude),
obj=obj.model_dump(**dump_kwargs),
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
context=context,
)
except Exception:
try:
@@ -97,12 +109,15 @@ def to_serializable(
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
context=context,
)
for k, v in obj.__dict__.items()
if k not in (exclude or set())
}
except Exception:
return repr(obj)
if callable(obj):
return repr(obj)
return repr(obj)

View File

@@ -0,0 +1,613 @@
"""Tests for trace serialization optimization using Pydantic v2 context-based serialization.
These tests verify that trace events use @field_serializer with SerializationInfo.context
to produce lightweight representations, reducing event sizes from 50-100KB to a few KB.
"""
import json
import uuid
from typing import Any
from unittest.mock import MagicMock
import pytest
from pydantic import ConfigDict
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.events.base_events import _trace_agent_ref, _trace_task_ref, _trace_tool_names
from crewai.events.listeners.tracing.utils import safe_serialize_to_dict
from crewai.security import Fingerprint, SecurityConfig
from crewai.utilities.serialization import to_serializable
# ---------------------------------------------------------------------------
# Lightweight BaseAgent subclass for tests (avoids heavy dependencies)
# ---------------------------------------------------------------------------
class _StubAgent(BaseAgent):
"""Minimal BaseAgent subclass that satisfies validation without heavy deps."""
model_config = ConfigDict(arbitrary_types_allowed=True)
def execute_task(self, *a: Any, **kw: Any) -> str:
return ""
def create_agent_executor(self, *a: Any, **kw: Any) -> None:
pass
def _parse_tools(self, *a: Any, **kw: Any) -> list:
return []
def get_delegation_tools(self, *a: Any, **kw: Any) -> list:
return []
def get_output_converter(self, *a: Any, **kw: Any) -> Any:
return None
def get_multimodal_tools(self, *a: Any, **kw: Any) -> list:
return []
async def aexecute_task(self, *a: Any, **kw: Any) -> str:
return ""
def get_mcp_tools(self, *a: Any, **kw: Any) -> list:
return []
def get_platform_tools(self, *a: Any, **kw: Any) -> list:
return []
def _make_stub_agent(**overrides) -> _StubAgent:
"""Create a minimal BaseAgent instance for testing."""
defaults = {
"role": "Researcher",
"goal": "Research things",
"backstory": "Expert researcher",
"tools": [],
}
defaults.update(overrides)
return _StubAgent(**defaults)
# ---------------------------------------------------------------------------
# Helpers to build realistic mock objects for event fields
# ---------------------------------------------------------------------------
def _make_mock_task(**overrides):
task = MagicMock()
task.id = overrides.get("id", uuid.uuid4())
task.name = overrides.get("name", "Research Task")
task.description = overrides.get("description", "Do research")
task.expected_output = overrides.get("expected_output", "Research results")
task.async_execution = overrides.get("async_execution", False)
task.human_input = overrides.get("human_input", False)
task.agent = overrides.get("agent", _make_stub_agent())
task.context = overrides.get("context", None)
task.crew = MagicMock()
task.tools = overrides.get("tools", [MagicMock(), MagicMock()])
fp = MagicMock()
fp.uuid_str = str(uuid.uuid4())
fp.metadata = {"name": task.name}
task.fingerprint = fp
return task
def _make_stub_tool(tool_name="web_search") -> Any:
"""Create a minimal BaseTool instance for testing."""
from crewai.tools.base_tool import BaseTool
class _StubTool(BaseTool):
name: str = "stub"
description: str = "stub tool"
def _run(self, *a: Any, **kw: Any) -> str:
return ""
return _StubTool(name=tool_name, description=f"{tool_name} tool")
# ---------------------------------------------------------------------------
# Unit tests: trace ref helpers
# ---------------------------------------------------------------------------
class TestTraceRefHelpers:
def test_trace_agent_ref(self):
agent = _make_stub_agent(role="Analyst")
ref = _trace_agent_ref(agent)
assert ref["role"] == "Analyst"
assert "id" in ref
assert len(ref) == 2 # only id and role
def test_trace_agent_ref_none(self):
assert _trace_agent_ref(None) is None
def test_trace_task_ref(self):
task = _make_mock_task(name="Write Report")
ref = _trace_task_ref(task)
assert ref["name"] == "Write Report"
assert "id" in ref
assert len(ref) == 2
def test_trace_task_ref_falls_back_to_description(self):
task = _make_mock_task(name=None, description="Describe the report")
ref = _trace_task_ref(task)
assert ref["name"] == "Describe the report"
def test_trace_task_ref_none(self):
assert _trace_task_ref(None) is None
def test_trace_tool_names(self):
tools = [_make_stub_tool("search"), _make_stub_tool("read")]
names = _trace_tool_names(tools)
assert names == ["search", "read"]
def test_trace_tool_names_empty(self):
assert _trace_tool_names([]) is None
assert _trace_tool_names(None) is None
# ---------------------------------------------------------------------------
# Integration tests: field serializers on real event classes
# ---------------------------------------------------------------------------
class TestAgentEventFieldSerializers:
"""Test that agent event field serializers respond to trace context."""
def test_agent_execution_started_trace_context(self):
from crewai.events.types.agent_events import AgentExecutionStartedEvent
agent = _make_stub_agent(role="Researcher")
task = _make_mock_task(name="Research Task")
tools = [_make_stub_tool("search"), _make_stub_tool("read")]
event = AgentExecutionStartedEvent(
agent=agent, task=task, tools=tools, task_prompt="Do research"
)
# With trace context: lightweight refs
trace_dump = event.model_dump(context={"trace": True})
assert trace_dump["agent"] == {"id": str(agent.id), "role": "Researcher"}
assert trace_dump["task"] == {"id": str(task.id), "name": "Research Task"}
assert trace_dump["tools"] == ["search", "read"]
def test_agent_execution_started_no_context(self):
from crewai.events.types.agent_events import AgentExecutionStartedEvent
agent = _make_stub_agent(role="SpecificRole")
task = _make_mock_task()
event = AgentExecutionStartedEvent(
agent=agent, task=task, tools=None, task_prompt="Do research"
)
# Without context: full agent dict (Pydantic model_dump expands it)
normal_dump = event.model_dump()
assert isinstance(normal_dump["agent"], dict)
assert normal_dump["agent"]["role"] == "SpecificRole"
# Should have ALL agent fields, not just the lightweight ref
assert "goal" in normal_dump["agent"]
assert "backstory" in normal_dump["agent"]
assert "max_iter" in normal_dump["agent"]
def test_agent_execution_error_preserves_identification(self):
from crewai.events.types.agent_events import AgentExecutionErrorEvent
agent = _make_stub_agent(role="Analyst")
task = _make_mock_task(name="Analysis Task")
event = AgentExecutionErrorEvent(
agent=agent, task=task, error="Something went wrong"
)
trace_dump = event.model_dump(context={"trace": True})
# Error events should still have agent/task identification as refs
assert trace_dump["agent"]["role"] == "Analyst"
assert trace_dump["task"]["name"] == "Analysis Task"
assert trace_dump["error"] == "Something went wrong"
def test_agent_execution_completed_trace_context(self):
from crewai.events.types.agent_events import AgentExecutionCompletedEvent
agent = _make_stub_agent(role="Writer")
task = _make_mock_task(name="Writing Task")
event = AgentExecutionCompletedEvent(
agent=agent, task=task, output="Final output"
)
trace_dump = event.model_dump(context={"trace": True})
assert trace_dump["agent"]["role"] == "Writer"
assert trace_dump["task"]["name"] == "Writing Task"
assert trace_dump["output"] == "Final output"
class TestTaskEventFieldSerializers:
"""Test that task event field serializers respond to trace context."""
def test_task_started_trace_context(self):
from crewai.events.types.task_events import TaskStartedEvent
task = _make_mock_task(name="Test Task")
event = TaskStartedEvent(task=task, context="some context")
trace_dump = event.model_dump(context={"trace": True})
assert trace_dump["task"] == {"id": str(task.id), "name": "Test Task"}
assert trace_dump["context"] == "some context"
def test_task_failed_trace_context(self):
from crewai.events.types.task_events import TaskFailedEvent
task = _make_mock_task(name="Failing Task")
event = TaskFailedEvent(task=task, error="Task failed")
trace_dump = event.model_dump(context={"trace": True})
assert trace_dump["task"]["name"] == "Failing Task"
assert trace_dump["error"] == "Task failed"
class TestCrewEventFieldSerializers:
"""Test that crew event field serializers respond to trace context."""
def test_crew_kickoff_started_excludes_crew_in_trace(self):
from crewai.events.types.crew_events import CrewKickoffStartedEvent
crew = MagicMock()
crew.fingerprint = MagicMock()
crew.fingerprint.uuid_str = str(uuid.uuid4())
crew.fingerprint.metadata = {}
event = CrewKickoffStartedEvent(
crew=crew, crew_name="TestCrew", inputs={"key": "value"}
)
trace_dump = event.model_dump(context={"trace": True})
# crew field should be None in trace context
assert trace_dump["crew"] is None
# scalar fields preserved
assert trace_dump["crew_name"] == "TestCrew"
assert trace_dump["inputs"] == {"key": "value"}
def test_crew_event_no_context_preserves_crew(self):
from crewai.events.types.crew_events import CrewKickoffStartedEvent
crew = MagicMock()
crew.fingerprint = MagicMock()
crew.fingerprint.uuid_str = str(uuid.uuid4())
crew.fingerprint.metadata = {}
event = CrewKickoffStartedEvent(
crew=crew, crew_name="TestCrew", inputs=None
)
normal_dump = event.model_dump()
# Without trace context, crew should NOT be None (field serializer didn't fire)
assert normal_dump["crew"] is not None
class TestLLMEventFieldSerializers:
"""Test that LLM event field serializers respond to trace context."""
def test_llm_call_started_excludes_callbacks_in_trace(self):
from crewai.events.types.llm_events import LLMCallStartedEvent
event = LLMCallStartedEvent(
call_id="test-call",
messages=[{"role": "user", "content": "Hello"}],
tools=[{"name": "search", "description": "Search tool"}],
callbacks=[MagicMock(), MagicMock()],
available_functions={"search": MagicMock()},
)
trace_dump = event.model_dump(context={"trace": True})
# callbacks and available_functions excluded
assert trace_dump["callbacks"] is None
assert trace_dump["available_functions"] is None
# tools preserved (lightweight list of dicts)
assert trace_dump["tools"] == [{"name": "search", "description": "Search tool"}]
# messages preserved
assert trace_dump["messages"] == [{"role": "user", "content": "Hello"}]
# ---------------------------------------------------------------------------
# Integration tests: safe_serialize_to_dict with context
# ---------------------------------------------------------------------------
class TestSafeSerializeWithContext:
"""Test that safe_serialize_to_dict properly passes context through."""
def test_context_flows_through_to_field_serializers(self):
from crewai.events.types.agent_events import AgentExecutionErrorEvent
agent = _make_stub_agent(role="Worker")
task = _make_mock_task(name="Work Task")
event = AgentExecutionErrorEvent(
agent=agent, task=task, error="error msg"
)
result = safe_serialize_to_dict(event, context={"trace": True})
# Field serializers should have fired
assert result["agent"] == {"id": str(agent.id), "role": "Worker"}
assert result["task"] == {"id": str(task.id), "name": "Work Task"}
assert result["error"] == "error msg"
def test_no_context_preserves_full_serialization(self):
from crewai.events.types.task_events import TaskFailedEvent
task = _make_mock_task(name="Test")
event = TaskFailedEvent(task=task, error="fail")
result = safe_serialize_to_dict(event)
# Without context, task should not be a lightweight ref
assert result.get("task") is not None
# It should be the raw object (model_dump returns it as-is for Any fields)
# to_serializable will then repr() or process it further
# ---------------------------------------------------------------------------
# Integration tests: TraceCollectionListener._build_event_data
# ---------------------------------------------------------------------------
class TestBuildEventData:
@pytest.fixture
def listener(self):
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
TraceCollectionListener._instance = None
TraceCollectionListener._initialized = False
TraceCollectionListener._listeners_setup = False
return TraceCollectionListener()
def test_crew_kickoff_started_has_crew_structure(self, listener):
agent = _make_stub_agent(role="Researcher")
agent.tools = [_make_stub_tool("search"), _make_stub_tool("read")]
task = _make_mock_task(name="Research Task", agent=agent)
task.context = None
crew = MagicMock()
crew.agents = [agent]
crew.tasks = [task]
crew.process = "sequential"
crew.verbose = True
crew.memory = False
crew.fingerprint = MagicMock()
crew.fingerprint.uuid_str = str(uuid.uuid4())
crew.fingerprint.metadata = {}
from crewai.events.types.crew_events import CrewKickoffStartedEvent
event = CrewKickoffStartedEvent(
crew=crew, crew_name="TestCrew", inputs={"key": "value"}
)
result = listener._build_event_data("crew_kickoff_started", event, None)
assert "crew_structure" in result
cs = result["crew_structure"]
assert len(cs["agents"]) == 1
assert cs["agents"][0]["role"] == "Researcher"
assert cs["agents"][0]["tool_names"] == ["search", "read"]
assert len(cs["tasks"]) == 1
assert cs["tasks"][0]["name"] == "Research Task"
assert "agent_ref" in cs["tasks"][0]
assert cs["tasks"][0]["agent_ref"]["role"] == "Researcher"
def test_crew_kickoff_started_context_task_ids(self, listener):
agent = _make_stub_agent()
task1 = _make_mock_task(name="Task 1", agent=agent)
task1.context = None
task2 = _make_mock_task(name="Task 2", agent=agent)
task2.context = [task1]
crew = MagicMock()
crew.agents = [agent]
crew.tasks = [task1, task2]
crew.process = "sequential"
crew.verbose = False
crew.memory = False
crew.fingerprint = MagicMock()
crew.fingerprint.uuid_str = str(uuid.uuid4())
crew.fingerprint.metadata = {}
from crewai.events.types.crew_events import CrewKickoffStartedEvent
event = CrewKickoffStartedEvent(
crew=crew, crew_name="TestCrew", inputs=None
)
result = listener._build_event_data("crew_kickoff_started", event, None)
task2_data = result["crew_structure"]["tasks"][1]
assert "context_task_ids" in task2_data
assert str(task1.id) in task2_data["context_task_ids"]
def test_generic_event_uses_trace_context(self, listener):
"""Non-complex events should use context-based serialization."""
from crewai.events.types.crew_events import CrewKickoffCompletedEvent
crew = MagicMock()
crew.fingerprint = MagicMock()
crew.fingerprint.uuid_str = str(uuid.uuid4())
crew.fingerprint.metadata = {}
event = CrewKickoffCompletedEvent(
crew=crew, crew_name="TestCrew", output="Final result", total_tokens=5000
)
result = listener._build_event_data("crew_kickoff_completed", event, None)
# Scalar fields preserved
assert result.get("crew_name") == "TestCrew"
assert result.get("total_tokens") == 5000
# crew excluded by field serializer
assert result.get("crew") is None
# No crew_structure (that's only for kickoff_started)
assert "crew_structure" not in result
def test_task_started_custom_projection(self, listener):
task = _make_mock_task(name="Test Task")
from crewai.events.types.task_events import TaskStartedEvent
event = TaskStartedEvent(task=task, context="test context")
source = MagicMock()
source.agent = _make_stub_agent(role="Worker")
result = listener._build_event_data("task_started", event, source)
assert result["task_name"] == "Test Task"
assert result["agent_role"] == "Worker"
assert result["task_id"] == str(task.id)
assert result["context"] == "test context"
def test_llm_call_started_uses_trace_context(self, listener):
from crewai.events.types.llm_events import LLMCallStartedEvent
event = LLMCallStartedEvent(
call_id="test",
messages=[{"role": "user", "content": "Hello"}],
tools=[{"name": "search"}],
callbacks=[MagicMock()],
available_functions={"fn": MagicMock()},
)
result = listener._build_event_data("llm_call_started", event, None)
# callbacks and available_functions excluded via field serializer
assert result.get("callbacks") is None
assert result.get("available_functions") is None
# tools preserved (lightweight schemas)
assert result.get("tools") == [{"name": "search"}]
def test_agent_execution_error_preserves_identification(self, listener):
"""Error events should preserve agent/task identification via field serializers."""
from crewai.events.types.agent_events import AgentExecutionErrorEvent
agent = _make_stub_agent(role="Analyst")
task = _make_mock_task(name="Analysis")
event = AgentExecutionErrorEvent(
agent=agent, task=task, error="Something broke"
)
result = listener._build_event_data("agent_execution_error", event, None)
# Field serializers return lightweight refs, not None
assert result["agent"] == {"id": str(agent.id), "role": "Analyst"}
assert result["task"] == {"id": str(task.id), "name": "Analysis"}
assert result["error"] == "Something broke"
def test_task_failed_preserves_identification(self, listener):
from crewai.events.types.task_events import TaskFailedEvent
task = _make_mock_task(name="Failed Task")
event = TaskFailedEvent(task=task, error="Task failed")
result = listener._build_event_data("task_failed", event, None)
assert result["task"] == {"id": str(task.id), "name": "Failed Task"}
assert result["error"] == "Task failed"
# ---------------------------------------------------------------------------
# Size reduction verification
# ---------------------------------------------------------------------------
class TestSizeReduction:
@pytest.fixture
def listener(self):
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
TraceCollectionListener._instance = None
TraceCollectionListener._initialized = False
TraceCollectionListener._listeners_setup = False
return TraceCollectionListener()
def test_task_started_event_size(self, listener):
"""task_started event data should be well under 2KB."""
agent = _make_stub_agent(
role="Researcher",
goal="Research" * 50,
backstory="Expert" * 100,
)
agent.tools = [_make_stub_tool(f"tool_{i}") for i in range(5)]
task = _make_mock_task(
name="Research Task",
description="Detailed description" * 20,
expected_output="Expected" * 10,
agent=agent,
)
task.context = [_make_mock_task() for _ in range(3)]
task.tools = [_make_stub_tool(f"t_{i}") for i in range(3)]
from crewai.events.types.task_events import TaskStartedEvent
event = TaskStartedEvent(task=task, context="test context")
source = MagicMock()
source.agent = agent
result = listener._build_event_data("task_started", event, source)
serialized = json.dumps(result, default=str)
assert len(serialized) < 2000, f"task_started too large: {len(serialized)} bytes"
assert "task_name" in result
assert "agent_role" in result
def test_error_event_size(self, listener):
"""Error events should be small despite having agent/task refs."""
from crewai.events.types.agent_events import AgentExecutionErrorEvent
agent = _make_stub_agent(
goal="Very long goal " * 100,
backstory="Very long backstory " * 100,
)
task = _make_mock_task(description="Very long description " * 100)
event = AgentExecutionErrorEvent(
agent=agent, task=task, error="error"
)
result = listener._build_event_data("agent_execution_error", event, None)
serialized = json.dumps(result, default=str)
# Should be small - agent/task are just {id, role/name} refs
assert len(serialized) < 5000, f"error event too large: {len(serialized)} bytes"
# ---------------------------------------------------------------------------
# to_serializable context threading
# ---------------------------------------------------------------------------
class TestToSerializableContext:
"""Test that context parameter flows through to_serializable correctly."""
def test_context_passed_to_model_dump(self):
from crewai.events.types.agent_events import AgentExecutionErrorEvent
agent = _make_stub_agent(role="Tester")
task = _make_mock_task(name="Test Task")
event = AgentExecutionErrorEvent(
agent=agent, task=task, error="test error"
)
# Directly use to_serializable with context
result = to_serializable(event, context={"trace": True})
assert isinstance(result, dict)
assert result["agent"] == {"id": str(agent.id), "role": "Tester"}
assert result["task"] == {"id": str(task.id), "name": "Test Task"}
def test_no_context_does_not_trigger_serializers(self):
from crewai.events.types.crew_events import CrewKickoffStartedEvent
crew = MagicMock()
crew.fingerprint = MagicMock()
crew.fingerprint.uuid_str = str(uuid.uuid4())
crew.fingerprint.metadata = {}
event = CrewKickoffStartedEvent(
crew=crew, crew_name="Test", inputs=None
)
# Without context, crew should NOT be None
result = event.model_dump()
assert result["crew"] is not None