Compare commits

...

4 Commits

Author SHA1 Message Date
Alex
fad083ffaa fix: add 'task' (singular) to TRACE_EXCLUDE_FIELDS
agent_execution_error and task_failed events have a task: Any field
holding a full Task object. 'tasks' (plural) was excluded but 'task'
(singular) was not, so the full object leaked through serialization.
2026-03-30 18:53:31 -07:00
Alex
b753012fc8 fix: preserve agent/task identification in error events and fix exclude propagation
- Add custom handlers for agent_execution_error and task_failed events to
  extract agent_role/agent_id and task_name/task_id before generic serialization
  strips them (these fields are in TRACE_EXCLUDE_FIELDS)
- Remove exclude propagation in __dict__ handler to match Pydantic fallback behavior

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-30 14:18:09 -07:00
Alex
b40098b28e fix: address PR review comments on trace serialization
1. Remove unused import Mock from test_trace_serialization.py
2. Remove unused import safe_serialize_to_dict from test_trace_serialization.py
3. Fix LLM event tools data being silently excluded: Remove 'tools' from
   TRACE_EXCLUDE_FIELDS since LLMCallStartedEvent.tools is a lightweight
   list of tool schemas, not heavy Agent.tools objects. Agent.tools is
   already excluded explicitly in _build_crew_started_data.
4. Remove dead code: complex_events class variable from TraceCollectionListener

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-30 11:00:28 -07:00
Alex
a4f1164812 fix: reduce trace event serialization bloat by excluding redundant nested objects
Each trace event was serializing the ENTIRE Crew/Task/Agent object graph into
event_data JSONB, causing 500GB+ trace tables in production. For a crew with
5 agents and 10 tasks, each event could be 50-100KB because:
- Crew serialized full tasks AND full agents (with all tools, LLM configs)
- Each Task re-serialized its agent (same Agent already in Crew.agents)
- Each Task re-serialized context tasks (same Tasks already in Crew.tasks)

This fix:
1. Adds TRACE_EXCLUDE_FIELDS constant listing back-references and heavy fields
   to exclude (crew, agent, agents, tasks, context, tools, llm, callbacks, etc.)

2. Adds _serialize_for_trace() helper that uses safe_serialize_to_dict with
   the exclusion set, keeping scalar fields (agent_role, task_name, etc.)
   that the AMP frontend actually reads

3. Updates _build_event_data() to use lightweight serialization for all
   events except crew_kickoff_started

4. Adds _build_crew_started_data() that serializes the full crew structure
   ONCE with:
   - Agents with tool_names (list of strings, not full tool objects)
   - Tasks with agent_ref (just {id, role}) instead of full agent
   - Tasks with context_task_ids (just IDs) instead of full context tasks

5. Updates to_serializable() in serialization.py to:
   - Handle callable objects (functions/lambdas) by falling through to repr()
   - Handle regular classes with __dict__ (not just Pydantic models)

Expected size reduction: 50-100KB per event down to ~1-2KB per event.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-30 09:25:01 -07:00
3 changed files with 758 additions and 19 deletions

View File

