diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py index 9d81f1d55..9b3f46b76 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py @@ -126,6 +126,54 @@ from crewai.events.types.tool_usage_events import ( from crewai.events.utils.console_formatter import ConsoleFormatter +# Fields to exclude from trace serialization to reduce redundant data. +# These back-references and heavy objects create massive bloat when serialized +# repeatedly across events (crew->agents->tasks->agent creates circular refs). +TRACE_EXCLUDE_FIELDS = { + # Back-references that create redundant/circular data + "crew", + "agent", + "agents", + "tasks", + "context", + # Heavy fields not needed in individual trace events + "tools", + "llm", + "function_calling_llm", + "step_callback", + "task_callback", + "crew_callback", + "callbacks", + "_memory", + "_cache", + "_rpm_controller", + "_request_within_rpm_limit", + "_token_process", + "knowledge_sources", +} + + +def _serialize_for_trace( + event: Any, extra_exclude: set[str] | None = None +) -> dict[str, Any]: + """Serialize an event for tracing, excluding redundant back-references. + + Keeps all scalar fields (agent_role, task_name, etc.) that the AMP frontend uses. + Omits heavy nested objects and back-references entirely to reduce trace bloat. + + Args: + event: The event object to serialize. + extra_exclude: Additional fields to exclude beyond TRACE_EXCLUDE_FIELDS. + + Returns: + A dictionary with the serialized event data. 
+ """ + exclude = TRACE_EXCLUDE_FIELDS.copy() + if extra_exclude: + exclude.update(extra_exclude) + return safe_serialize_to_dict(event, exclude=exclude) + + class TraceCollectionListener(BaseEventListener): """Trace collection listener that orchestrates trace collection.""" @@ -810,9 +858,17 @@ class TraceCollectionListener(BaseEventListener): def _build_event_data( self, event_type: str, event: Any, source: Any ) -> dict[str, Any]: - """Build event data""" - if event_type not in self.complex_events: - return safe_serialize_to_dict(event) + """Build event data with optimized serialization to reduce trace bloat. + + For most events, excludes heavy nested objects (crew, agents, tasks, tools) + that would create massive redundant data. Only crew_kickoff_started gets + the full crew structure as a one-time dump. + """ + # crew_kickoff_started is special: include full crew structure ONCE + if event_type == "crew_kickoff_started": + return self._build_crew_started_data(event) + + # Complex events have custom handling that already extracts only needed fields if event_type == "task_started": task_name = event.task.name or event.task.description task_display_name = ( @@ -853,19 +909,84 @@ class TraceCollectionListener(BaseEventListener): "agent_backstory": event.agent.backstory, } if event_type == "llm_call_started": - event_data = safe_serialize_to_dict(event) + event_data = _serialize_for_trace(event) event_data["task_name"] = event.task_name or getattr( event, "task_description", None ) return event_data if event_type == "llm_call_completed": - return safe_serialize_to_dict(event) + return _serialize_for_trace(event) - return { - "event_type": event_type, - "event": safe_serialize_to_dict(event), - "source": source, - } + # For all other events, use lightweight serialization + return _serialize_for_trace(event) + + def _build_crew_started_data(self, event: Any) -> dict[str, Any]: + """Build comprehensive crew structure for crew_kickoff_started event. 
+ + This is the ONE place where we serialize the full crew structure. + Subsequent events use lightweight references to avoid redundancy. + """ + event_data = _serialize_for_trace(event) + + # Add full crew structure with optimized agent/task serialization + crew = getattr(event, "crew", None) + if crew is not None: + # Serialize agents as scalar fields plus tool names only + agents_data = [] + for agent in getattr(crew, "agents", []) or []: + agent_data = { + "id": str(getattr(agent, "id", "")), + "role": getattr(agent, "role", ""), + "goal": getattr(agent, "goal", ""), + "backstory": getattr(agent, "backstory", ""), + "verbose": getattr(agent, "verbose", False), + "allow_delegation": getattr(agent, "allow_delegation", False), + "max_iter": getattr(agent, "max_iter", None), + "max_rpm": getattr(agent, "max_rpm", None), + } + # Include tool names (not full tool objects) + tools = getattr(agent, "tools", None) + if tools: + agent_data["tool_names"] = [ + getattr(t, "name", str(t)) for t in tools + ] + agents_data.append(agent_data) + + # Serialize tasks with lightweight agent references + tasks_data = [] + for task in getattr(crew, "tasks", []) or []: + task_data = { + "id": str(getattr(task, "id", "")), + "name": getattr(task, "name", None), + "description": getattr(task, "description", ""), + "expected_output": getattr(task, "expected_output", ""), + "async_execution": getattr(task, "async_execution", False), + "human_input": getattr(task, "human_input", False), + } + # Replace full agent with lightweight reference + task_agent = getattr(task, "agent", None) + if task_agent: + task_data["agent_ref"] = { + "id": str(getattr(task_agent, "id", "")), + "role": getattr(task_agent, "role", ""), + } + # Replace context tasks with lightweight references + context_tasks = getattr(task, "context", None) + if context_tasks: + task_data["context_task_ids"] = [ + str(getattr(ct, "id", "")) for ct in context_tasks + ] + tasks_data.append(task_data) + + event_data["crew_structure"] 
= { + "agents": agents_data, + "tasks": tasks_data, + "process": str(getattr(crew, "process", "")), + "verbose": getattr(crew, "verbose", False), + "memory": getattr(crew, "memory", False), + } + + return event_data def _show_tracing_disabled_message(self) -> None: """Show a message when tracing is disabled.""" diff --git a/lib/crewai/src/crewai/utilities/serialization.py b/lib/crewai/src/crewai/utilities/serialization.py index 0207e80ab..0cb48dade 100644 --- a/lib/crewai/src/crewai/utilities/serialization.py +++ b/lib/crewai/src/crewai/utilities/serialization.py @@ -103,6 +103,28 @@ def to_serializable( } except Exception: return repr(obj) + + # Callables (functions, methods, lambdas) should fall through to repr + if callable(obj): + return repr(obj) + + # Handle regular classes with __dict__ (non-Pydantic) + if hasattr(obj, "__dict__"): + try: + return { + _to_serializable_key(k): to_serializable( + v, + exclude=exclude, + max_depth=max_depth, + _current_depth=_current_depth + 1, + _ancestors=new_ancestors, + ) + for k, v in obj.__dict__.items() + if k not in exclude and not k.startswith("_") + } + except Exception: + return repr(obj) + return repr(obj) diff --git a/lib/crewai/tests/tracing/test_trace_serialization.py b/lib/crewai/tests/tracing/test_trace_serialization.py new file mode 100644 index 000000000..aca035a4b --- /dev/null +++ b/lib/crewai/tests/tracing/test_trace_serialization.py @@ -0,0 +1,565 @@ +"""Tests for trace serialization optimization to prevent trace table bloat. + +These tests verify that trace events don't contain redundant full crew/task/agent +objects, reducing event sizes from 50-100KB to a few KB per event. 
+""" + +import uuid +from unittest.mock import MagicMock, Mock + +import pytest + +from crewai.events.listeners.tracing.trace_listener import ( + TRACE_EXCLUDE_FIELDS, + TraceCollectionListener, + _serialize_for_trace, +) +from crewai.events.listeners.tracing.utils import safe_serialize_to_dict + + +class TestTraceExcludeFields: + """Test that TRACE_EXCLUDE_FIELDS contains all the heavy/redundant fields.""" + + def test_contains_back_references(self): + """Verify back-reference fields are excluded.""" + back_refs = {"crew", "agent", "agents", "tasks", "context"} + assert back_refs.issubset(TRACE_EXCLUDE_FIELDS) + + def test_contains_heavy_fields(self): + """Verify heavy objects are excluded.""" + heavy_fields = { + "tools", + "llm", + "function_calling_llm", + "step_callback", + "task_callback", + "crew_callback", + "callbacks", + "_memory", + "_cache", + "knowledge_sources", + } + assert heavy_fields.issubset(TRACE_EXCLUDE_FIELDS) + + +class TestSerializeForTrace: + """Test the _serialize_for_trace helper function.""" + + def test_excludes_crew_field(self): + """Verify crew field is excluded from serialization.""" + event = MagicMock() + event.crew = MagicMock(name="TestCrew") + event.crew_name = "TestCrew" + event.timestamp = None + + result = _serialize_for_trace(event) + + # crew_name should be present (scalar field) + # crew should be excluded (back-reference) + assert "crew" not in result or result.get("crew") is None + + def test_excludes_agent_field(self): + """Verify agent field is excluded from serialization.""" + event = MagicMock() + event.agent = MagicMock(role="TestAgent") + event.agent_role = "TestAgent" + + result = _serialize_for_trace(event) + + assert "agent" not in result or result.get("agent") is None + + def test_excludes_tools_field(self): + """Verify tools field is excluded from serialization.""" + event = MagicMock() + event.tools = [MagicMock(), MagicMock()] + event.tool_name = "test_tool" + + result = _serialize_for_trace(event) + + 
assert "tools" not in result or result.get("tools") is None + + def test_preserves_scalar_fields(self): + """Verify scalar fields needed by AMP frontend are preserved.""" + + class SimpleEvent: + def __init__(self): + self.agent_role = "Researcher" + self.task_name = "Research Task" + self.task_id = str(uuid.uuid4()) + self.duration_ms = 1500 + self.tokens_used = 500 + + event = SimpleEvent() + result = _serialize_for_trace(event) + + # Scalar fields should be preserved + assert result.get("agent_role") == "Researcher" + assert result.get("task_name") == "Research Task" + assert result.get("duration_ms") == 1500 + assert result.get("tokens_used") == 500 + + def test_extra_exclude_parameter(self): + """Verify extra_exclude adds to the default exclusions.""" + + class EventWithCustomField: + def __init__(self): + self.custom_heavy_field = {"large": "data" * 1000} + self.keep_this = "small" + + event = EventWithCustomField() + result = _serialize_for_trace(event, extra_exclude={"custom_heavy_field"}) + + assert "custom_heavy_field" not in result + assert result.get("keep_this") == "small" + + +class TestBuildEventData: + """Test _build_event_data method for different event types.""" + + @pytest.fixture + def listener(self): + """Create a trace listener for testing.""" + # Reset singleton + TraceCollectionListener._instance = None + TraceCollectionListener._initialized = False + TraceCollectionListener._listeners_setup = False + return TraceCollectionListener() + + def test_task_started_no_full_task_object(self, listener): + """Verify task_started event doesn't include full task object.""" + mock_task = MagicMock() + mock_task.name = "Test Task" + mock_task.description = "A test task description" + mock_task.expected_output = "Expected result" + mock_task.id = uuid.uuid4() + # Add heavy fields that should NOT appear in output + mock_task.crew = MagicMock(name="HeavyCrew") + mock_task.agent = MagicMock(role="HeavyAgent") + mock_task.context = [MagicMock(), MagicMock()] 
+ mock_task.tools = [MagicMock(), MagicMock()] + + mock_event = MagicMock() + mock_event.task = mock_task + mock_event.context = "test context" + + mock_source = MagicMock() + mock_source.agent = MagicMock() + mock_source.agent.role = "Worker" + + result = listener._build_event_data("task_started", mock_event, mock_source) + + # Should have scalar fields + assert result["task_name"] == "Test Task" + assert result["task_description"] == "A test task description" + assert result["agent_role"] == "Worker" + assert result["task_id"] == str(mock_task.id) + + # Should NOT have full objects + assert "crew" not in result + assert "tools" not in result + # task and agent should not be full objects + assert result.get("task") is None or not hasattr(result.get("task"), "crew") + + def test_task_completed_no_full_task_object(self, listener): + """Verify task_completed event doesn't include full task object.""" + mock_task = MagicMock() + mock_task.name = "Completed Task" + mock_task.description = "Task description" + mock_task.id = uuid.uuid4() + + mock_output = MagicMock() + mock_output.raw = "Task result" + mock_output.output_format = "text" + mock_output.agent = "Worker" + + mock_event = MagicMock() + mock_event.task = mock_task + mock_event.output = mock_output + + result = listener._build_event_data("task_completed", mock_event, None) + + # Should have scalar fields + assert result["task_name"] == "Completed Task" + assert result["output_raw"] == "Task result" + assert result["agent_role"] == "Worker" + + # Should NOT have full task object + assert "crew" not in result + assert "tools" not in result + + def test_agent_execution_started_no_full_agent(self, listener): + """Verify agent_execution_started extracts only scalar fields.""" + mock_agent = MagicMock() + mock_agent.role = "Analyst" + mock_agent.goal = "Analyze data" + mock_agent.backstory = "Expert analyst" + # Heavy fields + mock_agent.tools = [MagicMock(), MagicMock()] + mock_agent.llm = MagicMock() + 
mock_agent.crew = MagicMock() + + mock_event = MagicMock() + mock_event.agent = mock_agent + + result = listener._build_event_data( + "agent_execution_started", mock_event, None + ) + + # Should have scalar fields + assert result["agent_role"] == "Analyst" + assert result["agent_goal"] == "Analyze data" + assert result["agent_backstory"] == "Expert analyst" + + # Should NOT have heavy objects + assert "tools" not in result + assert "llm" not in result + assert "crew" not in result + + def test_llm_call_started_excludes_heavy_fields(self, listener): + """Verify llm_call_started uses lightweight serialization.""" + + class MockLLMEvent: + def __init__(self): + self.task_name = "LLM Task" + self.model = "gpt-4" + self.tokens = 100 + # Heavy fields that should be excluded + self.crew = MagicMock() + self.agent = MagicMock() + self.tools = [MagicMock()] + + mock_event = MockLLMEvent() + + result = listener._build_event_data("llm_call_started", mock_event, None) + + # task_name should be present + assert result["task_name"] == "LLM Task" + + # Heavy fields should be excluded + assert "crew" not in result or result.get("crew") is None + assert "agent" not in result or result.get("agent") is None + assert "tools" not in result or result.get("tools") is None + + def test_llm_call_completed_excludes_heavy_fields(self, listener): + """Verify llm_call_completed uses lightweight serialization.""" + + class MockLLMCompletedEvent: + def __init__(self): + self.response = "LLM response" + self.tokens_used = 150 + self.duration_ms = 500 + # Heavy fields + self.crew = MagicMock() + self.agent = MagicMock() + + mock_event = MockLLMCompletedEvent() + + result = listener._build_event_data("llm_call_completed", mock_event, None) + + # Scalar fields preserved + assert result.get("response") == "LLM response" + assert result.get("tokens_used") == 150 + + # Heavy fields excluded + assert "crew" not in result or result.get("crew") is None + assert "agent" not in result or result.get("agent") 
is None + + +class TestCrewKickoffStartedEvent: + """Test that crew_kickoff_started event has full structure.""" + + @pytest.fixture + def listener(self): + """Create a trace listener for testing.""" + TraceCollectionListener._instance = None + TraceCollectionListener._initialized = False + TraceCollectionListener._listeners_setup = False + return TraceCollectionListener() + + def test_crew_started_has_crew_structure(self, listener): + """Verify crew_kickoff_started includes the crew_structure field.""" + # Create mock crew with agents and tasks + mock_agent1 = MagicMock() + mock_agent1.id = uuid.uuid4() + mock_agent1.role = "Researcher" + mock_agent1.goal = "Research things" + mock_agent1.backstory = "Expert researcher" + mock_agent1.verbose = True + mock_agent1.allow_delegation = False + mock_agent1.max_iter = 10 + mock_agent1.max_rpm = None + mock_agent1.tools = [MagicMock(name="search_tool"), MagicMock(name="read_tool")] + + mock_agent2 = MagicMock() + mock_agent2.id = uuid.uuid4() + mock_agent2.role = "Writer" + mock_agent2.goal = "Write content" + mock_agent2.backstory = "Expert writer" + mock_agent2.verbose = False + mock_agent2.allow_delegation = True + mock_agent2.max_iter = 5 + mock_agent2.max_rpm = 10 + mock_agent2.tools = [] + + mock_task1 = MagicMock() + mock_task1.id = uuid.uuid4() + mock_task1.name = "Research Task" + mock_task1.description = "Do research" + mock_task1.expected_output = "Research results" + mock_task1.async_execution = False + mock_task1.human_input = False + mock_task1.agent = mock_agent1 + mock_task1.context = None + + mock_task2 = MagicMock() + mock_task2.id = uuid.uuid4() + mock_task2.name = "Writing Task" + mock_task2.description = "Write report" + mock_task2.expected_output = "Written report" + mock_task2.async_execution = True + mock_task2.human_input = True + mock_task2.agent = mock_agent2 + mock_task2.context = [mock_task1] + + mock_crew = MagicMock() + mock_crew.agents = [mock_agent1, mock_agent2] + mock_crew.tasks = 
[mock_task1, mock_task2] + mock_crew.process = "sequential" + mock_crew.verbose = True + mock_crew.memory = False + + mock_event = MagicMock() + mock_event.crew = mock_crew + mock_event.crew_name = "TestCrew" + mock_event.inputs = {"key": "value"} + + result = listener._build_event_data("crew_kickoff_started", mock_event, None) + + # Should have crew_structure + assert "crew_structure" in result + crew_structure = result["crew_structure"] + + # Verify agents are serialized with tool names + assert len(crew_structure["agents"]) == 2 + agent1_data = crew_structure["agents"][0] + assert agent1_data["role"] == "Researcher" + assert agent1_data["goal"] == "Research things" + assert "tool_names" in agent1_data + assert len(agent1_data["tool_names"]) == 2 + + # Verify tasks have lightweight agent references + assert len(crew_structure["tasks"]) == 2 + task2_data = crew_structure["tasks"][1] + assert task2_data["name"] == "Writing Task" + assert "agent_ref" in task2_data + assert task2_data["agent_ref"]["role"] == "Writer" + + # Verify context uses task IDs + assert "context_task_ids" in task2_data + assert str(mock_task1.id) in task2_data["context_task_ids"] + + def test_crew_started_agents_no_full_tools(self, listener): + """Verify agents in crew_structure have tool_names, not full tool objects.""" + mock_tool = MagicMock() + mock_tool.name = "web_search" + mock_tool.description = "Search the web" + mock_tool.func = lambda x: x # Heavy callable + mock_tool.args_schema = {"type": "object"} # Schema + + mock_agent = MagicMock() + mock_agent.id = uuid.uuid4() + mock_agent.role = "Searcher" + mock_agent.goal = "Search" + mock_agent.backstory = "Expert" + mock_agent.verbose = False + mock_agent.allow_delegation = False + mock_agent.max_iter = 5 + mock_agent.max_rpm = None + mock_agent.tools = [mock_tool] + + mock_crew = MagicMock() + mock_crew.agents = [mock_agent] + mock_crew.tasks = [] + mock_crew.process = "sequential" + mock_crew.verbose = False + mock_crew.memory = False 
+ + mock_event = MagicMock() + mock_event.crew = mock_crew + + result = listener._build_event_data("crew_kickoff_started", mock_event, None) + + agent_data = result["crew_structure"]["agents"][0] + + # Should have tool_names (list of strings) + assert "tool_names" in agent_data + assert agent_data["tool_names"] == ["web_search"] + + # Should NOT have full tools array + assert "tools" not in agent_data + + def test_crew_started_tasks_no_full_agent(self, listener): + """Verify tasks have agent_ref, not full agent object.""" + mock_agent = MagicMock() + mock_agent.id = uuid.uuid4() + mock_agent.role = "Worker" + mock_agent.goal = "Work hard" + mock_agent.backstory = "Dedicated worker" + mock_agent.tools = [MagicMock(), MagicMock()] + mock_agent.llm = MagicMock() + + mock_task = MagicMock() + mock_task.id = uuid.uuid4() + mock_task.name = "Work Task" + mock_task.description = "Do work" + mock_task.expected_output = "Work done" + mock_task.async_execution = False + mock_task.human_input = False + mock_task.agent = mock_agent + mock_task.context = None + + mock_crew = MagicMock() + mock_crew.agents = [mock_agent] + mock_crew.tasks = [mock_task] + mock_crew.process = "sequential" + mock_crew.verbose = False + mock_crew.memory = False + + mock_event = MagicMock() + mock_event.crew = mock_crew + + result = listener._build_event_data("crew_kickoff_started", mock_event, None) + + task_data = result["crew_structure"]["tasks"][0] + + # Should have lightweight agent_ref + assert "agent_ref" in task_data + assert task_data["agent_ref"]["id"] == str(mock_agent.id) + assert task_data["agent_ref"]["role"] == "Worker" + + # agent_ref should ONLY have id and role (not tools, llm, etc.) 
+ assert len(task_data["agent_ref"]) == 2 + + # Should NOT have full agent + assert "agent" not in task_data + + +class TestNonCrewStartedEvents: + """Test that non-crew_started events don't have redundant data.""" + + @pytest.fixture + def listener(self): + """Create a trace listener for testing.""" + TraceCollectionListener._instance = None + TraceCollectionListener._initialized = False + TraceCollectionListener._listeners_setup = False + return TraceCollectionListener() + + def test_generic_event_no_crew(self, listener): + """Verify generic events exclude crew object.""" + + class GenericEvent: + def __init__(self): + self.event_type = "some_event" + self.data = "some_data" + # These should be excluded + self.crew = MagicMock() + self.agents = [MagicMock()] + self.tasks = [MagicMock()] + self.tools = [MagicMock()] + + mock_event = GenericEvent() + + result = listener._build_event_data("some_event", mock_event, None) + + # Scalar fields preserved + assert result.get("event_type") == "some_event" + assert result.get("data") == "some_data" + + # Heavy fields excluded + assert "crew" not in result or result.get("crew") is None + assert "agents" not in result or result.get("agents") is None + assert "tasks" not in result or result.get("tasks") is None + assert "tools" not in result or result.get("tools") is None + + def test_crew_kickoff_completed_no_full_crew(self, listener): + """Verify crew_kickoff_completed doesn't repeat full crew structure.""" + + class CrewCompletedEvent: + def __init__(self): + self.crew_name = "TestCrew" + self.total_tokens = 5000 + self.output = "Final output" + # Should be excluded + self.crew = MagicMock() + self.crew.agents = [MagicMock(), MagicMock()] + self.crew.tasks = [MagicMock()] + + mock_event = CrewCompletedEvent() + + result = listener._build_event_data("crew_kickoff_completed", mock_event, None) + + # Scalar fields preserved + assert result.get("crew_name") == "TestCrew" + assert result.get("total_tokens") == 5000 + + # Should 
NOT have full crew object + assert "crew" not in result or result.get("crew") is None + # Should NOT have crew_structure (that's only for crew_started) + assert "crew_structure" not in result + + +class TestSizeReduction: + """Test that the optimization actually reduces serialized size.""" + + @pytest.fixture + def listener(self): + """Create a trace listener for testing.""" + TraceCollectionListener._instance = None + TraceCollectionListener._initialized = False + TraceCollectionListener._listeners_setup = False + return TraceCollectionListener() + + def test_task_event_size_reduction(self, listener): + """Verify task events are much smaller than naive serialization.""" + import json + + # Create a realistic task with many fields + mock_agent = MagicMock() + mock_agent.id = uuid.uuid4() + mock_agent.role = "Researcher" + mock_agent.goal = "Research" * 50 # Longer goal + mock_agent.backstory = "Expert" * 100 # Longer backstory + mock_agent.tools = [MagicMock() for _ in range(5)] + mock_agent.llm = MagicMock() + mock_agent.crew = MagicMock() + + mock_task = MagicMock() + mock_task.name = "Research Task" + mock_task.description = "Detailed description" * 20 + mock_task.expected_output = "Expected" * 10 + mock_task.id = uuid.uuid4() + mock_task.agent = mock_agent + mock_task.context = [MagicMock() for _ in range(3)] + mock_task.crew = MagicMock() + mock_task.tools = [MagicMock() for _ in range(3)] + + mock_event = MagicMock() + mock_event.task = mock_task + mock_event.context = "test context" + + mock_source = MagicMock() + mock_source.agent = mock_agent + + result = listener._build_event_data("task_started", mock_event, mock_source) + + # The result should be relatively small + serialized = json.dumps(result, default=str) + + # Should be under 2KB for task_started (was potentially 50-100KB before) + assert len(serialized) < 2000, f"task_started too large: {len(serialized)} bytes" + + # Should have the essential fields + assert "task_name" in result + assert "task_id" 
in result + assert "agent_role" in result