Compare commits

...

12 Commits

Author SHA1 Message Date
Greyson LaLonde
8388169a56 Merge branch 'main' into fix/trace-serialization-pydantic-context 2026-03-31 23:45:34 +08:00
Greyson LaLonde
5de23b867c chore: remove unnecessary comments in _build_event_data 2026-03-31 16:41:57 +08:00
Greyson LaLonde
8edd8b3355 Merge branch 'fix/trace-serialization-pydantic-context' of https://github.com/crewAIInc/crewAI into fix/trace-serialization-pydantic-context 2026-03-31 16:36:00 +08:00
Greyson LaLonde
2af6a531f5 fix: serialize Enum via .value and add trace serializer for tool usage events 2026-03-31 16:34:59 +08:00
Greyson LaLonde
c0d6d2b63f chore: removed additional unused import in test
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
2026-03-31 16:33:41 +08:00
Greyson LaLonde
3e0c750f51 Merge branch 'fix/trace-serialization-pydantic-context' of https://github.com/crewAIInc/crewAI into fix/trace-serialization-pydantic-context 2026-03-31 16:26:43 +08:00
Greyson LaLonde
416f01fe23 fix: add trace field serializer for agent on tool usage events 2026-03-31 16:25:14 +08:00
Greyson LaLonde
da65ca2502 chore: remove unused import in test
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
2026-03-31 16:23:21 +08:00
Greyson LaLonde
47f192e112 fix: handle plain classes and callables in to_serializable
Add __dict__ handler for non-Pydantic classes so their attributes are
serialized rather than falling through to repr(). Guard with a callable
check so functions/lambdas still get repr().
2026-03-31 15:59:43 +08:00
Greyson LaLonde
19d1088bab chore: remove redundant callable check in to_serializable 2026-03-31 15:49:37 +08:00
Greyson LaLonde
1faee0c684 refactor: type trace ref helpers as TypedDict 2026-03-31 15:45:34 +08:00
Greyson LaLonde
6da1c5f964 fix: reduce trace event serialization bloat
Use context-aware field serializers so event models control their own
trace representation. Heavy nested objects become lightweight refs.
2026-03-31 15:40:25 +08:00
10 changed files with 901 additions and 34 deletions

View File

@@ -2,14 +2,56 @@ from collections.abc import Iterator
import contextvars
from datetime import datetime, timezone
import itertools
from typing import Any
from typing import Any, TypedDict
import uuid
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, SerializationInfo
from crewai.utilities.serialization import Serializable, to_serializable
def _is_trace_context(info: SerializationInfo) -> bool:
"""Check if serialization is happening in trace context."""
return bool(info.context and info.context.get("trace"))
class AgentRef(TypedDict):
id: str
role: str
class TaskRef(TypedDict):
id: str
name: str
def _trace_agent_ref(agent: Any) -> AgentRef | None:
"""Return a lightweight agent reference for trace serialization."""
if agent is None:
return None
return AgentRef(
id=str(getattr(agent, "id", "")),
role=getattr(agent, "role", ""),
)
def _trace_task_ref(task: Any) -> TaskRef | None:
"""Return a lightweight task reference for trace serialization."""
if task is None:
return None
return TaskRef(
id=str(getattr(task, "id", "")),
name=str(getattr(task, "name", None) or getattr(task, "description", "")),
)
def _trace_tool_names(tools: Any) -> list[str] | None:
"""Return a list of tool names for trace serialization."""
if not tools:
return None
return [getattr(t, "name", str(t)) for t in tools]
_emission_counter: contextvars.ContextVar[Iterator[int]] = contextvars.ContextVar(
"_emission_counter"
)

View File