@@ -126,18 +126,59 @@ from crewai.events.types.tool_usage_events import (
from crewai.events.utils.console_formatter import ConsoleFormatter
# Fields to exclude from trace serialization to reduce redundant data.
# These back-references and heavy objects create massive bloat when serialized
# repeatedly across events (crew->agents->tasks->agent creates circular refs).
TRACE_EXCLUDE_FIELDS = {
# Back-references that create redundant/circular data
"crew",
"agent",
"agents",
"task",
"tasks",
"context",
# Heavy fields not needed in individual trace events
# NOTE: "tools" intentionally NOT here - LLMCallStartedEvent.tools is lightweight
# (list of tool schemas). Agent.tools is excluded in _build_crew_started_data.
"llm",
"function_calling_llm",
"step_callback",
"task_callback",
"crew_callback",
"callbacks",
"_memory",
"_cache",
"_rpm_controller",
"_request_within_rpm_limit",
"_token_process",
"knowledge_sources",
}
def _serialize_for_trace(
event: Any, extra_exclude: set[str] | None = None
) -> dict[str, Any]:
"""Serialize an event for tracing, excluding redundant back-references.
Keeps all scalar fields (agent_role, task_name, etc.) that the AMP frontend uses.
Replaces heavy nested objects with lightweight ID references to reduce trace bloat.
Args:
event: The event object to serialize.
extra_exclude: Additional fields to exclude beyond TRACE_EXCLUDE_FIELDS.
Returns:
A dictionary with the serialized event data.
"""
exclude = TRACE_EXCLUDE_FIELDS.copy()
if extra_exclude:
exclude.update(extra_exclude)
return safe_serialize_to_dict(event, exclude=exclude)
class TraceCollectionListener(BaseEventListener):
"""Trace collection listener that orchestrates trace collection."""
complex_events: ClassVar[list[str]] = [
"task_started",
"task_completed",
"llm_call_started",
"llm_call_completed",
"agent_execution_started",
"agent_execution_completed",
]
_instance: Self | None = None
_initialized: bool = False
_listeners_setup: bool = False
@@ -810,9 +851,17 @@ class TraceCollectionListener(BaseEventListener):
def _build_event_data(
self, event_type: str, event: Any, source: Any
) -> dict[str, Any]:
"""Build event data"""
if event_type not in self.complex_events:
return safe_serialize_to_dict(event)
"""Build event data with optimized serialization to reduce trace bloat.
For most events, excludes heavy nested objects (crew, agents, tasks, tools)
that would create massive redundant data. Only crew_kickoff_started gets
the full crew structure as a one-time dump.
"""
# crew_kickoff_started is special: include full crew structure ONCE
if event_type == "crew_kickoff_started":
return self._build_crew_started_data(event)
# Complex events have custom handling that already extracts only needed fields
if event_type == "task_started":
task_name = event.task.name or event.task.description
task_display_name = (
@@ -853,19 +902,101 @@ class TraceCollectionListener(BaseEventListener):
"agent_backstory": event.agent.backstory,
}
if event_type == "llm_call_started":
event_data = safe_serialize_to_dict(event)
event_data = _serialize_for_trace(event)
event_data["task_name"] = event.task_name or getattr(
event, "task_description", None
)
return event_data
if event_type == "llm_call_completed":
return safe_serialize_to_dict(event)
return _serialize_for_trace(event)
return {
"event_type": event_type,
"event": safe_serialize_to_dict(event),
"source": source,
}
# Error events need agent/task identification extracted before generic
# serialization strips them (agent/task are in TRACE_EXCLUDE_FIELDS)
if event_type == "agent_execution_error":
event_data = _serialize_for_trace(event)
if event.agent:
event_data["agent_role"] = getattr(event.agent, "role", None)
event_data["agent_id"] = str(getattr(event.agent, "id", ""))
return event_data
if event_type == "task_failed":
event_data = _serialize_for_trace(event)
if event.task:
event_data["task_name"] = getattr(event.task, "name", None) or getattr(
event.task, "description", None
)
event_data["task_id"] = str(getattr(event.task, "id", ""))
return event_data
# For all other events, use lightweight serialization
return _serialize_for_trace(event)
def _build_crew_started_data(self, event: Any) -> dict[str, Any]:
"""Build comprehensive crew structure for crew_kickoff_started event.
This is the ONE place where we serialize the full crew structure.
Subsequent events use lightweight references to avoid redundancy.
"""
event_data = _serialize_for_trace(event)
# Add full crew structure with optimized agent/task serialization
crew = getattr(event, "crew", None)
if crew is not None:
# Serialize agents with tools (first occurrence only)
agents_data = []
for agent in getattr(crew, "agents", []) or []:
agent_data = {
"id": str(getattr(agent, "id", "")),
"role": getattr(agent, "role", ""),
"goal": getattr(agent, "goal", ""),
"backstory": getattr(agent, "backstory", ""),
"verbose": getattr(agent, "verbose", False),
"allow_delegation": getattr(agent, "allow_delegation", False),
"max_iter": getattr(agent, "max_iter", None),
"max_rpm": getattr(agent, "max_rpm", None),
}
# Include tool names (not full tool objects)
tools = getattr(agent, "tools", None)
if tools:
agent_data["tool_names"] = [
getattr(t, "name", str(t)) for t in tools
]
agents_data.append(agent_data)
# Serialize tasks with lightweight agent references
tasks_data = []
for task in getattr(crew, "tasks", []) or []:
task_data = {
"id": str(getattr(task, "id", "")),
"name": getattr(task, "name", None),
"description": getattr(task, "description", ""),
"expected_output": getattr(task, "expected_output", ""),
"async_execution": getattr(task, "async_execution", False),
"human_input": getattr(task, "human_input", False),
}
# Replace full agent with lightweight reference
task_agent = getattr(task, "agent", None)
if task_agent:
task_data["agent_ref"] = {
"id": str(getattr(task_agent, "id", "")),
"role": getattr(task_agent, "role", ""),
}
# Replace context tasks with lightweight references
context_tasks = getattr(task, "context", None)
if context_tasks:
task_data["context_task_ids"] = [
str(getattr(ct, "id", "")) for ct in context_tasks
]
tasks_data.append(task_data)
event_data["crew_structure"] = {
"agents": agents_data,
"tasks": tasks_data,
"process": str(getattr(crew, "process", "")),
"verbose": getattr(crew, "verbose", False),
"memory": getattr(crew, "memory", False),
}
return event_data
def _show_tracing_disabled_message(self) -> None:
"""Show a message when tracing is disabled."""

View File

@@ -103,6 +103,28 @@ def to_serializable(
}
except Exception:
return repr(obj)
# Callables (functions, methods, lambdas) should fall through to repr
if callable(obj):
return repr(obj)
# Handle regular classes with __dict__ (non-Pydantic)
# Note: Don't propagate exclude to recursive calls, matching Pydantic fallback behavior
if hasattr(obj, "__dict__"):
try:
return {
_to_serializable_key(k): to_serializable(
v,
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
)
for k, v in obj.__dict__.items()
if k not in exclude and not k.startswith("_")
}
except Exception:
return repr(obj)
return repr(obj)

View File

@@ -0,0 +1,586 @@
"""Tests for trace serialization optimization to prevent trace table bloat.
These tests verify that trace events don't contain redundant full crew/task/agent
objects, reducing event sizes from 50-100KB to a few KB per event.
"""
import uuid
from unittest.mock import MagicMock
import pytest
from crewai.events.listeners.tracing.trace_listener import (
TRACE_EXCLUDE_FIELDS,
TraceCollectionListener,
_serialize_for_trace,
)
class TestTraceExcludeFields:
"""Test that TRACE_EXCLUDE_FIELDS contains all the heavy/redundant fields."""
def test_contains_back_references(self):
"""Verify back-reference fields are excluded."""
back_refs = {"crew", "agent", "agents", "tasks", "context"}
assert back_refs.issubset(TRACE_EXCLUDE_FIELDS)
def test_contains_heavy_fields(self):
"""Verify heavy objects are excluded.
Note: 'tools' is NOT in TRACE_EXCLUDE_FIELDS because LLMCallStartedEvent.tools
is a lightweight list of tool schemas. Agent.tools exclusion is handled
explicitly in _build_crew_started_data.
"""
heavy_fields = {
"llm",
"function_calling_llm",
"step_callback",
"task_callback",
"crew_callback",
"callbacks",
"_memory",
"_cache",
"knowledge_sources",
}
assert heavy_fields.issubset(TRACE_EXCLUDE_FIELDS)
# tools is NOT excluded globally - LLM events need it
assert "tools" not in TRACE_EXCLUDE_FIELDS
class TestSerializeForTrace:
"""Test the _serialize_for_trace helper function."""
def test_excludes_crew_field(self):
"""Verify crew field is excluded from serialization."""
event = MagicMock()
event.crew = MagicMock(name="TestCrew")
event.crew_name = "TestCrew"
event.timestamp = None
result = _serialize_for_trace(event)
# crew_name should be present (scalar field)
# crew should be excluded (back-reference)
assert "crew" not in result or result.get("crew") is None
def test_excludes_agent_field(self):
"""Verify agent field is excluded from serialization."""
event = MagicMock()
event.agent = MagicMock(role="TestAgent")
event.agent_role = "TestAgent"
result = _serialize_for_trace(event)
assert "agent" not in result or result.get("agent") is None
def test_preserves_tools_field(self):
"""Verify tools field is preserved for LLM events (lightweight schemas)."""
class EventWithTools:
def __init__(self):
self.tools = [{"name": "search", "description": "Search tool"}]
self.tool_name = "test_tool"
event = EventWithTools()
result = _serialize_for_trace(event)
# tools should be preserved (lightweight for LLM events)
assert "tools" in result
assert result["tools"] == [{"name": "search", "description": "Search tool"}]
def test_preserves_scalar_fields(self):
"""Verify scalar fields needed by AMP frontend are preserved."""
class SimpleEvent:
def __init__(self):
self.agent_role = "Researcher"
self.task_name = "Research Task"
self.task_id = str(uuid.uuid4())
self.duration_ms = 1500
self.tokens_used = 500
event = SimpleEvent()
result = _serialize_for_trace(event)
# Scalar fields should be preserved
assert result.get("agent_role") == "Researcher"
assert result.get("task_name") == "Research Task"
assert result.get("duration_ms") == 1500
assert result.get("tokens_used") == 500
def test_extra_exclude_parameter(self):
"""Verify extra_exclude adds to the default exclusions."""
class EventWithCustomField:
def __init__(self):
self.custom_heavy_field = {"large": "data" * 1000}
self.keep_this = "small"
event = EventWithCustomField()
result = _serialize_for_trace(event, extra_exclude={"custom_heavy_field"})
assert "custom_heavy_field" not in result
assert result.get("keep_this") == "small"
class TestBuildEventData:
"""Test _build_event_data method for different event types."""
@pytest.fixture
def listener(self):
"""Create a trace listener for testing."""
# Reset singleton
TraceCollectionListener._instance = None
TraceCollectionListener._initialized = False
TraceCollectionListener._listeners_setup = False
return TraceCollectionListener()
def test_task_started_no_full_task_object(self, listener):
"""Verify task_started event doesn't include full task object."""
mock_task = MagicMock()
mock_task.name = "Test Task"
mock_task.description = "A test task description"
mock_task.expected_output = "Expected result"
mock_task.id = uuid.uuid4()
# Add heavy fields that should NOT appear in output
mock_task.crew = MagicMock(name="HeavyCrew")
mock_task.agent = MagicMock(role="HeavyAgent")
mock_task.context = [MagicMock(), MagicMock()]
mock_task.tools = [MagicMock(), MagicMock()]
mock_event = MagicMock()
mock_event.task = mock_task
mock_event.context = "test context"
mock_source = MagicMock()
mock_source.agent = MagicMock()
mock_source.agent.role = "Worker"
result = listener._build_event_data("task_started", mock_event, mock_source)
# Should have scalar fields
assert result["task_name"] == "Test Task"
assert result["task_description"] == "A test task description"
assert result["agent_role"] == "Worker"
assert result["task_id"] == str(mock_task.id)
# Should NOT have full objects
assert "crew" not in result
assert "tools" not in result
# task and agent should not be full objects
assert result.get("task") is None or not hasattr(result.get("task"), "crew")
def test_task_completed_no_full_task_object(self, listener):
"""Verify task_completed event doesn't include full task object."""
mock_task = MagicMock()
mock_task.name = "Completed Task"
mock_task.description = "Task description"
mock_task.id = uuid.uuid4()
mock_output = MagicMock()
mock_output.raw = "Task result"
mock_output.output_format = "text"
mock_output.agent = "Worker"
mock_event = MagicMock()
mock_event.task = mock_task
mock_event.output = mock_output
result = listener._build_event_data("task_completed", mock_event, None)
# Should have scalar fields
assert result["task_name"] == "Completed Task"
assert result["output_raw"] == "Task result"
assert result["agent_role"] == "Worker"
# Should NOT have full task object
assert "crew" not in result
assert "tools" not in result
def test_agent_execution_started_no_full_agent(self, listener):
"""Verify agent_execution_started extracts only scalar fields."""
mock_agent = MagicMock()
mock_agent.role = "Analyst"
mock_agent.goal = "Analyze data"
mock_agent.backstory = "Expert analyst"
# Heavy fields
mock_agent.tools = [MagicMock(), MagicMock()]
mock_agent.llm = MagicMock()
mock_agent.crew = MagicMock()
mock_event = MagicMock()
mock_event.agent = mock_agent
result = listener._build_event_data(
"agent_execution_started", mock_event, None
)
# Should have scalar fields
assert result["agent_role"] == "Analyst"
assert result["agent_goal"] == "Analyze data"
assert result["agent_backstory"] == "Expert analyst"
# Should NOT have heavy objects
assert "tools" not in result
assert "llm" not in result
assert "crew" not in result
def test_llm_call_started_excludes_heavy_fields(self, listener):
"""Verify llm_call_started uses lightweight serialization.
LLMCallStartedEvent.tools is a lightweight list of tool schemas (dicts),
not heavy Agent.tools objects, so it should be preserved.
"""
class MockLLMEvent:
def __init__(self):
self.task_name = "LLM Task"
self.model = "gpt-4"
self.tokens = 100
# Heavy fields that should be excluded
self.crew = MagicMock()
self.agent = MagicMock()
# LLM event tools are lightweight schemas (dicts), should be kept
self.tools = [{"name": "search", "description": "Search tool"}]
mock_event = MockLLMEvent()
result = listener._build_event_data("llm_call_started", mock_event, None)
# task_name should be present
assert result["task_name"] == "LLM Task"
# Heavy fields should be excluded
assert "crew" not in result or result.get("crew") is None
assert "agent" not in result or result.get("agent") is None
# LLM tools (lightweight schemas) should be preserved
assert result.get("tools") == [{"name": "search", "description": "Search tool"}]
def test_llm_call_completed_excludes_heavy_fields(self, listener):
"""Verify llm_call_completed uses lightweight serialization."""
class MockLLMCompletedEvent:
def __init__(self):
self.response = "LLM response"
self.tokens_used = 150
self.duration_ms = 500
# Heavy fields
self.crew = MagicMock()
self.agent = MagicMock()
mock_event = MockLLMCompletedEvent()
result = listener._build_event_data("llm_call_completed", mock_event, None)
# Scalar fields preserved
assert result.get("response") == "LLM response"
assert result.get("tokens_used") == 150
# Heavy fields excluded
assert "crew" not in result or result.get("crew") is None
assert "agent" not in result or result.get("agent") is None
class TestCrewKickoffStartedEvent:
"""Test that crew_kickoff_started event has full structure."""
@pytest.fixture
def listener(self):
"""Create a trace listener for testing."""
TraceCollectionListener._instance = None
TraceCollectionListener._initialized = False
TraceCollectionListener._listeners_setup = False
return TraceCollectionListener()
def test_crew_started_has_crew_structure(self, listener):
"""Verify crew_kickoff_started includes the crew_structure field."""
# Create mock crew with agents and tasks
mock_agent1 = MagicMock()
mock_agent1.id = uuid.uuid4()
mock_agent1.role = "Researcher"
mock_agent1.goal = "Research things"
mock_agent1.backstory = "Expert researcher"
mock_agent1.verbose = True
mock_agent1.allow_delegation = False
mock_agent1.max_iter = 10
mock_agent1.max_rpm = None
mock_agent1.tools = [MagicMock(name="search_tool"), MagicMock(name="read_tool")]
mock_agent2 = MagicMock()
mock_agent2.id = uuid.uuid4()
mock_agent2.role = "Writer"
mock_agent2.goal = "Write content"
mock_agent2.backstory = "Expert writer"
mock_agent2.verbose = False
mock_agent2.allow_delegation = True
mock_agent2.max_iter = 5
mock_agent2.max_rpm = 10
mock_agent2.tools = []
mock_task1 = MagicMock()
mock_task1.id = uuid.uuid4()
mock_task1.name = "Research Task"
mock_task1.description = "Do research"
mock_task1.expected_output = "Research results"
mock_task1.async_execution = False
mock_task1.human_input = False
mock_task1.agent = mock_agent1
mock_task1.context = None
mock_task2 = MagicMock()
mock_task2.id = uuid.uuid4()
mock_task2.name = "Writing Task"
mock_task2.description = "Write report"
mock_task2.expected_output = "Written report"
mock_task2.async_execution = True
mock_task2.human_input = True
mock_task2.agent = mock_agent2
mock_task2.context = [mock_task1]
mock_crew = MagicMock()
mock_crew.agents = [mock_agent1, mock_agent2]
mock_crew.tasks = [mock_task1, mock_task2]
mock_crew.process = "sequential"
mock_crew.verbose = True
mock_crew.memory = False
mock_event = MagicMock()
mock_event.crew = mock_crew
mock_event.crew_name = "TestCrew"
mock_event.inputs = {"key": "value"}
result = listener._build_event_data("crew_kickoff_started", mock_event, None)
# Should have crew_structure
assert "crew_structure" in result
crew_structure = result["crew_structure"]
# Verify agents are serialized with tool names
assert len(crew_structure["agents"]) == 2
agent1_data = crew_structure["agents"][0]
assert agent1_data["role"] == "Researcher"
assert agent1_data["goal"] == "Research things"
assert "tool_names" in agent1_data
assert len(agent1_data["tool_names"]) == 2
# Verify tasks have lightweight agent references
assert len(crew_structure["tasks"]) == 2
task2_data = crew_structure["tasks"][1]
assert task2_data["name"] == "Writing Task"
assert "agent_ref" in task2_data
assert task2_data["agent_ref"]["role"] == "Writer"
# Verify context uses task IDs
assert "context_task_ids" in task2_data
assert str(mock_task1.id) in task2_data["context_task_ids"]
def test_crew_started_agents_no_full_tools(self, listener):
"""Verify agents in crew_structure have tool_names, not full tool objects."""
mock_tool = MagicMock()
mock_tool.name = "web_search"
mock_tool.description = "Search the web"
mock_tool.func = lambda x: x # Heavy callable
mock_tool.args_schema = {"type": "object"} # Schema
mock_agent = MagicMock()
mock_agent.id = uuid.uuid4()
mock_agent.role = "Searcher"
mock_agent.goal = "Search"
mock_agent.backstory = "Expert"
mock_agent.verbose = False
mock_agent.allow_delegation = False
mock_agent.max_iter = 5
mock_agent.max_rpm = None
mock_agent.tools = [mock_tool]
mock_crew = MagicMock()
mock_crew.agents = [mock_agent]
mock_crew.tasks = []
mock_crew.process = "sequential"
mock_crew.verbose = False
mock_crew.memory = False
mock_event = MagicMock()
mock_event.crew = mock_crew
result = listener._build_event_data("crew_kickoff_started", mock_event, None)
agent_data = result["crew_structure"]["agents"][0]
# Should have tool_names (list of strings)
assert "tool_names" in agent_data
assert agent_data["tool_names"] == ["web_search"]
# Should NOT have full tools array
assert "tools" not in agent_data
def test_crew_started_tasks_no_full_agent(self, listener):
"""Verify tasks have agent_ref, not full agent object."""
mock_agent = MagicMock()
mock_agent.id = uuid.uuid4()
mock_agent.role = "Worker"
mock_agent.goal = "Work hard"
mock_agent.backstory = "Dedicated worker"
mock_agent.tools = [MagicMock(), MagicMock()]
mock_agent.llm = MagicMock()
mock_task = MagicMock()
mock_task.id = uuid.uuid4()
mock_task.name = "Work Task"
mock_task.description = "Do work"
mock_task.expected_output = "Work done"
mock_task.async_execution = False
mock_task.human_input = False
mock_task.agent = mock_agent
mock_task.context = None
mock_crew = MagicMock()
mock_crew.agents = [mock_agent]
mock_crew.tasks = [mock_task]
mock_crew.process = "sequential"
mock_crew.verbose = False
mock_crew.memory = False
mock_event = MagicMock()
mock_event.crew = mock_crew
result = listener._build_event_data("crew_kickoff_started", mock_event, None)
task_data = result["crew_structure"]["tasks"][0]
# Should have lightweight agent_ref
assert "agent_ref" in task_data
assert task_data["agent_ref"]["id"] == str(mock_agent.id)
assert task_data["agent_ref"]["role"] == "Worker"
# agent_ref should ONLY have id and role (not tools, llm, etc.)
assert len(task_data["agent_ref"]) == 2
# Should NOT have full agent
assert "agent" not in task_data
class TestNonCrewStartedEvents:
"""Test that non-crew_started events don't have redundant data."""
@pytest.fixture
def listener(self):
"""Create a trace listener for testing."""
TraceCollectionListener._instance = None
TraceCollectionListener._initialized = False
TraceCollectionListener._listeners_setup = False
return TraceCollectionListener()
def test_generic_event_no_crew(self, listener):
"""Verify generic events exclude crew object.
Note: 'tools' is now preserved since LLMCallStartedEvent.tools is lightweight.
"""
class GenericEvent:
def __init__(self):
self.event_type = "some_event"
self.data = "some_data"
# These should be excluded
self.crew = MagicMock()
self.agents = [MagicMock()]
self.tasks = [MagicMock()]
# tools is now preserved (for LLM events it's lightweight)
self.tools = [{"name": "search"}]
mock_event = GenericEvent()
result = listener._build_event_data("some_event", mock_event, None)
# Scalar fields preserved
assert result.get("event_type") == "some_event"
assert result.get("data") == "some_data"
# Heavy fields excluded
assert "crew" not in result or result.get("crew") is None
assert "agents" not in result or result.get("agents") is None
assert "tasks" not in result or result.get("tasks") is None
# tools is now preserved (lightweight for LLM events)
assert result.get("tools") == [{"name": "search"}]
def test_crew_kickoff_completed_no_full_crew(self, listener):
"""Verify crew_kickoff_completed doesn't repeat full crew structure."""
class CrewCompletedEvent:
def __init__(self):
self.crew_name = "TestCrew"
self.total_tokens = 5000
self.output = "Final output"
# Should be excluded
self.crew = MagicMock()
self.crew.agents = [MagicMock(), MagicMock()]
self.crew.tasks = [MagicMock()]
mock_event = CrewCompletedEvent()
result = listener._build_event_data("crew_kickoff_completed", mock_event, None)
# Scalar fields preserved
assert result.get("crew_name") == "TestCrew"
assert result.get("total_tokens") == 5000
# Should NOT have full crew object
assert "crew" not in result or result.get("crew") is None
# Should NOT have crew_structure (that's only for crew_started)
assert "crew_structure" not in result
class TestSizeReduction:
"""Test that the optimization actually reduces serialized size."""
@pytest.fixture
def listener(self):
"""Create a trace listener for testing."""
TraceCollectionListener._instance = None
TraceCollectionListener._initialized = False
TraceCollectionListener._listeners_setup = False
return TraceCollectionListener()
def test_task_event_size_reduction(self, listener):
"""Verify task events are much smaller than naive serialization."""
import json
# Create a realistic task with many fields
mock_agent = MagicMock()
mock_agent.id = uuid.uuid4()
mock_agent.role = "Researcher"
mock_agent.goal = "Research" * 50 # Longer goal
mock_agent.backstory = "Expert" * 100 # Longer backstory
mock_agent.tools = [MagicMock() for _ in range(5)]
mock_agent.llm = MagicMock()
mock_agent.crew = MagicMock()
mock_task = MagicMock()
mock_task.name = "Research Task"
mock_task.description = "Detailed description" * 20
mock_task.expected_output = "Expected" * 10
mock_task.id = uuid.uuid4()
mock_task.agent = mock_agent
mock_task.context = [MagicMock() for _ in range(3)]
mock_task.crew = MagicMock()
mock_task.tools = [MagicMock() for _ in range(3)]
mock_event = MagicMock()
mock_event.task = mock_task
mock_event.context = "test context"
mock_source = MagicMock()
mock_source.agent = mock_agent
result = listener._build_event_data("task_started", mock_event, mock_source)
# The result should be relatively small
serialized = json.dumps(result, default=str)
# Should be under 2KB for task_started (was potentially 50-100KB before)
assert len(serialized) < 2000, f"task_started too large: {len(serialized)} bytes"
# Should have the essential fields
assert "task_name" in result
assert "task_id" in result
assert "agent_role" in result