From 6da1c5f9642eb6be11090b6865fbb4b9f80d5eb3 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Tue, 31 Mar 2026 15:40:25 +0800 Subject: [PATCH] fix: reduce trace event serialization bloat Use context-aware field serializers so event models control their own trace representation. Heavy nested objects become lightweight refs. --- lib/crewai/src/crewai/events/base_events.py | 34 +- .../listeners/tracing/trace_listener.py | 105 ++- .../crewai/events/listeners/tracing/utils.py | 18 +- .../src/crewai/events/types/agent_events.py | 50 +- .../src/crewai/events/types/crew_events.py | 12 +- .../src/crewai/events/types/llm_events.py | 14 +- .../src/crewai/events/types/task_events.py | 24 +- .../src/crewai/utilities/serialization.py | 17 +- .../tests/tracing/test_trace_serialization.py | 613 ++++++++++++++++++ 9 files changed, 856 insertions(+), 31 deletions(-) create mode 100644 lib/crewai/tests/tracing/test_trace_serialization.py diff --git a/lib/crewai/src/crewai/events/base_events.py b/lib/crewai/src/crewai/events/base_events.py index 6eeaa06e8..e72ff09b3 100644 --- a/lib/crewai/src/crewai/events/base_events.py +++ b/lib/crewai/src/crewai/events/base_events.py @@ -5,11 +5,43 @@ import itertools from typing import Any import uuid -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, SerializationInfo from crewai.utilities.serialization import Serializable, to_serializable +def _is_trace_context(info: SerializationInfo) -> bool: + """Check if serialization is happening in trace context.""" + return bool(info.context and info.context.get("trace")) + + +def _trace_agent_ref(agent: Any) -> dict[str, Any] | None: + """Return a lightweight agent reference for trace serialization.""" + if agent is None: + return None + return { + "id": str(getattr(agent, "id", "")), + "role": getattr(agent, "role", ""), + } + + +def _trace_task_ref(task: Any) -> dict[str, Any] | None: + """Return a lightweight task reference for trace serialization.""" + if task is None: + return None + return { + "id": str(getattr(task, "id", "")), + "name": getattr(task, "name", None) or getattr(task, "description", ""), + } + + +def _trace_tool_names(tools: Any) -> list[str] | None: + """Return a list of tool names for trace serialization.""" + if not tools: + return None + return [getattr(t, "name", str(t)) for t in tools] + + _emission_counter: contextvars.ContextVar[Iterator[int]] = contextvars.ContextVar( "_emission_counter" ) diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py index 9d81f1d55..eb056dac3 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py @@ -1,7 +1,7 @@ """Trace collection listener for orchestrating trace collection.""" import os -from typing import Any, ClassVar +from typing import Any import uuid from typing_extensions import Self @@ -126,18 +126,13 @@ from crewai.events.types.tool_usage_events import ( from crewai.events.utils.console_formatter import ConsoleFormatter +_TRACE_CONTEXT: dict[str, bool] = {"trace": True} +"""Serialization context that triggers lightweight field serializers on event models.""" + + class TraceCollectionListener(BaseEventListener): """Trace collection listener that orchestrates trace collection.""" - complex_events: ClassVar[list[str]] = [ - "task_started", - "task_completed", - "llm_call_started", - "llm_call_completed", - "agent_execution_started", - "agent_execution_completed", - ] - _instance: Self | None = None _initialized: bool = False _listeners_setup: bool = False @@ -810,9 +805,20 @@ class TraceCollectionListener(BaseEventListener): def _build_event_data( self, event_type: str, event: Any, source: Any ) -> dict[str, Any]: - """Build event data""" - if event_type not in self.complex_events: - return safe_serialize_to_dict(event) + """Build event data with context-based serialization to reduce trace bloat. + + Field serializers on event models check for context={"trace": True} and + return lightweight references instead of full nested objects. This replaces + the old denylist approach with Pydantic v2's native context mechanism. + + Only crew_kickoff_started gets a full crew structure (built separately). + Complex events (task_started, etc.) use custom projections for specific shapes. + All other events get context-aware serialization automatically. + """ + if event_type == "crew_kickoff_started": + return self._build_crew_started_data(event) + + # Complex events with custom projections (specific field shapes for AMP) if event_type == "task_started": task_name = event.task.name or event.task.description task_display_name = ( @@ -853,19 +859,78 @@ class TraceCollectionListener(BaseEventListener): "agent_backstory": event.agent.backstory, } if event_type == "llm_call_started": - event_data = safe_serialize_to_dict(event) + event_data = safe_serialize_to_dict(event, context=_TRACE_CONTEXT) event_data["task_name"] = event.task_name or getattr( event, "task_description", None ) return event_data if event_type == "llm_call_completed": - return safe_serialize_to_dict(event) + return safe_serialize_to_dict(event, context=_TRACE_CONTEXT) - return { - "event_type": event_type, - "event": safe_serialize_to_dict(event), - "source": source, - } + # All other events: field serializers handle the heavy lifting + return safe_serialize_to_dict(event, context=_TRACE_CONTEXT) + + def _build_crew_started_data(self, event: Any) -> dict[str, Any]: + """Build comprehensive crew structure for crew_kickoff_started event. + + This is the ONE place where we serialize the full crew structure. + Subsequent events use lightweight references via field serializers. + """ + event_data = safe_serialize_to_dict(event, context=_TRACE_CONTEXT) + + crew = getattr(event, "crew", None) + if crew is not None: + agents_data = [] + for agent in getattr(crew, "agents", []) or []: + agent_data = { + "id": str(getattr(agent, "id", "")), + "role": getattr(agent, "role", ""), + "goal": getattr(agent, "goal", ""), + "backstory": getattr(agent, "backstory", ""), + "verbose": getattr(agent, "verbose", False), + "allow_delegation": getattr(agent, "allow_delegation", False), + "max_iter": getattr(agent, "max_iter", None), + "max_rpm": getattr(agent, "max_rpm", None), + } + tools = getattr(agent, "tools", None) + if tools: + agent_data["tool_names"] = [ + getattr(t, "name", str(t)) for t in tools + ] + agents_data.append(agent_data) + + tasks_data = [] + for task in getattr(crew, "tasks", []) or []: + task_data = { + "id": str(getattr(task, "id", "")), + "name": getattr(task, "name", None), + "description": getattr(task, "description", ""), + "expected_output": getattr(task, "expected_output", ""), + "async_execution": getattr(task, "async_execution", False), + "human_input": getattr(task, "human_input", False), + } + task_agent = getattr(task, "agent", None) + if task_agent: + task_data["agent_ref"] = { + "id": str(getattr(task_agent, "id", "")), + "role": getattr(task_agent, "role", ""), + } + context_tasks = getattr(task, "context", None) + if context_tasks: + task_data["context_task_ids"] = [ + str(getattr(ct, "id", "")) for ct in context_tasks + ] + tasks_data.append(task_data) + + event_data["crew_structure"] = { + "agents": agents_data, + "tasks": tasks_data, + "process": str(getattr(crew, "process", "")), + "verbose": getattr(crew, "verbose", False), + "memory": getattr(crew, "memory", False), + } + + return event_data def _show_tracing_disabled_message(self) -> None: """Show a message when tracing is disabled.""" diff --git a/lib/crewai/src/crewai/events/listeners/tracing/utils.py b/lib/crewai/src/crewai/events/listeners/tracing/utils.py index 7a6eff3f0..21a700601 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/utils.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/utils.py @@ -429,10 +429,22 @@ def mark_first_execution_done(user_consented: bool = False) -> None: p.write_text(json.dumps(data, indent=2)) -def safe_serialize_to_dict(obj: Any, exclude: set[str] | None = None) -> dict[str, Any]: - """Safely serialize an object to a dictionary for event data.""" +def safe_serialize_to_dict( + obj: Any, + exclude: set[str] | None = None, + context: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Safely serialize an object to a dictionary for event data. + + Args: + obj: Object to serialize. + exclude: Set of keys to exclude from the result. + context: Optional context dict passed through to Pydantic's model_dump(). + Field serializers can inspect this to customize output + (e.g. context={"trace": True} for lightweight trace serialization). + """ try: - serialized = to_serializable(obj, exclude) + serialized = to_serializable(obj, exclude, context=context) if isinstance(serialized, dict): return serialized return {"serialized_data": serialized} diff --git a/lib/crewai/src/crewai/events/types/agent_events.py b/lib/crewai/src/crewai/events/types/agent_events.py index 49e24e059..97b655b3b 100644 --- a/lib/crewai/src/crewai/events/types/agent_events.py +++ b/lib/crewai/src/crewai/events/types/agent_events.py @@ -5,11 +5,17 @@ from __future__ import annotations from collections.abc import Sequence from typing import Any -from pydantic import ConfigDict, model_validator +from pydantic import ConfigDict, SerializationInfo, field_serializer, model_validator from typing_extensions import Self from crewai.agents.agent_builder.base_agent import BaseAgent -from crewai.events.base_events import BaseEvent +from crewai.events.base_events import ( + BaseEvent, + _is_trace_context, + _trace_agent_ref, + _trace_task_ref, + _trace_tool_names, +) from crewai.tools.base_tool import BaseTool from crewai.tools.structured_tool import CrewStructuredTool @@ -31,6 +37,21 @@ class AgentExecutionStartedEvent(BaseEvent): _set_agent_fingerprint(self, self.agent) return self + @field_serializer("agent") + @classmethod + def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any: + return _trace_agent_ref(v) if _is_trace_context(info) else v + + @field_serializer("task") + @classmethod + def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any: + return _trace_task_ref(v) if _is_trace_context(info) else v + + @field_serializer("tools") + @classmethod + def _serialize_tools(cls, v: Any, info: SerializationInfo) -> Any: + return _trace_tool_names(v) if _is_trace_context(info) else v + class AgentExecutionCompletedEvent(BaseEvent): """Event emitted when an agent completes executing a task""" @@ -48,6 +69,16 @@ class AgentExecutionCompletedEvent(BaseEvent): _set_agent_fingerprint(self, self.agent) return self + @field_serializer("agent") + @classmethod + def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any: + return _trace_agent_ref(v) if _is_trace_context(info) else v + + @field_serializer("task") + @classmethod + def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any: + return _trace_task_ref(v) if _is_trace_context(info) else v + class AgentExecutionErrorEvent(BaseEvent): """Event emitted when an agent encounters an error during execution""" @@ -65,6 +96,16 @@ class AgentExecutionErrorEvent(BaseEvent): _set_agent_fingerprint(self, self.agent) return self + @field_serializer("agent") + @classmethod + def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any: + return _trace_agent_ref(v) if _is_trace_context(info) else v + + @field_serializer("task") + @classmethod + def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any: + return _trace_task_ref(v) if _is_trace_context(info) else v + # New event classes for LiteAgent class LiteAgentExecutionStartedEvent(BaseEvent): @@ -77,6 +118,11 @@ class LiteAgentExecutionStartedEvent(BaseEvent): model_config = ConfigDict(arbitrary_types_allowed=True) + @field_serializer("tools") + @classmethod + def _serialize_tools(cls, v: Any, info: SerializationInfo) -> Any: + return _trace_tool_names(v) if _is_trace_context(info) else v + class LiteAgentExecutionCompletedEvent(BaseEvent): """Event emitted when a LiteAgent completes execution""" diff --git a/lib/crewai/src/crewai/events/types/crew_events.py b/lib/crewai/src/crewai/events/types/crew_events.py index fa198f5ae..8b712e7a3 100644 --- a/lib/crewai/src/crewai/events/types/crew_events.py +++ b/lib/crewai/src/crewai/events/types/crew_events.py @@ -1,6 +1,8 @@ from typing import TYPE_CHECKING, Any -from crewai.events.base_events import BaseEvent +from pydantic import SerializationInfo, field_serializer + +from crewai.events.base_events import BaseEvent, _is_trace_context if TYPE_CHECKING: @@ -26,6 +28,14 @@ class CrewBaseEvent(BaseEvent): if self.crew.fingerprint.metadata: self.fingerprint_metadata = self.crew.fingerprint.metadata + @field_serializer("crew") + @classmethod + def _serialize_crew(cls, v: Any, info: SerializationInfo) -> Any: + """Exclude crew in trace context — crew_kickoff_started builds structure separately.""" + if _is_trace_context(info): + return None + return v + def to_json(self, exclude: set[str] | None = None) -> Any: if exclude is None: exclude = set() diff --git a/lib/crewai/src/crewai/events/types/llm_events.py b/lib/crewai/src/crewai/events/types/llm_events.py index 73d743804..69d33999f 100644 --- a/lib/crewai/src/crewai/events/types/llm_events.py +++ b/lib/crewai/src/crewai/events/types/llm_events.py @@ -1,9 +1,9 @@ from enum import Enum from typing import Any -from pydantic import BaseModel +from pydantic import BaseModel, SerializationInfo, field_serializer -from crewai.events.base_events import BaseEvent +from crewai.events.base_events import BaseEvent, _is_trace_context class LLMEventBase(BaseEvent): @@ -49,6 +49,16 @@ class LLMCallStartedEvent(LLMEventBase): callbacks: list[Any] | None = None available_functions: dict[str, Any] | None = None + @field_serializer("callbacks") + @classmethod + def _serialize_callbacks(cls, v: Any, info: SerializationInfo) -> Any: + return None if _is_trace_context(info) else v + + @field_serializer("available_functions") + @classmethod + def _serialize_available_functions(cls, v: Any, info: SerializationInfo) -> Any: + return None if _is_trace_context(info) else v + class LLMCallCompletedEvent(LLMEventBase): """Event emitted when a LLM call completes""" diff --git a/lib/crewai/src/crewai/events/types/task_events.py b/lib/crewai/src/crewai/events/types/task_events.py index 5d2fd746a..b6fce2e61 100644 --- a/lib/crewai/src/crewai/events/types/task_events.py +++ b/lib/crewai/src/crewai/events/types/task_events.py @@ -1,6 +1,8 @@ from typing import Any -from crewai.events.base_events import BaseEvent +from pydantic import SerializationInfo, field_serializer + +from crewai.events.base_events import BaseEvent, _is_trace_context, _trace_task_ref from crewai.tasks.task_output import TaskOutput @@ -24,6 +26,11 @@ class TaskStartedEvent(BaseEvent): super().__init__(**data) _set_task_fingerprint(self, self.task) + @field_serializer("task") + @classmethod + def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any: + return _trace_task_ref(v) if _is_trace_context(info) else v + class TaskCompletedEvent(BaseEvent): """Event emitted when a task completes""" @@ -36,6 +43,11 @@ class TaskCompletedEvent(BaseEvent): super().__init__(**data) _set_task_fingerprint(self, self.task) + @field_serializer("task") + @classmethod + def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any: + return _trace_task_ref(v) if _is_trace_context(info) else v + class TaskFailedEvent(BaseEvent): """Event emitted when a task fails""" @@ -48,6 +60,11 @@ class TaskFailedEvent(BaseEvent): super().__init__(**data) _set_task_fingerprint(self, self.task) + @field_serializer("task") + @classmethod + def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any: + return _trace_task_ref(v) if _is_trace_context(info) else v + class TaskEvaluationEvent(BaseEvent): """Event emitted when a task evaluation is completed""" @@ -59,3 +76,8 @@ class TaskEvaluationEvent(BaseEvent): def __init__(self, **data: Any) -> None: super().__init__(**data) _set_task_fingerprint(self, self.task) + + @field_serializer("task") + @classmethod + def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any: + return _trace_task_ref(v) if _is_trace_context(info) else v diff --git a/lib/crewai/src/crewai/utilities/serialization.py b/lib/crewai/src/crewai/utilities/serialization.py index 0207e80ab..b93559f7e 100644 --- a/lib/crewai/src/crewai/utilities/serialization.py +++ b/lib/crewai/src/crewai/utilities/serialization.py @@ -20,6 +20,7 @@ def to_serializable( max_depth: int = 5, _current_depth: int = 0, _ancestors: set[int] | None = None, + context: dict[str, Any] | None = None, ) -> Serializable: """Converts a Python object into a JSON-compatible representation. @@ -33,6 +34,9 @@ def to_serializable( max_depth: Maximum recursion depth. Defaults to 5. _current_depth: Current recursion depth (for internal use). _ancestors: Set of ancestor object ids for cycle detection (for internal use). + context: Optional context dict passed to Pydantic's model_dump(context=...). + Field serializers on the model can inspect this to customize output + (e.g. context={"trace": True} for lightweight trace serialization). Returns: Serializable: A JSON-compatible structure. @@ -66,6 +70,7 @@ def to_serializable( max_depth=max_depth, _current_depth=_current_depth + 1, _ancestors=new_ancestors, + context=context, ) for item in obj ] @@ -77,17 +82,24 @@ def to_serializable( max_depth=max_depth, _current_depth=_current_depth + 1, _ancestors=new_ancestors, + context=context, ) for key, value in obj.items() if key not in exclude } if isinstance(obj, BaseModel): try: + dump_kwargs: dict[str, Any] = {} + if exclude: + dump_kwargs["exclude"] = exclude + if context is not None: + dump_kwargs["context"] = context return to_serializable( - obj=obj.model_dump(exclude=exclude), + obj=obj.model_dump(**dump_kwargs), max_depth=max_depth, _current_depth=_current_depth + 1, _ancestors=new_ancestors, + context=context, ) except Exception: try: @@ -97,12 +109,15 @@ def to_serializable( max_depth=max_depth, _current_depth=_current_depth + 1, _ancestors=new_ancestors, + context=context, ) for k, v in obj.__dict__.items() if k not in (exclude or set()) } except Exception: return repr(obj) + if callable(obj): + return repr(obj) return repr(obj) diff --git a/lib/crewai/tests/tracing/test_trace_serialization.py b/lib/crewai/tests/tracing/test_trace_serialization.py new file mode 100644 index 000000000..3413fb13f --- /dev/null +++ b/lib/crewai/tests/tracing/test_trace_serialization.py @@ -0,0 +1,613 @@ +"""Tests for trace serialization optimization using Pydantic v2 context-based serialization. + +These tests verify that trace events use @field_serializer with SerializationInfo.context +to produce lightweight representations, reducing event sizes from 50-100KB to a few KB. +""" + +import json +import uuid +from typing import Any +from unittest.mock import MagicMock + +import pytest +from pydantic import ConfigDict + +from crewai.agents.agent_builder.base_agent import BaseAgent +from crewai.events.base_events import _trace_agent_ref, _trace_task_ref, _trace_tool_names +from crewai.events.listeners.tracing.utils import safe_serialize_to_dict +from crewai.security import Fingerprint, SecurityConfig +from crewai.utilities.serialization import to_serializable + + +# --------------------------------------------------------------------------- +# Lightweight BaseAgent subclass for tests (avoids heavy dependencies) +# --------------------------------------------------------------------------- + +class _StubAgent(BaseAgent): + """Minimal BaseAgent subclass that satisfies validation without heavy deps.""" + + model_config = ConfigDict(arbitrary_types_allowed=True) + + def execute_task(self, *a: Any, **kw: Any) -> str: + return "" + + def create_agent_executor(self, *a: Any, **kw: Any) -> None: + pass + + def _parse_tools(self, *a: Any, **kw: Any) -> list: + return [] + + def get_delegation_tools(self, *a: Any, **kw: Any) -> list: + return [] + + def get_output_converter(self, *a: Any, **kw: Any) -> Any: + return None + + def get_multimodal_tools(self, *a: Any, **kw: Any) -> list: + return [] + + async def aexecute_task(self, *a: Any, **kw: Any) -> str: + return "" + + def get_mcp_tools(self, *a: Any, **kw: Any) -> list: + return [] + + def get_platform_tools(self, *a: Any, **kw: Any) -> list: + return [] + + +def _make_stub_agent(**overrides) -> _StubAgent: + """Create a minimal BaseAgent instance for testing.""" + defaults = { + "role": "Researcher", + "goal": "Research things", + "backstory": "Expert researcher", + "tools": [], + } + defaults.update(overrides) + return _StubAgent(**defaults) + + +# --------------------------------------------------------------------------- +# Helpers to build realistic mock objects for event fields +# --------------------------------------------------------------------------- + +def _make_mock_task(**overrides): + task = MagicMock() + task.id = overrides.get("id", uuid.uuid4()) + task.name = overrides.get("name", "Research Task") + task.description = overrides.get("description", "Do research") + task.expected_output = overrides.get("expected_output", "Research results") + task.async_execution = overrides.get("async_execution", False) + task.human_input = overrides.get("human_input", False) + task.agent = overrides.get("agent", _make_stub_agent()) + task.context = overrides.get("context", None) + task.crew = MagicMock() + task.tools = overrides.get("tools", [MagicMock(), MagicMock()]) + + fp = MagicMock() + fp.uuid_str = str(uuid.uuid4()) + fp.metadata = {"name": task.name} + task.fingerprint = fp + + return task + + +def _make_stub_tool(tool_name="web_search") -> Any: + """Create a minimal BaseTool instance for testing.""" + from crewai.tools.base_tool import BaseTool + + class _StubTool(BaseTool): + name: str = "stub" + description: str = "stub tool" + + def _run(self, *a: Any, **kw: Any) -> str: + return "" + + return _StubTool(name=tool_name, description=f"{tool_name} tool") + + +# --------------------------------------------------------------------------- +# Unit tests: trace ref helpers +# --------------------------------------------------------------------------- + +class TestTraceRefHelpers: + def test_trace_agent_ref(self): + agent = _make_stub_agent(role="Analyst") + ref = _trace_agent_ref(agent) + assert ref["role"] == "Analyst" + assert "id" in ref + assert len(ref) == 2 # only id and role + + def test_trace_agent_ref_none(self): + assert _trace_agent_ref(None) is None + + def test_trace_task_ref(self): + task = _make_mock_task(name="Write Report") + ref = _trace_task_ref(task) + assert ref["name"] == "Write Report" + assert "id" in ref + assert len(ref) == 2 + + def test_trace_task_ref_falls_back_to_description(self): + task = _make_mock_task(name=None, description="Describe the report") + ref = _trace_task_ref(task) + assert ref["name"] == "Describe the report" + + def test_trace_task_ref_none(self): + assert _trace_task_ref(None) is None + + def test_trace_tool_names(self): + tools = [_make_stub_tool("search"), _make_stub_tool("read")] + names = _trace_tool_names(tools) + assert names == ["search", "read"] + + def test_trace_tool_names_empty(self): + assert _trace_tool_names([]) is None + assert _trace_tool_names(None) is None + + +# --------------------------------------------------------------------------- +# Integration tests: field serializers on real event classes +# --------------------------------------------------------------------------- + +class TestAgentEventFieldSerializers: + """Test that agent event field serializers respond to trace context.""" + + def test_agent_execution_started_trace_context(self): + from crewai.events.types.agent_events import AgentExecutionStartedEvent + + agent = _make_stub_agent(role="Researcher") + task = _make_mock_task(name="Research Task") + tools = [_make_stub_tool("search"), _make_stub_tool("read")] + + event = AgentExecutionStartedEvent( + agent=agent, task=task, tools=tools, task_prompt="Do research" + ) + + # With trace context: lightweight refs + trace_dump = event.model_dump(context={"trace": True}) + assert trace_dump["agent"] == {"id": str(agent.id), "role": "Researcher"} + assert trace_dump["task"] == {"id": str(task.id), "name": "Research Task"} + assert trace_dump["tools"] == ["search", "read"] + + def test_agent_execution_started_no_context(self): + from crewai.events.types.agent_events import AgentExecutionStartedEvent + + agent = _make_stub_agent(role="SpecificRole") + task = _make_mock_task() + + event = AgentExecutionStartedEvent( + agent=agent, task=task, tools=None, task_prompt="Do research" + ) + + # Without context: full agent dict (Pydantic model_dump expands it) + normal_dump = event.model_dump() + assert isinstance(normal_dump["agent"], dict) + assert normal_dump["agent"]["role"] == "SpecificRole" + # Should have ALL agent fields, not just the lightweight ref + assert "goal" in normal_dump["agent"] + assert "backstory" in normal_dump["agent"] + assert "max_iter" in normal_dump["agent"] + + def test_agent_execution_error_preserves_identification(self): + from crewai.events.types.agent_events import AgentExecutionErrorEvent + + agent = _make_stub_agent(role="Analyst") + task = _make_mock_task(name="Analysis Task") + + event = AgentExecutionErrorEvent( + agent=agent, task=task, error="Something went wrong" + ) + + trace_dump = event.model_dump(context={"trace": True}) + # Error events should still have agent/task identification as refs + assert trace_dump["agent"]["role"] == "Analyst" + assert trace_dump["task"]["name"] == "Analysis Task" + assert trace_dump["error"] == "Something went wrong" + + def test_agent_execution_completed_trace_context(self): + from crewai.events.types.agent_events import AgentExecutionCompletedEvent + + agent = _make_stub_agent(role="Writer") + task = _make_mock_task(name="Writing Task") + + event = AgentExecutionCompletedEvent( + agent=agent, task=task, output="Final output" + ) + + trace_dump = event.model_dump(context={"trace": True}) + assert trace_dump["agent"]["role"] == "Writer" + assert trace_dump["task"]["name"] == "Writing Task" + assert trace_dump["output"] == "Final output" + + +class TestTaskEventFieldSerializers: + """Test that task event field serializers respond to trace context.""" + + def test_task_started_trace_context(self): + from crewai.events.types.task_events import TaskStartedEvent + + task = _make_mock_task(name="Test Task") + event = TaskStartedEvent(task=task, context="some context") + + trace_dump = event.model_dump(context={"trace": True}) + assert trace_dump["task"] == {"id": str(task.id), "name": "Test Task"} + assert trace_dump["context"] == "some context" + + def test_task_failed_trace_context(self): + from crewai.events.types.task_events import TaskFailedEvent + + task = _make_mock_task(name="Failing Task") + event = TaskFailedEvent(task=task, error="Task failed") + + trace_dump = event.model_dump(context={"trace": True}) + assert trace_dump["task"]["name"] == "Failing Task" + assert trace_dump["error"] == "Task failed" + + +class TestCrewEventFieldSerializers: + """Test that crew event field serializers respond to trace context.""" + + def test_crew_kickoff_started_excludes_crew_in_trace(self): + from crewai.events.types.crew_events import CrewKickoffStartedEvent + + crew = MagicMock() + crew.fingerprint = MagicMock() + crew.fingerprint.uuid_str = str(uuid.uuid4()) + crew.fingerprint.metadata = {} + + event = CrewKickoffStartedEvent( + crew=crew, crew_name="TestCrew", inputs={"key": "value"} + ) + + trace_dump = event.model_dump(context={"trace": True}) + # crew field should be None in trace context + assert trace_dump["crew"] is None + # scalar fields preserved + assert trace_dump["crew_name"] == "TestCrew" + assert trace_dump["inputs"] == {"key": "value"} + + def test_crew_event_no_context_preserves_crew(self): + from crewai.events.types.crew_events import CrewKickoffStartedEvent + + crew = MagicMock() + crew.fingerprint = MagicMock() + crew.fingerprint.uuid_str = str(uuid.uuid4()) + crew.fingerprint.metadata = {} + + event = CrewKickoffStartedEvent( + crew=crew, crew_name="TestCrew", inputs=None + ) + + normal_dump = event.model_dump() + # Without trace context, crew should NOT be None (field serializer didn't fire) + assert normal_dump["crew"] is not None + + +class TestLLMEventFieldSerializers: + """Test that LLM event field serializers respond to trace context.""" + + def test_llm_call_started_excludes_callbacks_in_trace(self): + from crewai.events.types.llm_events import LLMCallStartedEvent + + event = LLMCallStartedEvent( + call_id="test-call", + messages=[{"role": "user", "content": "Hello"}], + tools=[{"name": "search", "description": "Search tool"}], + callbacks=[MagicMock(), MagicMock()], + available_functions={"search": MagicMock()}, + ) + + trace_dump = event.model_dump(context={"trace": True}) + # callbacks and available_functions excluded + assert trace_dump["callbacks"] is None + assert trace_dump["available_functions"] is None + # tools preserved (lightweight list of dicts) + assert trace_dump["tools"] == [{"name": "search", "description": "Search tool"}] + # messages preserved + assert trace_dump["messages"] == [{"role": "user", "content": "Hello"}] + + +# --------------------------------------------------------------------------- +# Integration tests: safe_serialize_to_dict with context +# --------------------------------------------------------------------------- + +class TestSafeSerializeWithContext: + """Test that safe_serialize_to_dict properly passes context through.""" + + def test_context_flows_through_to_field_serializers(self): + from crewai.events.types.agent_events import AgentExecutionErrorEvent + + agent = _make_stub_agent(role="Worker") + task = _make_mock_task(name="Work Task") + + event = AgentExecutionErrorEvent( + agent=agent, task=task, error="error msg" + ) + + result = safe_serialize_to_dict(event, context={"trace": True}) + # Field serializers should have fired + assert result["agent"] == {"id": str(agent.id), "role": "Worker"} + assert result["task"] == {"id": str(task.id), "name": "Work Task"} + assert result["error"] == "error msg" + + def test_no_context_preserves_full_serialization(self): + from crewai.events.types.task_events import TaskFailedEvent + + task = _make_mock_task(name="Test") + event = TaskFailedEvent(task=task, error="fail") + + result = safe_serialize_to_dict(event) + # Without context, task should not be a lightweight ref + assert result.get("task") is not None + # It should be the raw object (model_dump returns it as-is for Any fields) + # to_serializable will then repr() or process it further + + +# --------------------------------------------------------------------------- +# Integration tests: TraceCollectionListener._build_event_data +# --------------------------------------------------------------------------- + +class TestBuildEventData: + @pytest.fixture + def listener(self): + from crewai.events.listeners.tracing.trace_listener import ( + TraceCollectionListener, + ) + TraceCollectionListener._instance = None + TraceCollectionListener._initialized = False + TraceCollectionListener._listeners_setup = False + return TraceCollectionListener() + + def test_crew_kickoff_started_has_crew_structure(self, listener): + agent = _make_stub_agent(role="Researcher") + agent.tools = [_make_stub_tool("search"), _make_stub_tool("read")] + + task = _make_mock_task(name="Research Task", agent=agent) + task.context = None + + crew = MagicMock() + crew.agents = [agent] + crew.tasks = [task] + crew.process = "sequential" + crew.verbose = True + crew.memory = False + crew.fingerprint = MagicMock() + crew.fingerprint.uuid_str = str(uuid.uuid4()) + crew.fingerprint.metadata = {} + + from crewai.events.types.crew_events import CrewKickoffStartedEvent + event = CrewKickoffStartedEvent( + crew=crew, crew_name="TestCrew", inputs={"key": "value"} + ) + + result = listener._build_event_data("crew_kickoff_started", event, None) + + assert "crew_structure" in result + cs = result["crew_structure"] + assert len(cs["agents"]) == 1 + assert cs["agents"][0]["role"] == "Researcher" + assert cs["agents"][0]["tool_names"] == ["search", "read"] + assert len(cs["tasks"]) == 1 + assert cs["tasks"][0]["name"] == "Research Task" + assert "agent_ref" in cs["tasks"][0] + assert cs["tasks"][0]["agent_ref"]["role"] == "Researcher" + + def test_crew_kickoff_started_context_task_ids(self, listener): + agent = _make_stub_agent() + task1 = _make_mock_task(name="Task 1", agent=agent) + task1.context = None + task2 = _make_mock_task(name="Task 2", agent=agent) + task2.context = [task1] + + crew = MagicMock() + crew.agents = [agent] + crew.tasks = [task1, task2] + crew.process = "sequential" + crew.verbose = False + crew.memory = False + crew.fingerprint = MagicMock() + crew.fingerprint.uuid_str = str(uuid.uuid4()) + crew.fingerprint.metadata = {} + + from crewai.events.types.crew_events import CrewKickoffStartedEvent + event = CrewKickoffStartedEvent( + crew=crew, crew_name="TestCrew", inputs=None + ) + + result = listener._build_event_data("crew_kickoff_started", event, None) + task2_data = result["crew_structure"]["tasks"][1] + assert "context_task_ids" in task2_data + assert str(task1.id) in task2_data["context_task_ids"] + + def test_generic_event_uses_trace_context(self, listener): + """Non-complex events should use context-based serialization.""" + from crewai.events.types.crew_events import CrewKickoffCompletedEvent + + crew = MagicMock() + crew.fingerprint = MagicMock() + crew.fingerprint.uuid_str = str(uuid.uuid4()) + crew.fingerprint.metadata = {} + + event = CrewKickoffCompletedEvent( + crew=crew, crew_name="TestCrew", output="Final result", total_tokens=5000 + ) + + result = listener._build_event_data("crew_kickoff_completed", event, None) + + # Scalar fields preserved + assert result.get("crew_name") == "TestCrew" + assert result.get("total_tokens") == 5000 + # crew excluded by field serializer + assert result.get("crew") is None + # No crew_structure (that's only for kickoff_started) + assert "crew_structure" not in result + + def test_task_started_custom_projection(self, listener): + task = _make_mock_task(name="Test Task") + from crewai.events.types.task_events import TaskStartedEvent + event = TaskStartedEvent(task=task, context="test context") + source = MagicMock() + source.agent = _make_stub_agent(role="Worker") + + result = listener._build_event_data("task_started", event, source) + + assert result["task_name"] == "Test Task" + assert result["agent_role"] == "Worker" + assert result["task_id"] == str(task.id) + assert result["context"] == "test context" + + def test_llm_call_started_uses_trace_context(self, listener): + from crewai.events.types.llm_events import LLMCallStartedEvent + + event = LLMCallStartedEvent( + call_id="test", + messages=[{"role": "user", "content": "Hello"}], + tools=[{"name": "search"}], + callbacks=[MagicMock()], + available_functions={"fn": MagicMock()}, + ) + + result = listener._build_event_data("llm_call_started", event, None) + + # callbacks and available_functions excluded via field serializer + assert result.get("callbacks") is None + assert result.get("available_functions") is None + # tools preserved (lightweight schemas) + assert result.get("tools") == [{"name": "search"}] + + def test_agent_execution_error_preserves_identification(self, listener): + """Error events should preserve agent/task identification via field serializers.""" + from crewai.events.types.agent_events import AgentExecutionErrorEvent + + agent = _make_stub_agent(role="Analyst") + task = _make_mock_task(name="Analysis") + + event = AgentExecutionErrorEvent( + agent=agent, task=task, error="Something broke" + ) + + result = listener._build_event_data("agent_execution_error", event, None) + + # Field serializers return lightweight refs, not None + assert result["agent"] == {"id": str(agent.id), "role": "Analyst"} + assert result["task"] == {"id": str(task.id), "name": "Analysis"} + assert result["error"] == "Something broke" + + def test_task_failed_preserves_identification(self, listener): + from crewai.events.types.task_events import TaskFailedEvent + + task = _make_mock_task(name="Failed Task") + event = TaskFailedEvent(task=task, error="Task failed") + + result = listener._build_event_data("task_failed", event, None) + + assert result["task"] == {"id": str(task.id), "name": "Failed Task"} + assert result["error"] == "Task failed" + + +# --------------------------------------------------------------------------- +# Size reduction verification +# --------------------------------------------------------------------------- + +class TestSizeReduction: + @pytest.fixture + def listener(self): + from crewai.events.listeners.tracing.trace_listener import ( + TraceCollectionListener, + ) + TraceCollectionListener._instance = None + TraceCollectionListener._initialized = False + TraceCollectionListener._listeners_setup = False + return TraceCollectionListener() + + def test_task_started_event_size(self, listener): + """task_started event data should be well under 2KB.""" + agent = _make_stub_agent( + role="Researcher", + goal="Research" * 50, + backstory="Expert" * 100, + ) + agent.tools = [_make_stub_tool(f"tool_{i}") for i in range(5)] + + task = _make_mock_task( + name="Research Task", + description="Detailed description" * 20, + expected_output="Expected" * 10, + agent=agent, + ) + task.context = [_make_mock_task() for _ in range(3)] + task.tools = [_make_stub_tool(f"t_{i}") for i in range(3)] + + from crewai.events.types.task_events import TaskStartedEvent + event = TaskStartedEvent(task=task, context="test context") + source = MagicMock() + source.agent = agent + + result = listener._build_event_data("task_started", event, source) + serialized = json.dumps(result, default=str) + + assert len(serialized) < 2000, f"task_started too large: {len(serialized)} bytes" + assert "task_name" in result + assert "agent_role" in result + + def test_error_event_size(self, listener): + """Error events should be small despite having agent/task refs.""" + from crewai.events.types.agent_events import AgentExecutionErrorEvent + + agent = _make_stub_agent( + goal="Very long goal " * 100, + backstory="Very long backstory " * 100, + ) + task = _make_mock_task(description="Very long description " * 100) + + event = AgentExecutionErrorEvent( + agent=agent, task=task, error="error" + ) + + result = listener._build_event_data("agent_execution_error", event, None) + serialized = json.dumps(result, default=str) + + # Should be small - agent/task are just {id, role/name} refs + assert len(serialized) < 5000, f"error event too large: {len(serialized)} bytes" + + +# --------------------------------------------------------------------------- +# to_serializable context threading +# --------------------------------------------------------------------------- + +class TestToSerializableContext: + """Test that context parameter flows through to_serializable correctly.""" + + def test_context_passed_to_model_dump(self): + from crewai.events.types.agent_events import AgentExecutionErrorEvent + + agent = _make_stub_agent(role="Tester") + task = _make_mock_task(name="Test Task") + + event = AgentExecutionErrorEvent( + agent=agent, task=task, error="test error" + ) + + # Directly use to_serializable with context + result = to_serializable(event, context={"trace": True}) + assert isinstance(result, dict) + assert result["agent"] == {"id": str(agent.id), "role": "Tester"} + assert result["task"] == {"id": str(task.id), "name": "Test Task"} + + def test_no_context_does_not_trigger_serializers(self): + from crewai.events.types.crew_events import CrewKickoffStartedEvent + + crew = MagicMock() + crew.fingerprint = MagicMock() + crew.fingerprint.uuid_str = str(uuid.uuid4()) + crew.fingerprint.metadata = {} + + event = CrewKickoffStartedEvent( + crew=crew, crew_name="Test", inputs=None + ) + + # Without context, crew should NOT be None + result = event.model_dump() + assert result["crew"] is not None \ No newline at end of file