fix: reduce trace event serialization bloat by excluding redundant nested objects

Each trace event was serializing the ENTIRE Crew/Task/Agent object graph into
event_data JSONB, causing 500GB+ trace tables in production. For a crew with
5 agents and 10 tasks, each event could be 50-100KB because:
- Crew serialized full tasks AND full agents (with all tools, LLM configs)
- Each Task re-serialized its agent (same Agent already in Crew.agents)
- Each Task re-serialized context tasks (same Tasks already in Crew.tasks)

This fix:
1. Adds TRACE_EXCLUDE_FIELDS constant listing back-references and heavy fields
   to exclude (crew, agent, agents, tasks, context, tools, llm, callbacks, etc.)

2. Adds _serialize_for_trace() helper that uses safe_serialize_to_dict with
   the exclusion set, keeping scalar fields (agent_role, task_name, etc.)
   that the AMP frontend actually reads

3. Updates _build_event_data() to use lightweight serialization for all
   events except crew_kickoff_started

4. Adds _build_crew_started_data() that serializes the full crew structure
   ONCE with:
   - Agents with tool_names (list of strings, not full tool objects)
   - Tasks with agent_ref (just {id, role}) instead of full agent
   - Tasks with context_task_ids (just IDs) instead of full context tasks

5. Updates to_serializable() in serialization.py to:
   - Handle callable objects (functions/lambdas) by falling through to repr()
   - Handle regular classes with __dict__ (not just Pydantic models)

Expected size reduction: 50-100KB per event down to ~1-2KB per event.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Alex
2026-03-30 09:25:01 -07:00
parent ac14b9127e
commit a4f1164812
3 changed files with 718 additions and 10 deletions

View File

