mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-03-31 16:18:14 +00:00
Compare commits
4 Commits
main
...
fix/trace-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fad083ffaa | ||
|
|
b753012fc8 | ||
|
|
b40098b28e | ||
|
|
a4f1164812 |
@@ -126,18 +126,59 @@ from crewai.events.types.tool_usage_events import (
|
||||
from crewai.events.utils.console_formatter import ConsoleFormatter
|
||||
|
||||
|
||||
# Fields to exclude from trace serialization to reduce redundant data.
|
||||
# These back-references and heavy objects create massive bloat when serialized
|
||||
# repeatedly across events (crew->agents->tasks->agent creates circular refs).
|
||||
TRACE_EXCLUDE_FIELDS = {
|
||||
# Back-references that create redundant/circular data
|
||||
"crew",
|
||||
"agent",
|
||||
"agents",
|
||||
"task",
|
||||
"tasks",
|
||||
"context",
|
||||
# Heavy fields not needed in individual trace events
|
||||
# NOTE: "tools" intentionally NOT here - LLMCallStartedEvent.tools is lightweight
|
||||
# (list of tool schemas). Agent.tools is excluded in _build_crew_started_data.
|
||||
"llm",
|
||||
"function_calling_llm",
|
||||
"step_callback",
|
||||
"task_callback",
|
||||
"crew_callback",
|
||||
"callbacks",
|
||||
"_memory",
|
||||
"_cache",
|
||||
"_rpm_controller",
|
||||
"_request_within_rpm_limit",
|
||||
"_token_process",
|
||||
"knowledge_sources",
|
||||
}
|
||||
|
||||
|
||||
def _serialize_for_trace(
|
||||
event: Any, extra_exclude: set[str] | None = None
|
||||
) -> dict[str, Any]:
|
||||
"""Serialize an event for tracing, excluding redundant back-references.
|
||||
|
||||
Keeps all scalar fields (agent_role, task_name, etc.) that the AMP frontend uses.
|
||||
Replaces heavy nested objects with lightweight ID references to reduce trace bloat.
|
||||
|
||||
Args:
|
||||
event: The event object to serialize.
|
||||
extra_exclude: Additional fields to exclude beyond TRACE_EXCLUDE_FIELDS.
|
||||
|
||||
Returns:
|
||||
A dictionary with the serialized event data.
|
||||
"""
|
||||
exclude = TRACE_EXCLUDE_FIELDS.copy()
|
||||
if extra_exclude:
|
||||
exclude.update(extra_exclude)
|
||||
return safe_serialize_to_dict(event, exclude=exclude)
|
||||
|
||||
|
||||
class TraceCollectionListener(BaseEventListener):
|
||||
"""Trace collection listener that orchestrates trace collection."""
|
||||
|
||||
complex_events: ClassVar[list[str]] = [
|
||||
"task_started",
|
||||
"task_completed",
|
||||
"llm_call_started",
|
||||
"llm_call_completed",
|
||||
"agent_execution_started",
|
||||
"agent_execution_completed",
|
||||
]
|
||||
|
||||
_instance: Self | None = None
|
||||
_initialized: bool = False
|
||||
_listeners_setup: bool = False
|
||||
@@ -810,9 +851,17 @@ class TraceCollectionListener(BaseEventListener):
|
||||
def _build_event_data(
|
||||
self, event_type: str, event: Any, source: Any
|
||||
) -> dict[str, Any]:
|
||||
"""Build event data"""
|
||||
if event_type not in self.complex_events:
|
||||
return safe_serialize_to_dict(event)
|
||||
"""Build event data with optimized serialization to reduce trace bloat.
|
||||
|
||||
For most events, excludes heavy nested objects (crew, agents, tasks, tools)
|
||||
that would create massive redundant data. Only crew_kickoff_started gets
|
||||
the full crew structure as a one-time dump.
|
||||
"""
|
||||
# crew_kickoff_started is special: include full crew structure ONCE
|
||||
if event_type == "crew_kickoff_started":
|
||||
return self._build_crew_started_data(event)
|
||||
|
||||
# Complex events have custom handling that already extracts only needed fields
|
||||
if event_type == "task_started":
|
||||
task_name = event.task.name or event.task.description
|
||||
task_display_name = (
|
||||
@@ -853,19 +902,101 @@ class TraceCollectionListener(BaseEventListener):
|
||||
"agent_backstory": event.agent.backstory,
|
||||
}
|
||||
if event_type == "llm_call_started":
|
||||
event_data = safe_serialize_to_dict(event)
|
||||
event_data = _serialize_for_trace(event)
|
||||
event_data["task_name"] = event.task_name or getattr(
|
||||
event, "task_description", None
|
||||
)
|
||||
return event_data
|
||||
if event_type == "llm_call_completed":
|
||||
return safe_serialize_to_dict(event)
|
||||
return _serialize_for_trace(event)
|
||||
|
||||
return {
|
||||
"event_type": event_type,
|
||||
"event": safe_serialize_to_dict(event),
|
||||
"source": source,
|
||||
}
|
||||
# Error events need agent/task identification extracted before generic
|
||||
# serialization strips them (agent/task are in TRACE_EXCLUDE_FIELDS)
|
||||
if event_type == "agent_execution_error":
|
||||
event_data = _serialize_for_trace(event)
|
||||
if event.agent:
|
||||
event_data["agent_role"] = getattr(event.agent, "role", None)
|
||||
event_data["agent_id"] = str(getattr(event.agent, "id", ""))
|
||||
return event_data
|
||||
if event_type == "task_failed":
|
||||
event_data = _serialize_for_trace(event)
|
||||
if event.task:
|
||||
event_data["task_name"] = getattr(event.task, "name", None) or getattr(
|
||||
event.task, "description", None
|
||||
)
|
||||
event_data["task_id"] = str(getattr(event.task, "id", ""))
|
||||
return event_data
|
||||
|
||||
# For all other events, use lightweight serialization
|
||||
return _serialize_for_trace(event)
|
||||
|
||||
def _build_crew_started_data(self, event: Any) -> dict[str, Any]:
|
||||
"""Build comprehensive crew structure for crew_kickoff_started event.
|
||||
|
||||
This is the ONE place where we serialize the full crew structure.
|
||||
Subsequent events use lightweight references to avoid redundancy.
|
||||
"""
|
||||
event_data = _serialize_for_trace(event)
|
||||
|
||||
# Add full crew structure with optimized agent/task serialization
|
||||
crew = getattr(event, "crew", None)
|
||||
if crew is not None:
|
||||
# Serialize agents with tools (first occurrence only)
|
||||
agents_data = []
|
||||
for agent in getattr(crew, "agents", []) or []:
|
||||
agent_data = {
|
||||
"id": str(getattr(agent, "id", "")),
|
||||
"role": getattr(agent, "role", ""),
|
||||
"goal": getattr(agent, "goal", ""),
|
||||
"backstory": getattr(agent, "backstory", ""),
|
||||
"verbose": getattr(agent, "verbose", False),
|
||||
"allow_delegation": getattr(agent, "allow_delegation", False),
|
||||
"max_iter": getattr(agent, "max_iter", None),
|
||||
"max_rpm": getattr(agent, "max_rpm", None),
|
||||
}
|
||||
# Include tool names (not full tool objects)
|
||||
tools = getattr(agent, "tools", None)
|
||||
if tools:
|
||||
agent_data["tool_names"] = [
|
||||
getattr(t, "name", str(t)) for t in tools
|
||||
]
|
||||
agents_data.append(agent_data)
|
||||
|
||||
# Serialize tasks with lightweight agent references
|
||||
tasks_data = []
|
||||
for task in getattr(crew, "tasks", []) or []:
|
||||
task_data = {
|
||||
"id": str(getattr(task, "id", "")),
|
||||
"name": getattr(task, "name", None),
|
||||
"description": getattr(task, "description", ""),
|
||||
"expected_output": getattr(task, "expected_output", ""),
|
||||
"async_execution": getattr(task, "async_execution", False),
|
||||
"human_input": getattr(task, "human_input", False),
|
||||
}
|
||||
# Replace full agent with lightweight reference
|
||||
task_agent = getattr(task, "agent", None)
|
||||
if task_agent:
|
||||
task_data["agent_ref"] = {
|
||||
"id": str(getattr(task_agent, "id", "")),
|
||||
"role": getattr(task_agent, "role", ""),
|
||||
}
|
||||
# Replace context tasks with lightweight references
|
||||
context_tasks = getattr(task, "context", None)
|
||||
if context_tasks:
|
||||
task_data["context_task_ids"] = [
|
||||
str(getattr(ct, "id", "")) for ct in context_tasks
|
||||
]
|
||||
tasks_data.append(task_data)
|
||||
|
||||
event_data["crew_structure"] = {
|
||||
"agents": agents_data,
|
||||
"tasks": tasks_data,
|
||||
"process": str(getattr(crew, "process", "")),
|
||||
"verbose": getattr(crew, "verbose", False),
|
||||
"memory": getattr(crew, "memory", False),
|
||||
}
|
||||
|
||||
return event_data
|
||||
|
||||
def _show_tracing_disabled_message(self) -> None:
|
||||
"""Show a message when tracing is disabled."""
|
||||
|
||||
@@ -103,6 +103,28 @@ def to_serializable(
|
||||
}
|
||||
except Exception:
|
||||
return repr(obj)
|
||||
|
||||
# Callables (functions, methods, lambdas) should fall through to repr
|
||||
if callable(obj):
|
||||
return repr(obj)
|
||||
|
||||
# Handle regular classes with __dict__ (non-Pydantic)
|
||||
# Note: Don't propagate exclude to recursive calls, matching Pydantic fallback behavior
|
||||
if hasattr(obj, "__dict__"):
|
||||
try:
|
||||
return {
|
||||
_to_serializable_key(k): to_serializable(
|
||||
v,
|
||||
max_depth=max_depth,
|
||||
_current_depth=_current_depth + 1,
|
||||
_ancestors=new_ancestors,
|
||||
)
|
||||
for k, v in obj.__dict__.items()
|
||||
if k not in exclude and not k.startswith("_")
|
||||
}
|
||||
except Exception:
|
||||
return repr(obj)
|
||||
|
||||
return repr(obj)
|
||||
|
||||
|
||||
|
||||
586
lib/crewai/tests/tracing/test_trace_serialization.py
Normal file
586
lib/crewai/tests/tracing/test_trace_serialization.py
Normal file
@@ -0,0 +1,586 @@
|
||||
"""Tests for trace serialization optimization to prevent trace table bloat.
|
||||
|
||||
These tests verify that trace events don't contain redundant full crew/task/agent
|
||||
objects, reducing event sizes from 50-100KB to a few KB per event.
|
||||
"""
|
||||
|
||||
import uuid
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.events.listeners.tracing.trace_listener import (
|
||||
TRACE_EXCLUDE_FIELDS,
|
||||
TraceCollectionListener,
|
||||
_serialize_for_trace,
|
||||
)
|
||||
|
||||
|
||||
class TestTraceExcludeFields:
|
||||
"""Test that TRACE_EXCLUDE_FIELDS contains all the heavy/redundant fields."""
|
||||
|
||||
def test_contains_back_references(self):
|
||||
"""Verify back-reference fields are excluded."""
|
||||
back_refs = {"crew", "agent", "agents", "tasks", "context"}
|
||||
assert back_refs.issubset(TRACE_EXCLUDE_FIELDS)
|
||||
|
||||
def test_contains_heavy_fields(self):
|
||||
"""Verify heavy objects are excluded.
|
||||
|
||||
Note: 'tools' is NOT in TRACE_EXCLUDE_FIELDS because LLMCallStartedEvent.tools
|
||||
is a lightweight list of tool schemas. Agent.tools exclusion is handled
|
||||
explicitly in _build_crew_started_data.
|
||||
"""
|
||||
heavy_fields = {
|
||||
"llm",
|
||||
"function_calling_llm",
|
||||
"step_callback",
|
||||
"task_callback",
|
||||
"crew_callback",
|
||||
"callbacks",
|
||||
"_memory",
|
||||
"_cache",
|
||||
"knowledge_sources",
|
||||
}
|
||||
assert heavy_fields.issubset(TRACE_EXCLUDE_FIELDS)
|
||||
# tools is NOT excluded globally - LLM events need it
|
||||
assert "tools" not in TRACE_EXCLUDE_FIELDS
|
||||
|
||||
|
||||
class TestSerializeForTrace:
|
||||
"""Test the _serialize_for_trace helper function."""
|
||||
|
||||
def test_excludes_crew_field(self):
|
||||
"""Verify crew field is excluded from serialization."""
|
||||
event = MagicMock()
|
||||
event.crew = MagicMock(name="TestCrew")
|
||||
event.crew_name = "TestCrew"
|
||||
event.timestamp = None
|
||||
|
||||
result = _serialize_for_trace(event)
|
||||
|
||||
# crew_name should be present (scalar field)
|
||||
# crew should be excluded (back-reference)
|
||||
assert "crew" not in result or result.get("crew") is None
|
||||
|
||||
def test_excludes_agent_field(self):
|
||||
"""Verify agent field is excluded from serialization."""
|
||||
event = MagicMock()
|
||||
event.agent = MagicMock(role="TestAgent")
|
||||
event.agent_role = "TestAgent"
|
||||
|
||||
result = _serialize_for_trace(event)
|
||||
|
||||
assert "agent" not in result or result.get("agent") is None
|
||||
|
||||
def test_preserves_tools_field(self):
|
||||
"""Verify tools field is preserved for LLM events (lightweight schemas)."""
|
||||
|
||||
class EventWithTools:
|
||||
def __init__(self):
|
||||
self.tools = [{"name": "search", "description": "Search tool"}]
|
||||
self.tool_name = "test_tool"
|
||||
|
||||
event = EventWithTools()
|
||||
result = _serialize_for_trace(event)
|
||||
|
||||
# tools should be preserved (lightweight for LLM events)
|
||||
assert "tools" in result
|
||||
assert result["tools"] == [{"name": "search", "description": "Search tool"}]
|
||||
|
||||
def test_preserves_scalar_fields(self):
|
||||
"""Verify scalar fields needed by AMP frontend are preserved."""
|
||||
|
||||
class SimpleEvent:
|
||||
def __init__(self):
|
||||
self.agent_role = "Researcher"
|
||||
self.task_name = "Research Task"
|
||||
self.task_id = str(uuid.uuid4())
|
||||
self.duration_ms = 1500
|
||||
self.tokens_used = 500
|
||||
|
||||
event = SimpleEvent()
|
||||
result = _serialize_for_trace(event)
|
||||
|
||||
# Scalar fields should be preserved
|
||||
assert result.get("agent_role") == "Researcher"
|
||||
assert result.get("task_name") == "Research Task"
|
||||
assert result.get("duration_ms") == 1500
|
||||
assert result.get("tokens_used") == 500
|
||||
|
||||
def test_extra_exclude_parameter(self):
|
||||
"""Verify extra_exclude adds to the default exclusions."""
|
||||
|
||||
class EventWithCustomField:
|
||||
def __init__(self):
|
||||
self.custom_heavy_field = {"large": "data" * 1000}
|
||||
self.keep_this = "small"
|
||||
|
||||
event = EventWithCustomField()
|
||||
result = _serialize_for_trace(event, extra_exclude={"custom_heavy_field"})
|
||||
|
||||
assert "custom_heavy_field" not in result
|
||||
assert result.get("keep_this") == "small"
|
||||
|
||||
|
||||
class TestBuildEventData:
|
||||
"""Test _build_event_data method for different event types."""
|
||||
|
||||
@pytest.fixture
|
||||
def listener(self):
|
||||
"""Create a trace listener for testing."""
|
||||
# Reset singleton
|
||||
TraceCollectionListener._instance = None
|
||||
TraceCollectionListener._initialized = False
|
||||
TraceCollectionListener._listeners_setup = False
|
||||
return TraceCollectionListener()
|
||||
|
||||
def test_task_started_no_full_task_object(self, listener):
|
||||
"""Verify task_started event doesn't include full task object."""
|
||||
mock_task = MagicMock()
|
||||
mock_task.name = "Test Task"
|
||||
mock_task.description = "A test task description"
|
||||
mock_task.expected_output = "Expected result"
|
||||
mock_task.id = uuid.uuid4()
|
||||
# Add heavy fields that should NOT appear in output
|
||||
mock_task.crew = MagicMock(name="HeavyCrew")
|
||||
mock_task.agent = MagicMock(role="HeavyAgent")
|
||||
mock_task.context = [MagicMock(), MagicMock()]
|
||||
mock_task.tools = [MagicMock(), MagicMock()]
|
||||
|
||||
mock_event = MagicMock()
|
||||
mock_event.task = mock_task
|
||||
mock_event.context = "test context"
|
||||
|
||||
mock_source = MagicMock()
|
||||
mock_source.agent = MagicMock()
|
||||
mock_source.agent.role = "Worker"
|
||||
|
||||
result = listener._build_event_data("task_started", mock_event, mock_source)
|
||||
|
||||
# Should have scalar fields
|
||||
assert result["task_name"] == "Test Task"
|
||||
assert result["task_description"] == "A test task description"
|
||||
assert result["agent_role"] == "Worker"
|
||||
assert result["task_id"] == str(mock_task.id)
|
||||
|
||||
# Should NOT have full objects
|
||||
assert "crew" not in result
|
||||
assert "tools" not in result
|
||||
# task and agent should not be full objects
|
||||
assert result.get("task") is None or not hasattr(result.get("task"), "crew")
|
||||
|
||||
def test_task_completed_no_full_task_object(self, listener):
|
||||
"""Verify task_completed event doesn't include full task object."""
|
||||
mock_task = MagicMock()
|
||||
mock_task.name = "Completed Task"
|
||||
mock_task.description = "Task description"
|
||||
mock_task.id = uuid.uuid4()
|
||||
|
||||
mock_output = MagicMock()
|
||||
mock_output.raw = "Task result"
|
||||
mock_output.output_format = "text"
|
||||
mock_output.agent = "Worker"
|
||||
|
||||
mock_event = MagicMock()
|
||||
mock_event.task = mock_task
|
||||
mock_event.output = mock_output
|
||||
|
||||
result = listener._build_event_data("task_completed", mock_event, None)
|
||||
|
||||
# Should have scalar fields
|
||||
assert result["task_name"] == "Completed Task"
|
||||
assert result["output_raw"] == "Task result"
|
||||
assert result["agent_role"] == "Worker"
|
||||
|
||||
# Should NOT have full task object
|
||||
assert "crew" not in result
|
||||
assert "tools" not in result
|
||||
|
||||
def test_agent_execution_started_no_full_agent(self, listener):
|
||||
"""Verify agent_execution_started extracts only scalar fields."""
|
||||
mock_agent = MagicMock()
|
||||
mock_agent.role = "Analyst"
|
||||
mock_agent.goal = "Analyze data"
|
||||
mock_agent.backstory = "Expert analyst"
|
||||
# Heavy fields
|
||||
mock_agent.tools = [MagicMock(), MagicMock()]
|
||||
mock_agent.llm = MagicMock()
|
||||
mock_agent.crew = MagicMock()
|
||||
|
||||
mock_event = MagicMock()
|
||||
mock_event.agent = mock_agent
|
||||
|
||||
result = listener._build_event_data(
|
||||
"agent_execution_started", mock_event, None
|
||||
)
|
||||
|
||||
# Should have scalar fields
|
||||
assert result["agent_role"] == "Analyst"
|
||||
assert result["agent_goal"] == "Analyze data"
|
||||
assert result["agent_backstory"] == "Expert analyst"
|
||||
|
||||
# Should NOT have heavy objects
|
||||
assert "tools" not in result
|
||||
assert "llm" not in result
|
||||
assert "crew" not in result
|
||||
|
||||
def test_llm_call_started_excludes_heavy_fields(self, listener):
|
||||
"""Verify llm_call_started uses lightweight serialization.
|
||||
|
||||
LLMCallStartedEvent.tools is a lightweight list of tool schemas (dicts),
|
||||
not heavy Agent.tools objects, so it should be preserved.
|
||||
"""
|
||||
|
||||
class MockLLMEvent:
|
||||
def __init__(self):
|
||||
self.task_name = "LLM Task"
|
||||
self.model = "gpt-4"
|
||||
self.tokens = 100
|
||||
# Heavy fields that should be excluded
|
||||
self.crew = MagicMock()
|
||||
self.agent = MagicMock()
|
||||
# LLM event tools are lightweight schemas (dicts), should be kept
|
||||
self.tools = [{"name": "search", "description": "Search tool"}]
|
||||
|
||||
mock_event = MockLLMEvent()
|
||||
|
||||
result = listener._build_event_data("llm_call_started", mock_event, None)
|
||||
|
||||
# task_name should be present
|
||||
assert result["task_name"] == "LLM Task"
|
||||
|
||||
# Heavy fields should be excluded
|
||||
assert "crew" not in result or result.get("crew") is None
|
||||
assert "agent" not in result or result.get("agent") is None
|
||||
# LLM tools (lightweight schemas) should be preserved
|
||||
assert result.get("tools") == [{"name": "search", "description": "Search tool"}]
|
||||
|
||||
def test_llm_call_completed_excludes_heavy_fields(self, listener):
|
||||
"""Verify llm_call_completed uses lightweight serialization."""
|
||||
|
||||
class MockLLMCompletedEvent:
|
||||
def __init__(self):
|
||||
self.response = "LLM response"
|
||||
self.tokens_used = 150
|
||||
self.duration_ms = 500
|
||||
# Heavy fields
|
||||
self.crew = MagicMock()
|
||||
self.agent = MagicMock()
|
||||
|
||||
mock_event = MockLLMCompletedEvent()
|
||||
|
||||
result = listener._build_event_data("llm_call_completed", mock_event, None)
|
||||
|
||||
# Scalar fields preserved
|
||||
assert result.get("response") == "LLM response"
|
||||
assert result.get("tokens_used") == 150
|
||||
|
||||
# Heavy fields excluded
|
||||
assert "crew" not in result or result.get("crew") is None
|
||||
assert "agent" not in result or result.get("agent") is None
|
||||
|
||||
|
||||
class TestCrewKickoffStartedEvent:
|
||||
"""Test that crew_kickoff_started event has full structure."""
|
||||
|
||||
@pytest.fixture
|
||||
def listener(self):
|
||||
"""Create a trace listener for testing."""
|
||||
TraceCollectionListener._instance = None
|
||||
TraceCollectionListener._initialized = False
|
||||
TraceCollectionListener._listeners_setup = False
|
||||
return TraceCollectionListener()
|
||||
|
||||
def test_crew_started_has_crew_structure(self, listener):
|
||||
"""Verify crew_kickoff_started includes the crew_structure field."""
|
||||
# Create mock crew with agents and tasks
|
||||
mock_agent1 = MagicMock()
|
||||
mock_agent1.id = uuid.uuid4()
|
||||
mock_agent1.role = "Researcher"
|
||||
mock_agent1.goal = "Research things"
|
||||
mock_agent1.backstory = "Expert researcher"
|
||||
mock_agent1.verbose = True
|
||||
mock_agent1.allow_delegation = False
|
||||
mock_agent1.max_iter = 10
|
||||
mock_agent1.max_rpm = None
|
||||
mock_agent1.tools = [MagicMock(name="search_tool"), MagicMock(name="read_tool")]
|
||||
|
||||
mock_agent2 = MagicMock()
|
||||
mock_agent2.id = uuid.uuid4()
|
||||
mock_agent2.role = "Writer"
|
||||
mock_agent2.goal = "Write content"
|
||||
mock_agent2.backstory = "Expert writer"
|
||||
mock_agent2.verbose = False
|
||||
mock_agent2.allow_delegation = True
|
||||
mock_agent2.max_iter = 5
|
||||
mock_agent2.max_rpm = 10
|
||||
mock_agent2.tools = []
|
||||
|
||||
mock_task1 = MagicMock()
|
||||
mock_task1.id = uuid.uuid4()
|
||||
mock_task1.name = "Research Task"
|
||||
mock_task1.description = "Do research"
|
||||
mock_task1.expected_output = "Research results"
|
||||
mock_task1.async_execution = False
|
||||
mock_task1.human_input = False
|
||||
mock_task1.agent = mock_agent1
|
||||
mock_task1.context = None
|
||||
|
||||
mock_task2 = MagicMock()
|
||||
mock_task2.id = uuid.uuid4()
|
||||
mock_task2.name = "Writing Task"
|
||||
mock_task2.description = "Write report"
|
||||
mock_task2.expected_output = "Written report"
|
||||
mock_task2.async_execution = True
|
||||
mock_task2.human_input = True
|
||||
mock_task2.agent = mock_agent2
|
||||
mock_task2.context = [mock_task1]
|
||||
|
||||
mock_crew = MagicMock()
|
||||
mock_crew.agents = [mock_agent1, mock_agent2]
|
||||
mock_crew.tasks = [mock_task1, mock_task2]
|
||||
mock_crew.process = "sequential"
|
||||
mock_crew.verbose = True
|
||||
mock_crew.memory = False
|
||||
|
||||
mock_event = MagicMock()
|
||||
mock_event.crew = mock_crew
|
||||
mock_event.crew_name = "TestCrew"
|
||||
mock_event.inputs = {"key": "value"}
|
||||
|
||||
result = listener._build_event_data("crew_kickoff_started", mock_event, None)
|
||||
|
||||
# Should have crew_structure
|
||||
assert "crew_structure" in result
|
||||
crew_structure = result["crew_structure"]
|
||||
|
||||
# Verify agents are serialized with tool names
|
||||
assert len(crew_structure["agents"]) == 2
|
||||
agent1_data = crew_structure["agents"][0]
|
||||
assert agent1_data["role"] == "Researcher"
|
||||
assert agent1_data["goal"] == "Research things"
|
||||
assert "tool_names" in agent1_data
|
||||
assert len(agent1_data["tool_names"]) == 2
|
||||
|
||||
# Verify tasks have lightweight agent references
|
||||
assert len(crew_structure["tasks"]) == 2
|
||||
task2_data = crew_structure["tasks"][1]
|
||||
assert task2_data["name"] == "Writing Task"
|
||||
assert "agent_ref" in task2_data
|
||||
assert task2_data["agent_ref"]["role"] == "Writer"
|
||||
|
||||
# Verify context uses task IDs
|
||||
assert "context_task_ids" in task2_data
|
||||
assert str(mock_task1.id) in task2_data["context_task_ids"]
|
||||
|
||||
def test_crew_started_agents_no_full_tools(self, listener):
|
||||
"""Verify agents in crew_structure have tool_names, not full tool objects."""
|
||||
mock_tool = MagicMock()
|
||||
mock_tool.name = "web_search"
|
||||
mock_tool.description = "Search the web"
|
||||
mock_tool.func = lambda x: x # Heavy callable
|
||||
mock_tool.args_schema = {"type": "object"} # Schema
|
||||
|
||||
mock_agent = MagicMock()
|
||||
mock_agent.id = uuid.uuid4()
|
||||
mock_agent.role = "Searcher"
|
||||
mock_agent.goal = "Search"
|
||||
mock_agent.backstory = "Expert"
|
||||
mock_agent.verbose = False
|
||||
mock_agent.allow_delegation = False
|
||||
mock_agent.max_iter = 5
|
||||
mock_agent.max_rpm = None
|
||||
mock_agent.tools = [mock_tool]
|
||||
|
||||
mock_crew = MagicMock()
|
||||
mock_crew.agents = [mock_agent]
|
||||
mock_crew.tasks = []
|
||||
mock_crew.process = "sequential"
|
||||
mock_crew.verbose = False
|
||||
mock_crew.memory = False
|
||||
|
||||
mock_event = MagicMock()
|
||||
mock_event.crew = mock_crew
|
||||
|
||||
result = listener._build_event_data("crew_kickoff_started", mock_event, None)
|
||||
|
||||
agent_data = result["crew_structure"]["agents"][0]
|
||||
|
||||
# Should have tool_names (list of strings)
|
||||
assert "tool_names" in agent_data
|
||||
assert agent_data["tool_names"] == ["web_search"]
|
||||
|
||||
# Should NOT have full tools array
|
||||
assert "tools" not in agent_data
|
||||
|
||||
def test_crew_started_tasks_no_full_agent(self, listener):
|
||||
"""Verify tasks have agent_ref, not full agent object."""
|
||||
mock_agent = MagicMock()
|
||||
mock_agent.id = uuid.uuid4()
|
||||
mock_agent.role = "Worker"
|
||||
mock_agent.goal = "Work hard"
|
||||
mock_agent.backstory = "Dedicated worker"
|
||||
mock_agent.tools = [MagicMock(), MagicMock()]
|
||||
mock_agent.llm = MagicMock()
|
||||
|
||||
mock_task = MagicMock()
|
||||
mock_task.id = uuid.uuid4()
|
||||
mock_task.name = "Work Task"
|
||||
mock_task.description = "Do work"
|
||||
mock_task.expected_output = "Work done"
|
||||
mock_task.async_execution = False
|
||||
mock_task.human_input = False
|
||||
mock_task.agent = mock_agent
|
||||
mock_task.context = None
|
||||
|
||||
mock_crew = MagicMock()
|
||||
mock_crew.agents = [mock_agent]
|
||||
mock_crew.tasks = [mock_task]
|
||||
mock_crew.process = "sequential"
|
||||
mock_crew.verbose = False
|
||||
mock_crew.memory = False
|
||||
|
||||
mock_event = MagicMock()
|
||||
mock_event.crew = mock_crew
|
||||
|
||||
result = listener._build_event_data("crew_kickoff_started", mock_event, None)
|
||||
|
||||
task_data = result["crew_structure"]["tasks"][0]
|
||||
|
||||
# Should have lightweight agent_ref
|
||||
assert "agent_ref" in task_data
|
||||
assert task_data["agent_ref"]["id"] == str(mock_agent.id)
|
||||
assert task_data["agent_ref"]["role"] == "Worker"
|
||||
|
||||
# agent_ref should ONLY have id and role (not tools, llm, etc.)
|
||||
assert len(task_data["agent_ref"]) == 2
|
||||
|
||||
# Should NOT have full agent
|
||||
assert "agent" not in task_data
|
||||
|
||||
|
||||
class TestNonCrewStartedEvents:
|
||||
"""Test that non-crew_started events don't have redundant data."""
|
||||
|
||||
@pytest.fixture
|
||||
def listener(self):
|
||||
"""Create a trace listener for testing."""
|
||||
TraceCollectionListener._instance = None
|
||||
TraceCollectionListener._initialized = False
|
||||
TraceCollectionListener._listeners_setup = False
|
||||
return TraceCollectionListener()
|
||||
|
||||
def test_generic_event_no_crew(self, listener):
|
||||
"""Verify generic events exclude crew object.
|
||||
|
||||
Note: 'tools' is now preserved since LLMCallStartedEvent.tools is lightweight.
|
||||
"""
|
||||
|
||||
class GenericEvent:
|
||||
def __init__(self):
|
||||
self.event_type = "some_event"
|
||||
self.data = "some_data"
|
||||
# These should be excluded
|
||||
self.crew = MagicMock()
|
||||
self.agents = [MagicMock()]
|
||||
self.tasks = [MagicMock()]
|
||||
# tools is now preserved (for LLM events it's lightweight)
|
||||
self.tools = [{"name": "search"}]
|
||||
|
||||
mock_event = GenericEvent()
|
||||
|
||||
result = listener._build_event_data("some_event", mock_event, None)
|
||||
|
||||
# Scalar fields preserved
|
||||
assert result.get("event_type") == "some_event"
|
||||
assert result.get("data") == "some_data"
|
||||
|
||||
# Heavy fields excluded
|
||||
assert "crew" not in result or result.get("crew") is None
|
||||
assert "agents" not in result or result.get("agents") is None
|
||||
assert "tasks" not in result or result.get("tasks") is None
|
||||
# tools is now preserved (lightweight for LLM events)
|
||||
assert result.get("tools") == [{"name": "search"}]
|
||||
|
||||
def test_crew_kickoff_completed_no_full_crew(self, listener):
|
||||
"""Verify crew_kickoff_completed doesn't repeat full crew structure."""
|
||||
|
||||
class CrewCompletedEvent:
|
||||
def __init__(self):
|
||||
self.crew_name = "TestCrew"
|
||||
self.total_tokens = 5000
|
||||
self.output = "Final output"
|
||||
# Should be excluded
|
||||
self.crew = MagicMock()
|
||||
self.crew.agents = [MagicMock(), MagicMock()]
|
||||
self.crew.tasks = [MagicMock()]
|
||||
|
||||
mock_event = CrewCompletedEvent()
|
||||
|
||||
result = listener._build_event_data("crew_kickoff_completed", mock_event, None)
|
||||
|
||||
# Scalar fields preserved
|
||||
assert result.get("crew_name") == "TestCrew"
|
||||
assert result.get("total_tokens") == 5000
|
||||
|
||||
# Should NOT have full crew object
|
||||
assert "crew" not in result or result.get("crew") is None
|
||||
# Should NOT have crew_structure (that's only for crew_started)
|
||||
assert "crew_structure" not in result
|
||||
|
||||
|
||||
class TestSizeReduction:
|
||||
"""Test that the optimization actually reduces serialized size."""
|
||||
|
||||
@pytest.fixture
|
||||
def listener(self):
|
||||
"""Create a trace listener for testing."""
|
||||
TraceCollectionListener._instance = None
|
||||
TraceCollectionListener._initialized = False
|
||||
TraceCollectionListener._listeners_setup = False
|
||||
return TraceCollectionListener()
|
||||
|
||||
def test_task_event_size_reduction(self, listener):
|
||||
"""Verify task events are much smaller than naive serialization."""
|
||||
import json
|
||||
|
||||
# Create a realistic task with many fields
|
||||
mock_agent = MagicMock()
|
||||
mock_agent.id = uuid.uuid4()
|
||||
mock_agent.role = "Researcher"
|
||||
mock_agent.goal = "Research" * 50 # Longer goal
|
||||
mock_agent.backstory = "Expert" * 100 # Longer backstory
|
||||
mock_agent.tools = [MagicMock() for _ in range(5)]
|
||||
mock_agent.llm = MagicMock()
|
||||
mock_agent.crew = MagicMock()
|
||||
|
||||
mock_task = MagicMock()
|
||||
mock_task.name = "Research Task"
|
||||
mock_task.description = "Detailed description" * 20
|
||||
mock_task.expected_output = "Expected" * 10
|
||||
mock_task.id = uuid.uuid4()
|
||||
mock_task.agent = mock_agent
|
||||
mock_task.context = [MagicMock() for _ in range(3)]
|
||||
mock_task.crew = MagicMock()
|
||||
mock_task.tools = [MagicMock() for _ in range(3)]
|
||||
|
||||
mock_event = MagicMock()
|
||||
mock_event.task = mock_task
|
||||
mock_event.context = "test context"
|
||||
|
||||
mock_source = MagicMock()
|
||||
mock_source.agent = mock_agent
|
||||
|
||||
result = listener._build_event_data("task_started", mock_event, mock_source)
|
||||
|
||||
# The result should be relatively small
|
||||
serialized = json.dumps(result, default=str)
|
||||
|
||||
# Should be under 2KB for task_started (was potentially 50-100KB before)
|
||||
assert len(serialized) < 2000, f"task_started too large: {len(serialized)} bytes"
|
||||
|
||||
# Should have the essential fields
|
||||
assert "task_name" in result
|
||||
assert "task_id" in result
|
||||
assert "agent_role" in result
|
||||
Reference in New Issue
Block a user