@@ -1,7 +1,7 @@
"""Trace collection listener for orchestrating trace collection."""
import os
from typing import Any, ClassVar
from typing import Any
import uuid
from typing_extensions import Self
@@ -126,18 +126,13 @@ from crewai.events.types.tool_usage_events import (
from crewai.events.utils.console_formatter import ConsoleFormatter
_TRACE_CONTEXT: dict[str, bool] = {"trace": True}
"""Serialization context that triggers lightweight field serializers on event models."""
class TraceCollectionListener(BaseEventListener):
"""Trace collection listener that orchestrates trace collection."""
complex_events: ClassVar[list[str]] = [
"task_started",
"task_completed",
"llm_call_started",
"llm_call_completed",
"agent_execution_started",
"agent_execution_completed",
]
_instance: Self | None = None
_initialized: bool = False
_listeners_setup: bool = False
@@ -810,9 +805,19 @@ class TraceCollectionListener(BaseEventListener):
def _build_event_data(
self, event_type: str, event: Any, source: Any
) -> dict[str, Any]:
"""Build event data"""
if event_type not in self.complex_events:
return safe_serialize_to_dict(event)
"""Build event data with context-based serialization to reduce trace bloat.
Field serializers on event models check for context={"trace": True} and
return lightweight references instead of full nested objects. This replaces
the old denylist approach with Pydantic v2's native context mechanism.
Only crew_kickoff_started gets a full crew structure (built separately).
Complex events (task_started, etc.) use custom projections for specific shapes.
All other events get context-aware serialization automatically.
"""
if event_type == "crew_kickoff_started":
return self._build_crew_started_data(event)
if event_type == "task_started":
task_name = event.task.name or event.task.description
task_display_name = (
@@ -853,19 +858,77 @@ class TraceCollectionListener(BaseEventListener):
"agent_backstory": event.agent.backstory,
}
if event_type == "llm_call_started":
event_data = safe_serialize_to_dict(event)
event_data = safe_serialize_to_dict(event, context=_TRACE_CONTEXT)
event_data["task_name"] = event.task_name or getattr(
event, "task_description", None
)
return event_data
if event_type == "llm_call_completed":
return safe_serialize_to_dict(event)
return safe_serialize_to_dict(event, context=_TRACE_CONTEXT)
return {
"event_type": event_type,
"event": safe_serialize_to_dict(event),
"source": source,
}
return safe_serialize_to_dict(event, context=_TRACE_CONTEXT)
def _build_crew_started_data(self, event: Any) -> dict[str, Any]:
"""Build comprehensive crew structure for crew_kickoff_started event.
This is the ONE place where we serialize the full crew structure.
Subsequent events use lightweight references via field serializers.
"""
event_data = safe_serialize_to_dict(event, context=_TRACE_CONTEXT)
crew = getattr(event, "crew", None)
if crew is not None:
agents_data = []
for agent in getattr(crew, "agents", []) or []:
agent_data = {
"id": str(getattr(agent, "id", "")),
"role": getattr(agent, "role", ""),
"goal": getattr(agent, "goal", ""),
"backstory": getattr(agent, "backstory", ""),
"verbose": getattr(agent, "verbose", False),
"allow_delegation": getattr(agent, "allow_delegation", False),
"max_iter": getattr(agent, "max_iter", None),
"max_rpm": getattr(agent, "max_rpm", None),
}
tools = getattr(agent, "tools", None)
if tools:
agent_data["tool_names"] = [
getattr(t, "name", str(t)) for t in tools
]
agents_data.append(agent_data)
tasks_data = []
for task in getattr(crew, "tasks", []) or []:
task_data = {
"id": str(getattr(task, "id", "")),
"name": getattr(task, "name", None),
"description": getattr(task, "description", ""),
"expected_output": getattr(task, "expected_output", ""),
"async_execution": getattr(task, "async_execution", False),
"human_input": getattr(task, "human_input", False),
}
task_agent = getattr(task, "agent", None)
if task_agent:
task_data["agent_ref"] = {
"id": str(getattr(task_agent, "id", "")),
"role": getattr(task_agent, "role", ""),
}
context_tasks = getattr(task, "context", None)
if context_tasks:
task_data["context_task_ids"] = [
str(getattr(ct, "id", "")) for ct in context_tasks
]
tasks_data.append(task_data)
event_data["crew_structure"] = {
"agents": agents_data,
"tasks": tasks_data,
"process": str(getattr(crew, "process", "")),
"verbose": getattr(crew, "verbose", False),
"memory": getattr(crew, "memory", False),
}
return event_data
def _show_tracing_disabled_message(self) -> None:
"""Show a message when tracing is disabled."""

View File

@@ -429,10 +429,22 @@ def mark_first_execution_done(user_consented: bool = False) -> None:
p.write_text(json.dumps(data, indent=2))
def safe_serialize_to_dict(obj: Any, exclude: set[str] | None = None) -> dict[str, Any]:
"""Safely serialize an object to a dictionary for event data."""
def safe_serialize_to_dict(
obj: Any,
exclude: set[str] | None = None,
context: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Safely serialize an object to a dictionary for event data.
Args:
obj: Object to serialize.
exclude: Set of keys to exclude from the result.
context: Optional context dict passed through to Pydantic's model_dump().
Field serializers can inspect this to customize output
(e.g. context={"trace": True} for lightweight trace serialization).
"""
try:
serialized = to_serializable(obj, exclude)
serialized = to_serializable(obj, exclude, context=context)
if isinstance(serialized, dict):
return serialized
return {"serialized_data": serialized}

View File

@@ -5,11 +5,17 @@ from __future__ import annotations
from collections.abc import Sequence
from typing import Any
from pydantic import ConfigDict, model_validator
from pydantic import ConfigDict, SerializationInfo, field_serializer, model_validator
from typing_extensions import Self
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.events.base_events import BaseEvent
from crewai.events.base_events import (
BaseEvent,
_is_trace_context,
_trace_agent_ref,
_trace_task_ref,
_trace_tool_names,
)
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
@@ -31,6 +37,21 @@ class AgentExecutionStartedEvent(BaseEvent):
_set_agent_fingerprint(self, self.agent)
return self
@field_serializer("agent")
@classmethod
def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_agent_ref(v) if _is_trace_context(info) else v
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v
@field_serializer("tools")
@classmethod
def _serialize_tools(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_tool_names(v) if _is_trace_context(info) else v
class AgentExecutionCompletedEvent(BaseEvent):
"""Event emitted when an agent completes executing a task"""
@@ -48,6 +69,16 @@ class AgentExecutionCompletedEvent(BaseEvent):
_set_agent_fingerprint(self, self.agent)
return self
@field_serializer("agent")
@classmethod
def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_agent_ref(v) if _is_trace_context(info) else v
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v
class AgentExecutionErrorEvent(BaseEvent):
"""Event emitted when an agent encounters an error during execution"""
@@ -65,6 +96,16 @@ class AgentExecutionErrorEvent(BaseEvent):
_set_agent_fingerprint(self, self.agent)
return self
@field_serializer("agent")
@classmethod
def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_agent_ref(v) if _is_trace_context(info) else v
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v
# New event classes for LiteAgent
class LiteAgentExecutionStartedEvent(BaseEvent):
@@ -77,6 +118,11 @@ class LiteAgentExecutionStartedEvent(BaseEvent):
model_config = ConfigDict(arbitrary_types_allowed=True)
@field_serializer("tools")
@classmethod
def _serialize_tools(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_tool_names(v) if _is_trace_context(info) else v
class LiteAgentExecutionCompletedEvent(BaseEvent):
"""Event emitted when a LiteAgent completes execution"""

View File

@@ -1,6 +1,8 @@
from typing import TYPE_CHECKING, Any
from crewai.events.base_events import BaseEvent
from pydantic import SerializationInfo, field_serializer
from crewai.events.base_events import BaseEvent, _is_trace_context
if TYPE_CHECKING:
@@ -26,6 +28,14 @@ class CrewBaseEvent(BaseEvent):
if self.crew.fingerprint.metadata:
self.fingerprint_metadata = self.crew.fingerprint.metadata
@field_serializer("crew")
@classmethod
def _serialize_crew(cls, v: Any, info: SerializationInfo) -> Any:
"""Exclude crew in trace context — crew_kickoff_started builds structure separately."""
if _is_trace_context(info):
return None
return v
def to_json(self, exclude: set[str] | None = None) -> Any:
if exclude is None:
exclude = set()

View File

@@ -1,9 +1,9 @@
from enum import Enum
from typing import Any
from pydantic import BaseModel
from pydantic import BaseModel, SerializationInfo, field_serializer
from crewai.events.base_events import BaseEvent
from crewai.events.base_events import BaseEvent, _is_trace_context
class LLMEventBase(BaseEvent):
@@ -49,6 +49,16 @@ class LLMCallStartedEvent(LLMEventBase):
callbacks: list[Any] | None = None
available_functions: dict[str, Any] | None = None
@field_serializer("callbacks")
@classmethod
def _serialize_callbacks(cls, v: Any, info: SerializationInfo) -> Any:
return None if _is_trace_context(info) else v
@field_serializer("available_functions")
@classmethod
def _serialize_available_functions(cls, v: Any, info: SerializationInfo) -> Any:
return None if _is_trace_context(info) else v
class LLMCallCompletedEvent(LLMEventBase):
"""Event emitted when a LLM call completes"""

View File

@@ -1,6 +1,8 @@
from typing import Any
from crewai.events.base_events import BaseEvent
from pydantic import SerializationInfo, field_serializer
from crewai.events.base_events import BaseEvent, _is_trace_context, _trace_task_ref
from crewai.tasks.task_output import TaskOutput
@@ -24,6 +26,11 @@ class TaskStartedEvent(BaseEvent):
super().__init__(**data)
_set_task_fingerprint(self, self.task)
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v
class TaskCompletedEvent(BaseEvent):
"""Event emitted when a task completes"""
@@ -36,6 +43,11 @@ class TaskCompletedEvent(BaseEvent):
super().__init__(**data)
_set_task_fingerprint(self, self.task)
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v
class TaskFailedEvent(BaseEvent):
"""Event emitted when a task fails"""
@@ -48,6 +60,11 @@ class TaskFailedEvent(BaseEvent):
super().__init__(**data)
_set_task_fingerprint(self, self.task)
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v
class TaskEvaluationEvent(BaseEvent):
"""Event emitted when a task evaluation is completed"""
@@ -59,3 +76,8 @@ class TaskEvaluationEvent(BaseEvent):
def __init__(self, **data: Any) -> None:
super().__init__(**data)
_set_task_fingerprint(self, self.task)
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v

View File

@@ -2,9 +2,9 @@ from collections.abc import Callable
from datetime import datetime
from typing import Any
from pydantic import ConfigDict
from pydantic import ConfigDict, SerializationInfo, field_serializer
from crewai.events.base_events import BaseEvent
from crewai.events.base_events import BaseEvent, _is_trace_context, _trace_agent_ref
class ToolUsageEvent(BaseEvent):
@@ -26,6 +26,11 @@ class ToolUsageEvent(BaseEvent):
model_config = ConfigDict(arbitrary_types_allowed=True)
@field_serializer("agent")
@classmethod
def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_agent_ref(v) if _is_trace_context(info) else v
def __init__(self, **data: Any) -> None:
if data.get("from_task"):
task = data["from_task"]
@@ -99,6 +104,11 @@ class ToolExecutionErrorEvent(BaseEvent):
tool_class: Callable[..., Any]
agent: Any | None = None
@field_serializer("agent")
@classmethod
def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_agent_ref(v) if _is_trace_context(info) else v
def __init__(self, **data: Any) -> None:
super().__init__(**data)
# Set fingerprint data from the agent

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from datetime import date, datetime
from enum import Enum
import json
from typing import Any, TypeAlias
import uuid
@@ -20,6 +21,7 @@ def to_serializable(
max_depth: int = 5,
_current_depth: int = 0,
_ancestors: set[int] | None = None,
context: dict[str, Any] | None = None,
) -> Serializable:
"""Converts a Python object into a JSON-compatible representation.
@@ -33,6 +35,9 @@ def to_serializable(
max_depth: Maximum recursion depth. Defaults to 5.
_current_depth: Current recursion depth (for internal use).
_ancestors: Set of ancestor object ids for cycle detection (for internal use).
context: Optional context dict passed to Pydantic's model_dump(context=...).
Field serializers on the model can inspect this to customize output
(e.g. context={"trace": True} for lightweight trace serialization).
Returns:
Serializable: A JSON-compatible structure.
@@ -48,6 +53,15 @@ def to_serializable(
if isinstance(obj, (str, int, float, bool, type(None))):
return obj
if isinstance(obj, Enum):
return to_serializable(
obj.value,
exclude=exclude,
max_depth=max_depth,
_current_depth=_current_depth,
_ancestors=_ancestors,
context=context,
)
if isinstance(obj, uuid.UUID):
return str(obj)
if isinstance(obj, (date, datetime)):
@@ -66,6 +80,7 @@ def to_serializable(
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
context=context,
)
for item in obj
]
@@ -77,17 +92,24 @@ def to_serializable(
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
context=context,
)
for key, value in obj.items()
if key not in exclude
}
if isinstance(obj, BaseModel):
try:
dump_kwargs: dict[str, Any] = {}
if exclude:
dump_kwargs["exclude"] = exclude
if context is not None:
dump_kwargs["context"] = context
return to_serializable(
obj=obj.model_dump(exclude=exclude),
obj=obj.model_dump(**dump_kwargs),
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
context=context,
)
except Exception:
try:
@@ -97,12 +119,30 @@ def to_serializable(
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
context=context,
)
for k, v in obj.__dict__.items()
if k not in (exclude or set())
}
except Exception:
return repr(obj)
if callable(obj):
return repr(obj)
if hasattr(obj, "__dict__"):
try:
return {
_to_serializable_key(k): to_serializable(
v,
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
context=context,
)
for k, v in obj.__dict__.items()
if not k.startswith("_")
}
except Exception:
return repr(obj)
return repr(obj)

View File

@@ -0,0 +1,612 @@
"""Tests for trace serialization optimization using Pydantic v2 context-based serialization.
These tests verify that trace events use @field_serializer with SerializationInfo.context
to produce lightweight representations, reducing event sizes from 50-100KB to a few KB.
"""
import json
import uuid
from typing import Any
from unittest.mock import MagicMock
import pytest
from pydantic import ConfigDict
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.events.base_events import _trace_agent_ref, _trace_task_ref, _trace_tool_names
from crewai.events.listeners.tracing.utils import safe_serialize_to_dict
from crewai.utilities.serialization import to_serializable
# ---------------------------------------------------------------------------
# Lightweight BaseAgent subclass for tests (avoids heavy dependencies)
# ---------------------------------------------------------------------------
class _StubAgent(BaseAgent):
"""Minimal BaseAgent subclass that satisfies validation without heavy deps."""
model_config = ConfigDict(arbitrary_types_allowed=True)
def execute_task(self, *a: Any, **kw: Any) -> str:
return ""
def create_agent_executor(self, *a: Any, **kw: Any) -> None:
pass
def _parse_tools(self, *a: Any, **kw: Any) -> list:
return []
def get_delegation_tools(self, *a: Any, **kw: Any) -> list:
return []
def get_output_converter(self, *a: Any, **kw: Any) -> Any:
return None
def get_multimodal_tools(self, *a: Any, **kw: Any) -> list:
return []
async def aexecute_task(self, *a: Any, **kw: Any) -> str:
return ""
def get_mcp_tools(self, *a: Any, **kw: Any) -> list:
return []
def get_platform_tools(self, *a: Any, **kw: Any) -> list:
return []
def _make_stub_agent(**overrides) -> _StubAgent:
"""Create a minimal BaseAgent instance for testing."""
defaults = {
"role": "Researcher",
"goal": "Research things",
"backstory": "Expert researcher",
"tools": [],
}
defaults.update(overrides)
return _StubAgent(**defaults)
# ---------------------------------------------------------------------------
# Helpers to build realistic mock objects for event fields
# ---------------------------------------------------------------------------
def _make_mock_task(**overrides):
task = MagicMock()
task.id = overrides.get("id", uuid.uuid4())
task.name = overrides.get("name", "Research Task")
task.description = overrides.get("description", "Do research")
task.expected_output = overrides.get("expected_output", "Research results")
task.async_execution = overrides.get("async_execution", False)
task.human_input = overrides.get("human_input", False)
task.agent = overrides.get("agent", _make_stub_agent())
task.context = overrides.get("context", None)
task.crew = MagicMock()
task.tools = overrides.get("tools", [MagicMock(), MagicMock()])
fp = MagicMock()
fp.uuid_str = str(uuid.uuid4())
fp.metadata = {"name": task.name}
task.fingerprint = fp
return task
def _make_stub_tool(tool_name="web_search") -> Any:
"""Create a minimal BaseTool instance for testing."""
from crewai.tools.base_tool import BaseTool
class _StubTool(BaseTool):
name: str = "stub"
description: str = "stub tool"
def _run(self, *a: Any, **kw: Any) -> str:
return ""
return _StubTool(name=tool_name, description=f"{tool_name} tool")
# ---------------------------------------------------------------------------
# Unit tests: trace ref helpers
# ---------------------------------------------------------------------------
class TestTraceRefHelpers:
def test_trace_agent_ref(self):
agent = _make_stub_agent(role="Analyst")
ref = _trace_agent_ref(agent)
assert ref["role"] == "Analyst"
assert "id" in ref
assert len(ref) == 2 # only id and role
def test_trace_agent_ref_none(self):
assert _trace_agent_ref(None) is None
def test_trace_task_ref(self):
task = _make_mock_task(name="Write Report")
ref = _trace_task_ref(task)
assert ref["name"] == "Write Report"
assert "id" in ref
assert len(ref) == 2
def test_trace_task_ref_falls_back_to_description(self):
task = _make_mock_task(name=None, description="Describe the report")
ref = _trace_task_ref(task)
assert ref["name"] == "Describe the report"
def test_trace_task_ref_none(self):
assert _trace_task_ref(None) is None
def test_trace_tool_names(self):
tools = [_make_stub_tool("search"), _make_stub_tool("read")]
names = _trace_tool_names(tools)
assert names == ["search", "read"]
def test_trace_tool_names_empty(self):
assert _trace_tool_names([]) is None
assert _trace_tool_names(None) is None
# ---------------------------------------------------------------------------
# Integration tests: field serializers on real event classes
# ---------------------------------------------------------------------------
class TestAgentEventFieldSerializers:
"""Test that agent event field serializers respond to trace context."""
def test_agent_execution_started_trace_context(self):
from crewai.events.types.agent_events import AgentExecutionStartedEvent
agent = _make_stub_agent(role="Researcher")
task = _make_mock_task(name="Research Task")
tools = [_make_stub_tool("search"), _make_stub_tool("read")]
event = AgentExecutionStartedEvent(
agent=agent, task=task, tools=tools, task_prompt="Do research"
)
# With trace context: lightweight refs
trace_dump = event.model_dump(context={"trace": True})
assert trace_dump["agent"] == {"id": str(agent.id), "role": "Researcher"}
assert trace_dump["task"] == {"id": str(task.id), "name": "Research Task"}
assert trace_dump["tools"] == ["search", "read"]
def test_agent_execution_started_no_context(self):
from crewai.events.types.agent_events import AgentExecutionStartedEvent
agent = _make_stub_agent(role="SpecificRole")
task = _make_mock_task()
event = AgentExecutionStartedEvent(
agent=agent, task=task, tools=None, task_prompt="Do research"
)
# Without context: full agent dict (Pydantic model_dump expands it)
normal_dump = event.model_dump()
assert isinstance(normal_dump["agent"], dict)
assert normal_dump["agent"]["role"] == "SpecificRole"
# Should have ALL agent fields, not just the lightweight ref
assert "goal" in normal_dump["agent"]
assert "backstory" in normal_dump["agent"]
assert "max_iter" in normal_dump["agent"]
def test_agent_execution_error_preserves_identification(self):
from crewai.events.types.agent_events import AgentExecutionErrorEvent
agent = _make_stub_agent(role="Analyst")
task = _make_mock_task(name="Analysis Task")
event = AgentExecutionErrorEvent(
agent=agent, task=task, error="Something went wrong"
)
trace_dump = event.model_dump(context={"trace": True})
# Error events should still have agent/task identification as refs
assert trace_dump["agent"]["role"] == "Analyst"
assert trace_dump["task"]["name"] == "Analysis Task"
assert trace_dump["error"] == "Something went wrong"
def test_agent_execution_completed_trace_context(self):
from crewai.events.types.agent_events import AgentExecutionCompletedEvent
agent = _make_stub_agent(role="Writer")
task = _make_mock_task(name="Writing Task")
event = AgentExecutionCompletedEvent(
agent=agent, task=task, output="Final output"
)
trace_dump = event.model_dump(context={"trace": True})
assert trace_dump["agent"]["role"] == "Writer"
assert trace_dump["task"]["name"] == "Writing Task"
assert trace_dump["output"] == "Final output"
class TestTaskEventFieldSerializers:
"""Test that task event field serializers respond to trace context."""
def test_task_started_trace_context(self):
from crewai.events.types.task_events import TaskStartedEvent
task = _make_mock_task(name="Test Task")
event = TaskStartedEvent(task=task, context="some context")
trace_dump = event.model_dump(context={"trace": True})
assert trace_dump["task"] == {"id": str(task.id), "name": "Test Task"}
assert trace_dump["context"] == "some context"
def test_task_failed_trace_context(self):
from crewai.events.types.task_events import TaskFailedEvent
task = _make_mock_task(name="Failing Task")
event = TaskFailedEvent(task=task, error="Task failed")
trace_dump = event.model_dump(context={"trace": True})
assert trace_dump["task"]["name"] == "Failing Task"
assert trace_dump["error"] == "Task failed"
class TestCrewEventFieldSerializers:
"""Test that crew event field serializers respond to trace context."""
def test_crew_kickoff_started_excludes_crew_in_trace(self):
from crewai.events.types.crew_events import CrewKickoffStartedEvent
crew = MagicMock()
crew.fingerprint = MagicMock()
crew.fingerprint.uuid_str = str(uuid.uuid4())
crew.fingerprint.metadata = {}
event = CrewKickoffStartedEvent(
crew=crew, crew_name="TestCrew", inputs={"key": "value"}
)
trace_dump = event.model_dump(context={"trace": True})
# crew field should be None in trace context
assert trace_dump["crew"] is None
# scalar fields preserved
assert trace_dump["crew_name"] == "TestCrew"
assert trace_dump["inputs"] == {"key": "value"}
def test_crew_event_no_context_preserves_crew(self):
from crewai.events.types.crew_events import CrewKickoffStartedEvent
crew = MagicMock()
crew.fingerprint = MagicMock()
crew.fingerprint.uuid_str = str(uuid.uuid4())
crew.fingerprint.metadata = {}
event = CrewKickoffStartedEvent(
crew=crew, crew_name="TestCrew", inputs=None
)
normal_dump = event.model_dump()
# Without trace context, crew should NOT be None (field serializer didn't fire)
assert normal_dump["crew"] is not None
class TestLLMEventFieldSerializers:
"""Test that LLM event field serializers respond to trace context."""
def test_llm_call_started_excludes_callbacks_in_trace(self):
from crewai.events.types.llm_events import LLMCallStartedEvent
event = LLMCallStartedEvent(
call_id="test-call",
messages=[{"role": "user", "content": "Hello"}],
tools=[{"name": "search", "description": "Search tool"}],
callbacks=[MagicMock(), MagicMock()],
available_functions={"search": MagicMock()},
)
trace_dump = event.model_dump(context={"trace": True})
# callbacks and available_functions excluded
assert trace_dump["callbacks"] is None
assert trace_dump["available_functions"] is None
# tools preserved (lightweight list of dicts)
assert trace_dump["tools"] == [{"name": "search", "description": "Search tool"}]
# messages preserved
assert trace_dump["messages"] == [{"role": "user", "content": "Hello"}]
# ---------------------------------------------------------------------------
# Integration tests: safe_serialize_to_dict with context
# ---------------------------------------------------------------------------
class TestSafeSerializeWithContext:
"""Test that safe_serialize_to_dict properly passes context through."""
def test_context_flows_through_to_field_serializers(self):
from crewai.events.types.agent_events import AgentExecutionErrorEvent
agent = _make_stub_agent(role="Worker")
task = _make_mock_task(name="Work Task")
event = AgentExecutionErrorEvent(
agent=agent, task=task, error="error msg"
)
result = safe_serialize_to_dict(event, context={"trace": True})
# Field serializers should have fired
assert result["agent"] == {"id": str(agent.id), "role": "Worker"}
assert result["task"] == {"id": str(task.id), "name": "Work Task"}
assert result["error"] == "error msg"
def test_no_context_preserves_full_serialization(self):
from crewai.events.types.task_events import TaskFailedEvent
task = _make_mock_task(name="Test")
event = TaskFailedEvent(task=task, error="fail")
result = safe_serialize_to_dict(event)
# Without context, task should not be a lightweight ref
assert result.get("task") is not None
# It should be the raw object (model_dump returns it as-is for Any fields)
# to_serializable will then repr() or process it further
# ---------------------------------------------------------------------------
# Integration tests: TraceCollectionListener._build_event_data
# ---------------------------------------------------------------------------
class TestBuildEventData:
@pytest.fixture
def listener(self):
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
TraceCollectionListener._instance = None
TraceCollectionListener._initialized = False
TraceCollectionListener._listeners_setup = False
return TraceCollectionListener()
def test_crew_kickoff_started_has_crew_structure(self, listener):
agent = _make_stub_agent(role="Researcher")
agent.tools = [_make_stub_tool("search"), _make_stub_tool("read")]
task = _make_mock_task(name="Research Task", agent=agent)
task.context = None
crew = MagicMock()
crew.agents = [agent]
crew.tasks = [task]
crew.process = "sequential"
crew.verbose = True
crew.memory = False
crew.fingerprint = MagicMock()
crew.fingerprint.uuid_str = str(uuid.uuid4())
crew.fingerprint.metadata = {}
from crewai.events.types.crew_events import CrewKickoffStartedEvent
event = CrewKickoffStartedEvent(
crew=crew, crew_name="TestCrew", inputs={"key": "value"}
)
result = listener._build_event_data("crew_kickoff_started", event, None)
assert "crew_structure" in result
cs = result["crew_structure"]
assert len(cs["agents"]) == 1
assert cs["agents"][0]["role"] == "Researcher"
assert cs["agents"][0]["tool_names"] == ["search", "read"]
assert len(cs["tasks"]) == 1
assert cs["tasks"][0]["name"] == "Research Task"
assert "agent_ref" in cs["tasks"][0]
assert cs["tasks"][0]["agent_ref"]["role"] == "Researcher"
def test_crew_kickoff_started_context_task_ids(self, listener):
agent = _make_stub_agent()
task1 = _make_mock_task(name="Task 1", agent=agent)
task1.context = None
task2 = _make_mock_task(name="Task 2", agent=agent)
task2.context = [task1]
crew = MagicMock()
crew.agents = [agent]
crew.tasks = [task1, task2]
crew.process = "sequential"
crew.verbose = False
crew.memory = False
crew.fingerprint = MagicMock()
crew.fingerprint.uuid_str = str(uuid.uuid4())
crew.fingerprint.metadata = {}
from crewai.events.types.crew_events import CrewKickoffStartedEvent
event = CrewKickoffStartedEvent(
crew=crew, crew_name="TestCrew", inputs=None
)
result = listener._build_event_data("crew_kickoff_started", event, None)
task2_data = result["crew_structure"]["tasks"][1]
assert "context_task_ids" in task2_data
assert str(task1.id) in task2_data["context_task_ids"]
def test_generic_event_uses_trace_context(self, listener):
"""Non-complex events should use context-based serialization."""
from crewai.events.types.crew_events import CrewKickoffCompletedEvent
crew = MagicMock()
crew.fingerprint = MagicMock()
crew.fingerprint.uuid_str = str(uuid.uuid4())
crew.fingerprint.metadata = {}
event = CrewKickoffCompletedEvent(
crew=crew, crew_name="TestCrew", output="Final result", total_tokens=5000
)
result = listener._build_event_data("crew_kickoff_completed", event, None)
# Scalar fields preserved
assert result.get("crew_name") == "TestCrew"
assert result.get("total_tokens") == 5000
# crew excluded by field serializer
assert result.get("crew") is None
# No crew_structure (that's only for kickoff_started)
assert "crew_structure" not in result
def test_task_started_custom_projection(self, listener):
task = _make_mock_task(name="Test Task")
from crewai.events.types.task_events import TaskStartedEvent
event = TaskStartedEvent(task=task, context="test context")
source = MagicMock()
source.agent = _make_stub_agent(role="Worker")
result = listener._build_event_data("task_started", event, source)
assert result["task_name"] == "Test Task"
assert result["agent_role"] == "Worker"
assert result["task_id"] == str(task.id)
assert result["context"] == "test context"
def test_llm_call_started_uses_trace_context(self, listener):
from crewai.events.types.llm_events import LLMCallStartedEvent
event = LLMCallStartedEvent(
call_id="test",
messages=[{"role": "user", "content": "Hello"}],
tools=[{"name": "search"}],
callbacks=[MagicMock()],
available_functions={"fn": MagicMock()},
)
result = listener._build_event_data("llm_call_started", event, None)
# callbacks and available_functions excluded via field serializer
assert result.get("callbacks") is None
assert result.get("available_functions") is None
# tools preserved (lightweight schemas)
assert result.get("tools") == [{"name": "search"}]
def test_agent_execution_error_preserves_identification(self, listener):
"""Error events should preserve agent/task identification via field serializers."""
from crewai.events.types.agent_events import AgentExecutionErrorEvent
agent = _make_stub_agent(role="Analyst")
task = _make_mock_task(name="Analysis")
event = AgentExecutionErrorEvent(
agent=agent, task=task, error="Something broke"
)
result = listener._build_event_data("agent_execution_error", event, None)
# Field serializers return lightweight refs, not None
assert result["agent"] == {"id": str(agent.id), "role": "Analyst"}
assert result["task"] == {"id": str(task.id), "name": "Analysis"}
assert result["error"] == "Something broke"
def test_task_failed_preserves_identification(self, listener):
from crewai.events.types.task_events import TaskFailedEvent
task = _make_mock_task(name="Failed Task")
event = TaskFailedEvent(task=task, error="Task failed")
result = listener._build_event_data("task_failed", event, None)
assert result["task"] == {"id": str(task.id), "name": "Failed Task"}
assert result["error"] == "Task failed"
# ---------------------------------------------------------------------------
# Size reduction verification
# ---------------------------------------------------------------------------
class TestSizeReduction:
@pytest.fixture
def listener(self):
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
TraceCollectionListener._instance = None
TraceCollectionListener._initialized = False
TraceCollectionListener._listeners_setup = False
return TraceCollectionListener()
def test_task_started_event_size(self, listener):
"""task_started event data should be well under 2KB."""
agent = _make_stub_agent(
role="Researcher",
goal="Research" * 50,
backstory="Expert" * 100,
)
agent.tools = [_make_stub_tool(f"tool_{i}") for i in range(5)]
task = _make_mock_task(
name="Research Task",
description="Detailed description" * 20,
expected_output="Expected" * 10,
agent=agent,
)
task.context = [_make_mock_task() for _ in range(3)]
task.tools = [_make_stub_tool(f"t_{i}") for i in range(3)]
from crewai.events.types.task_events import TaskStartedEvent
event = TaskStartedEvent(task=task, context="test context")
source = MagicMock()
source.agent = agent
result = listener._build_event_data("task_started", event, source)
serialized = json.dumps(result, default=str)
assert len(serialized) < 2000, f"task_started too large: {len(serialized)} bytes"
assert "task_name" in result
assert "agent_role" in result
def test_error_event_size(self, listener):
"""Error events should be small despite having agent/task refs."""
from crewai.events.types.agent_events import AgentExecutionErrorEvent
agent = _make_stub_agent(
goal="Very long goal " * 100,
backstory="Very long backstory " * 100,
)
task = _make_mock_task(description="Very long description " * 100)
event = AgentExecutionErrorEvent(
agent=agent, task=task, error="error"
)
result = listener._build_event_data("agent_execution_error", event, None)
serialized = json.dumps(result, default=str)
# Should be small - agent/task are just {id, role/name} refs
assert len(serialized) < 5000, f"error event too large: {len(serialized)} bytes"
# ---------------------------------------------------------------------------
# to_serializable context threading
# ---------------------------------------------------------------------------
class TestToSerializableContext:
"""Test that context parameter flows through to_serializable correctly."""
def test_context_passed_to_model_dump(self):
from crewai.events.types.agent_events import AgentExecutionErrorEvent
agent = _make_stub_agent(role="Tester")
task = _make_mock_task(name="Test Task")
event = AgentExecutionErrorEvent(
agent=agent, task=task, error="test error"
)
# Directly use to_serializable with context
result = to_serializable(event, context={"trace": True})
assert isinstance(result, dict)
assert result["agent"] == {"id": str(agent.id), "role": "Tester"}
assert result["task"] == {"id": str(task.id), "name": "Test Task"}
def test_no_context_does_not_trigger_serializers(self):
from crewai.events.types.crew_events import CrewKickoffStartedEvent
crew = MagicMock()
crew.fingerprint = MagicMock()
crew.fingerprint.uuid_str = str(uuid.uuid4())
crew.fingerprint.metadata = {}
event = CrewKickoffStartedEvent(
crew=crew, crew_name="Test", inputs=None
)
# Without context, crew should NOT be None
result = event.model_dump()
assert result["crew"] is not None