@@ -126,6 +126,54 @@ from crewai.events.types.tool_usage_events import (
from crewai.events.utils.console_formatter import ConsoleFormatter
# Fields to exclude from trace serialization to reduce redundant data.
# These back-references and heavy objects create massive bloat when serialized
# repeatedly across events (crew->agents->tasks->agent creates circular refs).
TRACE_EXCLUDE_FIELDS = {
# Back-references that create redundant/circular data
"crew",
"agent",
"agents",
"tasks",
"context",
# Heavy fields not needed in individual trace events
"tools",
"llm",
"function_calling_llm",
"step_callback",
"task_callback",
"crew_callback",
"callbacks",
"_memory",
"_cache",
"_rpm_controller",
"_request_within_rpm_limit",
"_token_process",
"knowledge_sources",
}
def _serialize_for_trace(
event: Any, extra_exclude: set[str] | None = None
) -> dict[str, Any]:
"""Serialize an event for tracing, excluding redundant back-references.
Keeps all scalar fields (agent_role, task_name, etc.) that the AMP frontend uses.
Replaces heavy nested objects with lightweight ID references to reduce trace bloat.
Args:
event: The event object to serialize.
extra_exclude: Additional fields to exclude beyond TRACE_EXCLUDE_FIELDS.
Returns:
A dictionary with the serialized event data.
"""
exclude = TRACE_EXCLUDE_FIELDS.copy()
if extra_exclude:
exclude.update(extra_exclude)
return safe_serialize_to_dict(event, exclude=exclude)
class TraceCollectionListener(BaseEventListener):
"""Trace collection listener that orchestrates trace collection."""
@@ -810,9 +858,17 @@ class TraceCollectionListener(BaseEventListener):
def _build_event_data(
self, event_type: str, event: Any, source: Any
) -> dict[str, Any]:
"""Build event data"""
if event_type not in self.complex_events:
return safe_serialize_to_dict(event)
"""Build event data with optimized serialization to reduce trace bloat.
For most events, excludes heavy nested objects (crew, agents, tasks, tools)
that would create massive redundant data. Only crew_kickoff_started gets
the full crew structure as a one-time dump.
"""
# crew_kickoff_started is special: include full crew structure ONCE
if event_type == "crew_kickoff_started":
return self._build_crew_started_data(event)
# Complex events have custom handling that already extracts only needed fields
if event_type == "task_started":
task_name = event.task.name or event.task.description
task_display_name = (
@@ -853,19 +909,84 @@ class TraceCollectionListener(BaseEventListener):
"agent_backstory": event.agent.backstory,
}
if event_type == "llm_call_started":
event_data = safe_serialize_to_dict(event)
event_data = _serialize_for_trace(event)
event_data["task_name"] = event.task_name or getattr(
event, "task_description", None
)
return event_data
if event_type == "llm_call_completed":
return safe_serialize_to_dict(event)
return _serialize_for_trace(event)
return {
"event_type": event_type,
"event": safe_serialize_to_dict(event),
"source": source,
}
# For all other events, use lightweight serialization
return _serialize_for_trace(event)
def _build_crew_started_data(self, event: Any) -> dict[str, Any]:
"""Build comprehensive crew structure for crew_kickoff_started event.
This is the ONE place where we serialize the full crew structure.
Subsequent events use lightweight references to avoid redundancy.
"""
event_data = _serialize_for_trace(event)
# Add full crew structure with optimized agent/task serialization
crew = getattr(event, "crew", None)
if crew is not None:
# Serialize agents with tools (first occurrence only)
agents_data = []
for agent in getattr(crew, "agents", []) or []:
agent_data = {
"id": str(getattr(agent, "id", "")),
"role": getattr(agent, "role", ""),
"goal": getattr(agent, "goal", ""),
"backstory": getattr(agent, "backstory", ""),
"verbose": getattr(agent, "verbose", False),
"allow_delegation": getattr(agent, "allow_delegation", False),
"max_iter": getattr(agent, "max_iter", None),
"max_rpm": getattr(agent, "max_rpm", None),
}
# Include tool names (not full tool objects)
tools = getattr(agent, "tools", None)
if tools:
agent_data["tool_names"] = [
getattr(t, "name", str(t)) for t in tools
]
agents_data.append(agent_data)
# Serialize tasks with lightweight agent references
tasks_data = []
for task in getattr(crew, "tasks", []) or []:
task_data = {
"id": str(getattr(task, "id", "")),
"name": getattr(task, "name", None),
"description": getattr(task, "description", ""),
"expected_output": getattr(task, "expected_output", ""),
"async_execution": getattr(task, "async_execution", False),
"human_input": getattr(task, "human_input", False),
}
# Replace full agent with lightweight reference
task_agent = getattr(task, "agent", None)
if task_agent:
task_data["agent_ref"] = {
"id": str(getattr(task_agent, "id", "")),
"role": getattr(task_agent, "role", ""),
}
# Replace context tasks with lightweight references
context_tasks = getattr(task, "context", None)
if context_tasks:
task_data["context_task_ids"] = [
str(getattr(ct, "id", "")) for ct in context_tasks
]
tasks_data.append(task_data)
event_data["crew_structure"] = {
"agents": agents_data,
"tasks": tasks_data,
"process": str(getattr(crew, "process", "")),
"verbose": getattr(crew, "verbose", False),
"memory": getattr(crew, "memory", False),
}
return event_data
def _show_tracing_disabled_message(self) -> None:
"""Show a message when tracing is disabled."""

View File

@@ -103,6 +103,28 @@ def to_serializable(
}
except Exception:
return repr(obj)
# Callables (functions, methods, lambdas) should fall through to repr
if callable(obj):
return repr(obj)
# Handle regular classes with __dict__ (non-Pydantic)
if hasattr(obj, "__dict__"):
try:
return {
_to_serializable_key(k): to_serializable(
v,
exclude=exclude,
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
)
for k, v in obj.__dict__.items()
if k not in exclude and not k.startswith("_")
}
except Exception:
return repr(obj)
return repr(obj)

View File

@@ -0,0 +1,565 @@
"""Tests for trace serialization optimization to prevent trace table bloat.
These tests verify that trace events don't contain redundant full crew/task/agent
objects, reducing event sizes from 50-100KB to a few KB per event.
"""
import uuid
from unittest.mock import MagicMock, Mock
import pytest
from crewai.events.listeners.tracing.trace_listener import (
TRACE_EXCLUDE_FIELDS,
TraceCollectionListener,
_serialize_for_trace,
)
from crewai.events.listeners.tracing.utils import safe_serialize_to_dict
class TestTraceExcludeFields:
"""Test that TRACE_EXCLUDE_FIELDS contains all the heavy/redundant fields."""
def test_contains_back_references(self):
"""Verify back-reference fields are excluded."""
back_refs = {"crew", "agent", "agents", "tasks", "context"}
assert back_refs.issubset(TRACE_EXCLUDE_FIELDS)
def test_contains_heavy_fields(self):
"""Verify heavy objects are excluded."""
heavy_fields = {
"tools",
"llm",
"function_calling_llm",
"step_callback",
"task_callback",
"crew_callback",
"callbacks",
"_memory",
"_cache",
"knowledge_sources",
}
assert heavy_fields.issubset(TRACE_EXCLUDE_FIELDS)
class TestSerializeForTrace:
"""Test the _serialize_for_trace helper function."""
def test_excludes_crew_field(self):
"""Verify crew field is excluded from serialization."""
event = MagicMock()
event.crew = MagicMock(name="TestCrew")
event.crew_name = "TestCrew"
event.timestamp = None
result = _serialize_for_trace(event)
# crew_name should be present (scalar field)
# crew should be excluded (back-reference)
assert "crew" not in result or result.get("crew") is None
def test_excludes_agent_field(self):
"""Verify agent field is excluded from serialization."""
event = MagicMock()
event.agent = MagicMock(role="TestAgent")
event.agent_role = "TestAgent"
result = _serialize_for_trace(event)
assert "agent" not in result or result.get("agent") is None
def test_excludes_tools_field(self):
"""Verify tools field is excluded from serialization."""
event = MagicMock()
event.tools = [MagicMock(), MagicMock()]
event.tool_name = "test_tool"
result = _serialize_for_trace(event)
assert "tools" not in result or result.get("tools") is None
def test_preserves_scalar_fields(self):
"""Verify scalar fields needed by AMP frontend are preserved."""
class SimpleEvent:
def __init__(self):
self.agent_role = "Researcher"
self.task_name = "Research Task"
self.task_id = str(uuid.uuid4())
self.duration_ms = 1500
self.tokens_used = 500
event = SimpleEvent()
result = _serialize_for_trace(event)
# Scalar fields should be preserved
assert result.get("agent_role") == "Researcher"
assert result.get("task_name") == "Research Task"
assert result.get("duration_ms") == 1500
assert result.get("tokens_used") == 500
def test_extra_exclude_parameter(self):
"""Verify extra_exclude adds to the default exclusions."""
class EventWithCustomField:
def __init__(self):
self.custom_heavy_field = {"large": "data" * 1000}
self.keep_this = "small"
event = EventWithCustomField()
result = _serialize_for_trace(event, extra_exclude={"custom_heavy_field"})
assert "custom_heavy_field" not in result
assert result.get("keep_this") == "small"
class TestBuildEventData:
"""Test _build_event_data method for different event types."""
@pytest.fixture
def listener(self):
"""Create a trace listener for testing."""
# Reset singleton
TraceCollectionListener._instance = None
TraceCollectionListener._initialized = False
TraceCollectionListener._listeners_setup = False
return TraceCollectionListener()
def test_task_started_no_full_task_object(self, listener):
"""Verify task_started event doesn't include full task object."""
mock_task = MagicMock()
mock_task.name = "Test Task"
mock_task.description = "A test task description"
mock_task.expected_output = "Expected result"
mock_task.id = uuid.uuid4()
# Add heavy fields that should NOT appear in output
mock_task.crew = MagicMock(name="HeavyCrew")
mock_task.agent = MagicMock(role="HeavyAgent")
mock_task.context = [MagicMock(), MagicMock()]
mock_task.tools = [MagicMock(), MagicMock()]
mock_event = MagicMock()
mock_event.task = mock_task
mock_event.context = "test context"
mock_source = MagicMock()
mock_source.agent = MagicMock()
mock_source.agent.role = "Worker"
result = listener._build_event_data("task_started", mock_event, mock_source)
# Should have scalar fields
assert result["task_name"] == "Test Task"
assert result["task_description"] == "A test task description"
assert result["agent_role"] == "Worker"
assert result["task_id"] == str(mock_task.id)
# Should NOT have full objects
assert "crew" not in result
assert "tools" not in result
# task and agent should not be full objects
assert result.get("task") is None or not hasattr(result.get("task"), "crew")
def test_task_completed_no_full_task_object(self, listener):
"""Verify task_completed event doesn't include full task object."""
mock_task = MagicMock()
mock_task.name = "Completed Task"
mock_task.description = "Task description"
mock_task.id = uuid.uuid4()
mock_output = MagicMock()
mock_output.raw = "Task result"
mock_output.output_format = "text"
mock_output.agent = "Worker"
mock_event = MagicMock()
mock_event.task = mock_task
mock_event.output = mock_output
result = listener._build_event_data("task_completed", mock_event, None)
# Should have scalar fields
assert result["task_name"] == "Completed Task"
assert result["output_raw"] == "Task result"
assert result["agent_role"] == "Worker"
# Should NOT have full task object
assert "crew" not in result
assert "tools" not in result
def test_agent_execution_started_no_full_agent(self, listener):
"""Verify agent_execution_started extracts only scalar fields."""
mock_agent = MagicMock()
mock_agent.role = "Analyst"
mock_agent.goal = "Analyze data"
mock_agent.backstory = "Expert analyst"
# Heavy fields
mock_agent.tools = [MagicMock(), MagicMock()]
mock_agent.llm = MagicMock()
mock_agent.crew = MagicMock()
mock_event = MagicMock()
mock_event.agent = mock_agent
result = listener._build_event_data(
"agent_execution_started", mock_event, None
)
# Should have scalar fields
assert result["agent_role"] == "Analyst"
assert result["agent_goal"] == "Analyze data"
assert result["agent_backstory"] == "Expert analyst"
# Should NOT have heavy objects
assert "tools" not in result
assert "llm" not in result
assert "crew" not in result
def test_llm_call_started_excludes_heavy_fields(self, listener):
"""Verify llm_call_started uses lightweight serialization."""
class MockLLMEvent:
def __init__(self):
self.task_name = "LLM Task"
self.model = "gpt-4"
self.tokens = 100
# Heavy fields that should be excluded
self.crew = MagicMock()
self.agent = MagicMock()
self.tools = [MagicMock()]
mock_event = MockLLMEvent()
result = listener._build_event_data("llm_call_started", mock_event, None)
# task_name should be present
assert result["task_name"] == "LLM Task"
# Heavy fields should be excluded
assert "crew" not in result or result.get("crew") is None
assert "agent" not in result or result.get("agent") is None
assert "tools" not in result or result.get("tools") is None
def test_llm_call_completed_excludes_heavy_fields(self, listener):
"""Verify llm_call_completed uses lightweight serialization."""
class MockLLMCompletedEvent:
def __init__(self):
self.response = "LLM response"
self.tokens_used = 150
self.duration_ms = 500
# Heavy fields
self.crew = MagicMock()
self.agent = MagicMock()
mock_event = MockLLMCompletedEvent()
result = listener._build_event_data("llm_call_completed", mock_event, None)
# Scalar fields preserved
assert result.get("response") == "LLM response"
assert result.get("tokens_used") == 150
# Heavy fields excluded
assert "crew" not in result or result.get("crew") is None
assert "agent" not in result or result.get("agent") is None
class TestCrewKickoffStartedEvent:
"""Test that crew_kickoff_started event has full structure."""
@pytest.fixture
def listener(self):
"""Create a trace listener for testing."""
TraceCollectionListener._instance = None
TraceCollectionListener._initialized = False
TraceCollectionListener._listeners_setup = False
return TraceCollectionListener()
def test_crew_started_has_crew_structure(self, listener):
"""Verify crew_kickoff_started includes the crew_structure field."""
# Create mock crew with agents and tasks
mock_agent1 = MagicMock()
mock_agent1.id = uuid.uuid4()
mock_agent1.role = "Researcher"
mock_agent1.goal = "Research things"
mock_agent1.backstory = "Expert researcher"
mock_agent1.verbose = True
mock_agent1.allow_delegation = False
mock_agent1.max_iter = 10
mock_agent1.max_rpm = None
mock_agent1.tools = [MagicMock(name="search_tool"), MagicMock(name="read_tool")]
mock_agent2 = MagicMock()
mock_agent2.id = uuid.uuid4()
mock_agent2.role = "Writer"
mock_agent2.goal = "Write content"
mock_agent2.backstory = "Expert writer"
mock_agent2.verbose = False
mock_agent2.allow_delegation = True
mock_agent2.max_iter = 5
mock_agent2.max_rpm = 10
mock_agent2.tools = []
mock_task1 = MagicMock()
mock_task1.id = uuid.uuid4()
mock_task1.name = "Research Task"
mock_task1.description = "Do research"
mock_task1.expected_output = "Research results"
mock_task1.async_execution = False
mock_task1.human_input = False
mock_task1.agent = mock_agent1
mock_task1.context = None
mock_task2 = MagicMock()
mock_task2.id = uuid.uuid4()
mock_task2.name = "Writing Task"
mock_task2.description = "Write report"
mock_task2.expected_output = "Written report"
mock_task2.async_execution = True
mock_task2.human_input = True
mock_task2.agent = mock_agent2
mock_task2.context = [mock_task1]
mock_crew = MagicMock()
mock_crew.agents = [mock_agent1, mock_agent2]
mock_crew.tasks = [mock_task1, mock_task2]
mock_crew.process = "sequential"
mock_crew.verbose = True
mock_crew.memory = False
mock_event = MagicMock()
mock_event.crew = mock_crew
mock_event.crew_name = "TestCrew"
mock_event.inputs = {"key": "value"}
result = listener._build_event_data("crew_kickoff_started", mock_event, None)
# Should have crew_structure
assert "crew_structure" in result
crew_structure = result["crew_structure"]
# Verify agents are serialized with tool names
assert len(crew_structure["agents"]) == 2
agent1_data = crew_structure["agents"][0]
assert agent1_data["role"] == "Researcher"
assert agent1_data["goal"] == "Research things"
assert "tool_names" in agent1_data
assert len(agent1_data["tool_names"]) == 2
# Verify tasks have lightweight agent references
assert len(crew_structure["tasks"]) == 2
task2_data = crew_structure["tasks"][1]
assert task2_data["name"] == "Writing Task"
assert "agent_ref" in task2_data
assert task2_data["agent_ref"]["role"] == "Writer"
# Verify context uses task IDs
assert "context_task_ids" in task2_data
assert str(mock_task1.id) in task2_data["context_task_ids"]
def test_crew_started_agents_no_full_tools(self, listener):
"""Verify agents in crew_structure have tool_names, not full tool objects."""
mock_tool = MagicMock()
mock_tool.name = "web_search"
mock_tool.description = "Search the web"
mock_tool.func = lambda x: x # Heavy callable
mock_tool.args_schema = {"type": "object"} # Schema
mock_agent = MagicMock()
mock_agent.id = uuid.uuid4()
mock_agent.role = "Searcher"
mock_agent.goal = "Search"
mock_agent.backstory = "Expert"
mock_agent.verbose = False
mock_agent.allow_delegation = False
mock_agent.max_iter = 5
mock_agent.max_rpm = None
mock_agent.tools = [mock_tool]
mock_crew = MagicMock()
mock_crew.agents = [mock_agent]
mock_crew.tasks = []
mock_crew.process = "sequential"
mock_crew.verbose = False
mock_crew.memory = False
mock_event = MagicMock()
mock_event.crew = mock_crew
result = listener._build_event_data("crew_kickoff_started", mock_event, None)
agent_data = result["crew_structure"]["agents"][0]
# Should have tool_names (list of strings)
assert "tool_names" in agent_data
assert agent_data["tool_names"] == ["web_search"]
# Should NOT have full tools array
assert "tools" not in agent_data
def test_crew_started_tasks_no_full_agent(self, listener):
"""Verify tasks have agent_ref, not full agent object."""
mock_agent = MagicMock()
mock_agent.id = uuid.uuid4()
mock_agent.role = "Worker"
mock_agent.goal = "Work hard"
mock_agent.backstory = "Dedicated worker"
mock_agent.tools = [MagicMock(), MagicMock()]
mock_agent.llm = MagicMock()
mock_task = MagicMock()
mock_task.id = uuid.uuid4()
mock_task.name = "Work Task"
mock_task.description = "Do work"
mock_task.expected_output = "Work done"
mock_task.async_execution = False
mock_task.human_input = False
mock_task.agent = mock_agent
mock_task.context = None
mock_crew = MagicMock()
mock_crew.agents = [mock_agent]
mock_crew.tasks = [mock_task]
mock_crew.process = "sequential"
mock_crew.verbose = False
mock_crew.memory = False
mock_event = MagicMock()
mock_event.crew = mock_crew
result = listener._build_event_data("crew_kickoff_started", mock_event, None)
task_data = result["crew_structure"]["tasks"][0]
# Should have lightweight agent_ref
assert "agent_ref" in task_data
assert task_data["agent_ref"]["id"] == str(mock_agent.id)
assert task_data["agent_ref"]["role"] == "Worker"
# agent_ref should ONLY have id and role (not tools, llm, etc.)
assert len(task_data["agent_ref"]) == 2
# Should NOT have full agent
assert "agent" not in task_data
class TestNonCrewStartedEvents:
"""Test that non-crew_started events don't have redundant data."""
@pytest.fixture
def listener(self):
"""Create a trace listener for testing."""
TraceCollectionListener._instance = None
TraceCollectionListener._initialized = False
TraceCollectionListener._listeners_setup = False
return TraceCollectionListener()
def test_generic_event_no_crew(self, listener):
"""Verify generic events exclude crew object."""
class GenericEvent:
def __init__(self):
self.event_type = "some_event"
self.data = "some_data"
# These should be excluded
self.crew = MagicMock()
self.agents = [MagicMock()]
self.tasks = [MagicMock()]
self.tools = [MagicMock()]
mock_event = GenericEvent()
result = listener._build_event_data("some_event", mock_event, None)
# Scalar fields preserved
assert result.get("event_type") == "some_event"
assert result.get("data") == "some_data"
# Heavy fields excluded
assert "crew" not in result or result.get("crew") is None
assert "agents" not in result or result.get("agents") is None
assert "tasks" not in result or result.get("tasks") is None
assert "tools" not in result or result.get("tools") is None
def test_crew_kickoff_completed_no_full_crew(self, listener):
"""Verify crew_kickoff_completed doesn't repeat full crew structure."""
class CrewCompletedEvent:
def __init__(self):
self.crew_name = "TestCrew"
self.total_tokens = 5000
self.output = "Final output"
# Should be excluded
self.crew = MagicMock()
self.crew.agents = [MagicMock(), MagicMock()]
self.crew.tasks = [MagicMock()]
mock_event = CrewCompletedEvent()
result = listener._build_event_data("crew_kickoff_completed", mock_event, None)
# Scalar fields preserved
assert result.get("crew_name") == "TestCrew"
assert result.get("total_tokens") == 5000
# Should NOT have full crew object
assert "crew" not in result or result.get("crew") is None
# Should NOT have crew_structure (that's only for crew_started)
assert "crew_structure" not in result
class TestSizeReduction:
"""Test that the optimization actually reduces serialized size."""
@pytest.fixture
def listener(self):
"""Create a trace listener for testing."""
TraceCollectionListener._instance = None
TraceCollectionListener._initialized = False
TraceCollectionListener._listeners_setup = False
return TraceCollectionListener()
def test_task_event_size_reduction(self, listener):
"""Verify task events are much smaller than naive serialization."""
import json
# Create a realistic task with many fields
mock_agent = MagicMock()
mock_agent.id = uuid.uuid4()
mock_agent.role = "Researcher"
mock_agent.goal = "Research" * 50 # Longer goal
mock_agent.backstory = "Expert" * 100 # Longer backstory
mock_agent.tools = [MagicMock() for _ in range(5)]
mock_agent.llm = MagicMock()
mock_agent.crew = MagicMock()
mock_task = MagicMock()
mock_task.name = "Research Task"
mock_task.description = "Detailed description" * 20
mock_task.expected_output = "Expected" * 10
mock_task.id = uuid.uuid4()
mock_task.agent = mock_agent
mock_task.context = [MagicMock() for _ in range(3)]
mock_task.crew = MagicMock()
mock_task.tools = [MagicMock() for _ in range(3)]
mock_event = MagicMock()
mock_event.task = mock_task
mock_event.context = "test context"
mock_source = MagicMock()
mock_source.agent = mock_agent
result = listener._build_event_data("task_started", mock_event, mock_source)
# The result should be relatively small
serialized = json.dumps(result, default=str)
# Should be under 2KB for task_started (was potentially 50-100KB before)
assert len(serialized) < 2000, f"task_started too large: {len(serialized)} bytes"
# Should have the essential fields
assert "task_name" in result
assert "task_id" in result
assert "agent_role" in result