mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-03-31 16:18:14 +00:00
Compare commits
12 Commits
main
...
fix/trace-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8388169a56 | ||
|
|
5de23b867c | ||
|
|
8edd8b3355 | ||
|
|
2af6a531f5 | ||
|
|
c0d6d2b63f | ||
|
|
3e0c750f51 | ||
|
|
416f01fe23 | ||
|
|
da65ca2502 | ||
|
|
47f192e112 | ||
|
|
19d1088bab | ||
|
|
1faee0c684 | ||
|
|
6da1c5f964 |
@@ -2,14 +2,56 @@ from collections.abc import Iterator
|
||||
import contextvars
|
||||
from datetime import datetime, timezone
|
||||
import itertools
|
||||
from typing import Any
|
||||
from typing import Any, TypedDict
|
||||
import uuid
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
from pydantic import BaseModel, Field, SerializationInfo
|
||||
|
||||
from crewai.utilities.serialization import Serializable, to_serializable
|
||||
|
||||
|
||||
def _is_trace_context(info: SerializationInfo) -> bool:
|
||||
"""Check if serialization is happening in trace context."""
|
||||
return bool(info.context and info.context.get("trace"))
|
||||
|
||||
|
||||
class AgentRef(TypedDict):
|
||||
id: str
|
||||
role: str
|
||||
|
||||
|
||||
class TaskRef(TypedDict):
|
||||
id: str
|
||||
name: str
|
||||
|
||||
|
||||
def _trace_agent_ref(agent: Any) -> AgentRef | None:
|
||||
"""Return a lightweight agent reference for trace serialization."""
|
||||
if agent is None:
|
||||
return None
|
||||
return AgentRef(
|
||||
id=str(getattr(agent, "id", "")),
|
||||
role=getattr(agent, "role", ""),
|
||||
)
|
||||
|
||||
|
||||
def _trace_task_ref(task: Any) -> TaskRef | None:
|
||||
"""Return a lightweight task reference for trace serialization."""
|
||||
if task is None:
|
||||
return None
|
||||
return TaskRef(
|
||||
id=str(getattr(task, "id", "")),
|
||||
name=str(getattr(task, "name", None) or getattr(task, "description", "")),
|
||||
)
|
||||
|
||||
|
||||
def _trace_tool_names(tools: Any) -> list[str] | None:
|
||||
"""Return a list of tool names for trace serialization."""
|
||||
if not tools:
|
||||
return None
|
||||
return [getattr(t, "name", str(t)) for t in tools]
|
||||
|
||||
|
||||
_emission_counter: contextvars.ContextVar[Iterator[int]] = contextvars.ContextVar(
|
||||
"_emission_counter"
|
||||
)
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
"""Trace collection listener for orchestrating trace collection."""
|
||||
|
||||
import os
|
||||
from typing import Any, ClassVar
|
||||
from typing import Any
|
||||
import uuid
|
||||
|
||||
from typing_extensions import Self
|
||||
@@ -126,18 +126,13 @@ from crewai.events.types.tool_usage_events import (
|
||||
from crewai.events.utils.console_formatter import ConsoleFormatter
|
||||
|
||||
|
||||
_TRACE_CONTEXT: dict[str, bool] = {"trace": True}
|
||||
"""Serialization context that triggers lightweight field serializers on event models."""
|
||||
|
||||
|
||||
class TraceCollectionListener(BaseEventListener):
|
||||
"""Trace collection listener that orchestrates trace collection."""
|
||||
|
||||
complex_events: ClassVar[list[str]] = [
|
||||
"task_started",
|
||||
"task_completed",
|
||||
"llm_call_started",
|
||||
"llm_call_completed",
|
||||
"agent_execution_started",
|
||||
"agent_execution_completed",
|
||||
]
|
||||
|
||||
_instance: Self | None = None
|
||||
_initialized: bool = False
|
||||
_listeners_setup: bool = False
|
||||
@@ -810,9 +805,19 @@ class TraceCollectionListener(BaseEventListener):
|
||||
def _build_event_data(
|
||||
self, event_type: str, event: Any, source: Any
|
||||
) -> dict[str, Any]:
|
||||
"""Build event data"""
|
||||
if event_type not in self.complex_events:
|
||||
return safe_serialize_to_dict(event)
|
||||
"""Build event data with context-based serialization to reduce trace bloat.
|
||||
|
||||
Field serializers on event models check for context={"trace": True} and
|
||||
return lightweight references instead of full nested objects. This replaces
|
||||
the old denylist approach with Pydantic v2's native context mechanism.
|
||||
|
||||
Only crew_kickoff_started gets a full crew structure (built separately).
|
||||
Complex events (task_started, etc.) use custom projections for specific shapes.
|
||||
All other events get context-aware serialization automatically.
|
||||
"""
|
||||
if event_type == "crew_kickoff_started":
|
||||
return self._build_crew_started_data(event)
|
||||
|
||||
if event_type == "task_started":
|
||||
task_name = event.task.name or event.task.description
|
||||
task_display_name = (
|
||||
@@ -853,19 +858,77 @@ class TraceCollectionListener(BaseEventListener):
|
||||
"agent_backstory": event.agent.backstory,
|
||||
}
|
||||
if event_type == "llm_call_started":
|
||||
event_data = safe_serialize_to_dict(event)
|
||||
event_data = safe_serialize_to_dict(event, context=_TRACE_CONTEXT)
|
||||
event_data["task_name"] = event.task_name or getattr(
|
||||
event, "task_description", None
|
||||
)
|
||||
return event_data
|
||||
if event_type == "llm_call_completed":
|
||||
return safe_serialize_to_dict(event)
|
||||
return safe_serialize_to_dict(event, context=_TRACE_CONTEXT)
|
||||
|
||||
return {
|
||||
"event_type": event_type,
|
||||
"event": safe_serialize_to_dict(event),
|
||||
"source": source,
|
||||
}
|
||||
return safe_serialize_to_dict(event, context=_TRACE_CONTEXT)
|
||||
|
||||
def _build_crew_started_data(self, event: Any) -> dict[str, Any]:
|
||||
"""Build comprehensive crew structure for crew_kickoff_started event.
|
||||
|
||||
This is the ONE place where we serialize the full crew structure.
|
||||
Subsequent events use lightweight references via field serializers.
|
||||
"""
|
||||
event_data = safe_serialize_to_dict(event, context=_TRACE_CONTEXT)
|
||||
|
||||
crew = getattr(event, "crew", None)
|
||||
if crew is not None:
|
||||
agents_data = []
|
||||
for agent in getattr(crew, "agents", []) or []:
|
||||
agent_data = {
|
||||
"id": str(getattr(agent, "id", "")),
|
||||
"role": getattr(agent, "role", ""),
|
||||
"goal": getattr(agent, "goal", ""),
|
||||
"backstory": getattr(agent, "backstory", ""),
|
||||
"verbose": getattr(agent, "verbose", False),
|
||||
"allow_delegation": getattr(agent, "allow_delegation", False),
|
||||
"max_iter": getattr(agent, "max_iter", None),
|
||||
"max_rpm": getattr(agent, "max_rpm", None),
|
||||
}
|
||||
tools = getattr(agent, "tools", None)
|
||||
if tools:
|
||||
agent_data["tool_names"] = [
|
||||
getattr(t, "name", str(t)) for t in tools
|
||||
]
|
||||
agents_data.append(agent_data)
|
||||
|
||||
tasks_data = []
|
||||
for task in getattr(crew, "tasks", []) or []:
|
||||
task_data = {
|
||||
"id": str(getattr(task, "id", "")),
|
||||
"name": getattr(task, "name", None),
|
||||
"description": getattr(task, "description", ""),
|
||||
"expected_output": getattr(task, "expected_output", ""),
|
||||
"async_execution": getattr(task, "async_execution", False),
|
||||
"human_input": getattr(task, "human_input", False),
|
||||
}
|
||||
task_agent = getattr(task, "agent", None)
|
||||
if task_agent:
|
||||
task_data["agent_ref"] = {
|
||||
"id": str(getattr(task_agent, "id", "")),
|
||||
"role": getattr(task_agent, "role", ""),
|
||||
}
|
||||
context_tasks = getattr(task, "context", None)
|
||||
if context_tasks:
|
||||
task_data["context_task_ids"] = [
|
||||
str(getattr(ct, "id", "")) for ct in context_tasks
|
||||
]
|
||||
tasks_data.append(task_data)
|
||||
|
||||
event_data["crew_structure"] = {
|
||||
"agents": agents_data,
|
||||
"tasks": tasks_data,
|
||||
"process": str(getattr(crew, "process", "")),
|
||||
"verbose": getattr(crew, "verbose", False),
|
||||
"memory": getattr(crew, "memory", False),
|
||||
}
|
||||
|
||||
return event_data
|
||||
|
||||
def _show_tracing_disabled_message(self) -> None:
|
||||
"""Show a message when tracing is disabled."""
|
||||
|
||||
@@ -429,10 +429,22 @@ def mark_first_execution_done(user_consented: bool = False) -> None:
|
||||
p.write_text(json.dumps(data, indent=2))
|
||||
|
||||
|
||||
def safe_serialize_to_dict(obj: Any, exclude: set[str] | None = None) -> dict[str, Any]:
|
||||
"""Safely serialize an object to a dictionary for event data."""
|
||||
def safe_serialize_to_dict(
|
||||
obj: Any,
|
||||
exclude: set[str] | None = None,
|
||||
context: dict[str, Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Safely serialize an object to a dictionary for event data.
|
||||
|
||||
Args:
|
||||
obj: Object to serialize.
|
||||
exclude: Set of keys to exclude from the result.
|
||||
context: Optional context dict passed through to Pydantic's model_dump().
|
||||
Field serializers can inspect this to customize output
|
||||
(e.g. context={"trace": True} for lightweight trace serialization).
|
||||
"""
|
||||
try:
|
||||
serialized = to_serializable(obj, exclude)
|
||||
serialized = to_serializable(obj, exclude, context=context)
|
||||
if isinstance(serialized, dict):
|
||||
return serialized
|
||||
return {"serialized_data": serialized}
|
||||
|
||||
@@ -5,11 +5,17 @@ from __future__ import annotations
|
||||
from collections.abc import Sequence
|
||||
from typing import Any
|
||||
|
||||
from pydantic import ConfigDict, model_validator
|
||||
from pydantic import ConfigDict, SerializationInfo, field_serializer, model_validator
|
||||
from typing_extensions import Self
|
||||
|
||||
from crewai.agents.agent_builder.base_agent import BaseAgent
|
||||
from crewai.events.base_events import BaseEvent
|
||||
from crewai.events.base_events import (
|
||||
BaseEvent,
|
||||
_is_trace_context,
|
||||
_trace_agent_ref,
|
||||
_trace_task_ref,
|
||||
_trace_tool_names,
|
||||
)
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
from crewai.tools.structured_tool import CrewStructuredTool
|
||||
|
||||
@@ -31,6 +37,21 @@ class AgentExecutionStartedEvent(BaseEvent):
|
||||
_set_agent_fingerprint(self, self.agent)
|
||||
return self
|
||||
|
||||
@field_serializer("agent")
|
||||
@classmethod
|
||||
def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any:
|
||||
return _trace_agent_ref(v) if _is_trace_context(info) else v
|
||||
|
||||
@field_serializer("task")
|
||||
@classmethod
|
||||
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
|
||||
return _trace_task_ref(v) if _is_trace_context(info) else v
|
||||
|
||||
@field_serializer("tools")
|
||||
@classmethod
|
||||
def _serialize_tools(cls, v: Any, info: SerializationInfo) -> Any:
|
||||
return _trace_tool_names(v) if _is_trace_context(info) else v
|
||||
|
||||
|
||||
class AgentExecutionCompletedEvent(BaseEvent):
|
||||
"""Event emitted when an agent completes executing a task"""
|
||||
@@ -48,6 +69,16 @@ class AgentExecutionCompletedEvent(BaseEvent):
|
||||
_set_agent_fingerprint(self, self.agent)
|
||||
return self
|
||||
|
||||
@field_serializer("agent")
|
||||
@classmethod
|
||||
def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any:
|
||||
return _trace_agent_ref(v) if _is_trace_context(info) else v
|
||||
|
||||
@field_serializer("task")
|
||||
@classmethod
|
||||
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
|
||||
return _trace_task_ref(v) if _is_trace_context(info) else v
|
||||
|
||||
|
||||
class AgentExecutionErrorEvent(BaseEvent):
|
||||
"""Event emitted when an agent encounters an error during execution"""
|
||||
@@ -65,6 +96,16 @@ class AgentExecutionErrorEvent(BaseEvent):
|
||||
_set_agent_fingerprint(self, self.agent)
|
||||
return self
|
||||
|
||||
@field_serializer("agent")
|
||||
@classmethod
|
||||
def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any:
|
||||
return _trace_agent_ref(v) if _is_trace_context(info) else v
|
||||
|
||||
@field_serializer("task")
|
||||
@classmethod
|
||||
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
|
||||
return _trace_task_ref(v) if _is_trace_context(info) else v
|
||||
|
||||
|
||||
# New event classes for LiteAgent
|
||||
class LiteAgentExecutionStartedEvent(BaseEvent):
|
||||
@@ -77,6 +118,11 @@ class LiteAgentExecutionStartedEvent(BaseEvent):
|
||||
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
|
||||
@field_serializer("tools")
|
||||
@classmethod
|
||||
def _serialize_tools(cls, v: Any, info: SerializationInfo) -> Any:
|
||||
return _trace_tool_names(v) if _is_trace_context(info) else v
|
||||
|
||||
|
||||
class LiteAgentExecutionCompletedEvent(BaseEvent):
|
||||
"""Event emitted when a LiteAgent completes execution"""
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from crewai.events.base_events import BaseEvent
|
||||
from pydantic import SerializationInfo, field_serializer
|
||||
|
||||
from crewai.events.base_events import BaseEvent, _is_trace_context
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -26,6 +28,14 @@ class CrewBaseEvent(BaseEvent):
|
||||
if self.crew.fingerprint.metadata:
|
||||
self.fingerprint_metadata = self.crew.fingerprint.metadata
|
||||
|
||||
@field_serializer("crew")
|
||||
@classmethod
|
||||
def _serialize_crew(cls, v: Any, info: SerializationInfo) -> Any:
|
||||
"""Exclude crew in trace context — crew_kickoff_started builds structure separately."""
|
||||
if _is_trace_context(info):
|
||||
return None
|
||||
return v
|
||||
|
||||
def to_json(self, exclude: set[str] | None = None) -> Any:
|
||||
if exclude is None:
|
||||
exclude = set()
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel
|
||||
from pydantic import BaseModel, SerializationInfo, field_serializer
|
||||
|
||||
from crewai.events.base_events import BaseEvent
|
||||
from crewai.events.base_events import BaseEvent, _is_trace_context
|
||||
|
||||
|
||||
class LLMEventBase(BaseEvent):
|
||||
@@ -49,6 +49,16 @@ class LLMCallStartedEvent(LLMEventBase):
|
||||
callbacks: list[Any] | None = None
|
||||
available_functions: dict[str, Any] | None = None
|
||||
|
||||
@field_serializer("callbacks")
|
||||
@classmethod
|
||||
def _serialize_callbacks(cls, v: Any, info: SerializationInfo) -> Any:
|
||||
return None if _is_trace_context(info) else v
|
||||
|
||||
@field_serializer("available_functions")
|
||||
@classmethod
|
||||
def _serialize_available_functions(cls, v: Any, info: SerializationInfo) -> Any:
|
||||
return None if _is_trace_context(info) else v
|
||||
|
||||
|
||||
class LLMCallCompletedEvent(LLMEventBase):
|
||||
"""Event emitted when a LLM call completes"""
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
from typing import Any
|
||||
|
||||
from crewai.events.base_events import BaseEvent
|
||||
from pydantic import SerializationInfo, field_serializer
|
||||
|
||||
from crewai.events.base_events import BaseEvent, _is_trace_context, _trace_task_ref
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
|
||||
|
||||
@@ -24,6 +26,11 @@ class TaskStartedEvent(BaseEvent):
|
||||
super().__init__(**data)
|
||||
_set_task_fingerprint(self, self.task)
|
||||
|
||||
@field_serializer("task")
|
||||
@classmethod
|
||||
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
|
||||
return _trace_task_ref(v) if _is_trace_context(info) else v
|
||||
|
||||
|
||||
class TaskCompletedEvent(BaseEvent):
|
||||
"""Event emitted when a task completes"""
|
||||
@@ -36,6 +43,11 @@ class TaskCompletedEvent(BaseEvent):
|
||||
super().__init__(**data)
|
||||
_set_task_fingerprint(self, self.task)
|
||||
|
||||
@field_serializer("task")
|
||||
@classmethod
|
||||
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
|
||||
return _trace_task_ref(v) if _is_trace_context(info) else v
|
||||
|
||||
|
||||
class TaskFailedEvent(BaseEvent):
|
||||
"""Event emitted when a task fails"""
|
||||
@@ -48,6 +60,11 @@ class TaskFailedEvent(BaseEvent):
|
||||
super().__init__(**data)
|
||||
_set_task_fingerprint(self, self.task)
|
||||
|
||||
@field_serializer("task")
|
||||
@classmethod
|
||||
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
|
||||
return _trace_task_ref(v) if _is_trace_context(info) else v
|
||||
|
||||
|
||||
class TaskEvaluationEvent(BaseEvent):
|
||||
"""Event emitted when a task evaluation is completed"""
|
||||
@@ -59,3 +76,8 @@ class TaskEvaluationEvent(BaseEvent):
|
||||
def __init__(self, **data: Any) -> None:
|
||||
super().__init__(**data)
|
||||
_set_task_fingerprint(self, self.task)
|
||||
|
||||
@field_serializer("task")
|
||||
@classmethod
|
||||
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
|
||||
return _trace_task_ref(v) if _is_trace_context(info) else v
|
||||
|
||||
@@ -2,9 +2,9 @@ from collections.abc import Callable
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
from pydantic import ConfigDict
|
||||
from pydantic import ConfigDict, SerializationInfo, field_serializer
|
||||
|
||||
from crewai.events.base_events import BaseEvent
|
||||
from crewai.events.base_events import BaseEvent, _is_trace_context, _trace_agent_ref
|
||||
|
||||
|
||||
class ToolUsageEvent(BaseEvent):
|
||||
@@ -26,6 +26,11 @@ class ToolUsageEvent(BaseEvent):
|
||||
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
|
||||
@field_serializer("agent")
|
||||
@classmethod
|
||||
def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any:
|
||||
return _trace_agent_ref(v) if _is_trace_context(info) else v
|
||||
|
||||
def __init__(self, **data: Any) -> None:
|
||||
if data.get("from_task"):
|
||||
task = data["from_task"]
|
||||
@@ -99,6 +104,11 @@ class ToolExecutionErrorEvent(BaseEvent):
|
||||
tool_class: Callable[..., Any]
|
||||
agent: Any | None = None
|
||||
|
||||
@field_serializer("agent")
|
||||
@classmethod
|
||||
def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any:
|
||||
return _trace_agent_ref(v) if _is_trace_context(info) else v
|
||||
|
||||
def __init__(self, **data: Any) -> None:
|
||||
super().__init__(**data)
|
||||
# Set fingerprint data from the agent
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import date, datetime
|
||||
from enum import Enum
|
||||
import json
|
||||
from typing import Any, TypeAlias
|
||||
import uuid
|
||||
@@ -20,6 +21,7 @@ def to_serializable(
|
||||
max_depth: int = 5,
|
||||
_current_depth: int = 0,
|
||||
_ancestors: set[int] | None = None,
|
||||
context: dict[str, Any] | None = None,
|
||||
) -> Serializable:
|
||||
"""Converts a Python object into a JSON-compatible representation.
|
||||
|
||||
@@ -33,6 +35,9 @@ def to_serializable(
|
||||
max_depth: Maximum recursion depth. Defaults to 5.
|
||||
_current_depth: Current recursion depth (for internal use).
|
||||
_ancestors: Set of ancestor object ids for cycle detection (for internal use).
|
||||
context: Optional context dict passed to Pydantic's model_dump(context=...).
|
||||
Field serializers on the model can inspect this to customize output
|
||||
(e.g. context={"trace": True} for lightweight trace serialization).
|
||||
|
||||
Returns:
|
||||
Serializable: A JSON-compatible structure.
|
||||
@@ -48,6 +53,15 @@ def to_serializable(
|
||||
|
||||
if isinstance(obj, (str, int, float, bool, type(None))):
|
||||
return obj
|
||||
if isinstance(obj, Enum):
|
||||
return to_serializable(
|
||||
obj.value,
|
||||
exclude=exclude,
|
||||
max_depth=max_depth,
|
||||
_current_depth=_current_depth,
|
||||
_ancestors=_ancestors,
|
||||
context=context,
|
||||
)
|
||||
if isinstance(obj, uuid.UUID):
|
||||
return str(obj)
|
||||
if isinstance(obj, (date, datetime)):
|
||||
@@ -66,6 +80,7 @@ def to_serializable(
|
||||
max_depth=max_depth,
|
||||
_current_depth=_current_depth + 1,
|
||||
_ancestors=new_ancestors,
|
||||
context=context,
|
||||
)
|
||||
for item in obj
|
||||
]
|
||||
@@ -77,17 +92,24 @@ def to_serializable(
|
||||
max_depth=max_depth,
|
||||
_current_depth=_current_depth + 1,
|
||||
_ancestors=new_ancestors,
|
||||
context=context,
|
||||
)
|
||||
for key, value in obj.items()
|
||||
if key not in exclude
|
||||
}
|
||||
if isinstance(obj, BaseModel):
|
||||
try:
|
||||
dump_kwargs: dict[str, Any] = {}
|
||||
if exclude:
|
||||
dump_kwargs["exclude"] = exclude
|
||||
if context is not None:
|
||||
dump_kwargs["context"] = context
|
||||
return to_serializable(
|
||||
obj=obj.model_dump(exclude=exclude),
|
||||
obj=obj.model_dump(**dump_kwargs),
|
||||
max_depth=max_depth,
|
||||
_current_depth=_current_depth + 1,
|
||||
_ancestors=new_ancestors,
|
||||
context=context,
|
||||
)
|
||||
except Exception:
|
||||
try:
|
||||
@@ -97,12 +119,30 @@ def to_serializable(
|
||||
max_depth=max_depth,
|
||||
_current_depth=_current_depth + 1,
|
||||
_ancestors=new_ancestors,
|
||||
context=context,
|
||||
)
|
||||
for k, v in obj.__dict__.items()
|
||||
if k not in (exclude or set())
|
||||
}
|
||||
except Exception:
|
||||
return repr(obj)
|
||||
if callable(obj):
|
||||
return repr(obj)
|
||||
if hasattr(obj, "__dict__"):
|
||||
try:
|
||||
return {
|
||||
_to_serializable_key(k): to_serializable(
|
||||
v,
|
||||
max_depth=max_depth,
|
||||
_current_depth=_current_depth + 1,
|
||||
_ancestors=new_ancestors,
|
||||
context=context,
|
||||
)
|
||||
for k, v in obj.__dict__.items()
|
||||
if not k.startswith("_")
|
||||
}
|
||||
except Exception:
|
||||
return repr(obj)
|
||||
return repr(obj)
|
||||
|
||||
|
||||
|
||||
612
lib/crewai/tests/tracing/test_trace_serialization.py
Normal file
612
lib/crewai/tests/tracing/test_trace_serialization.py
Normal file
@@ -0,0 +1,612 @@
|
||||
"""Tests for trace serialization optimization using Pydantic v2 context-based serialization.
|
||||
|
||||
These tests verify that trace events use @field_serializer with SerializationInfo.context
|
||||
to produce lightweight representations, reducing event sizes from 50-100KB to a few KB.
|
||||
"""
|
||||
|
||||
import json
|
||||
import uuid
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
from pydantic import ConfigDict
|
||||
|
||||
from crewai.agents.agent_builder.base_agent import BaseAgent
|
||||
from crewai.events.base_events import _trace_agent_ref, _trace_task_ref, _trace_tool_names
|
||||
from crewai.events.listeners.tracing.utils import safe_serialize_to_dict
|
||||
from crewai.utilities.serialization import to_serializable
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Lightweight BaseAgent subclass for tests (avoids heavy dependencies)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class _StubAgent(BaseAgent):
|
||||
"""Minimal BaseAgent subclass that satisfies validation without heavy deps."""
|
||||
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
|
||||
def execute_task(self, *a: Any, **kw: Any) -> str:
|
||||
return ""
|
||||
|
||||
def create_agent_executor(self, *a: Any, **kw: Any) -> None:
|
||||
pass
|
||||
|
||||
def _parse_tools(self, *a: Any, **kw: Any) -> list:
|
||||
return []
|
||||
|
||||
def get_delegation_tools(self, *a: Any, **kw: Any) -> list:
|
||||
return []
|
||||
|
||||
def get_output_converter(self, *a: Any, **kw: Any) -> Any:
|
||||
return None
|
||||
|
||||
def get_multimodal_tools(self, *a: Any, **kw: Any) -> list:
|
||||
return []
|
||||
|
||||
async def aexecute_task(self, *a: Any, **kw: Any) -> str:
|
||||
return ""
|
||||
|
||||
def get_mcp_tools(self, *a: Any, **kw: Any) -> list:
|
||||
return []
|
||||
|
||||
def get_platform_tools(self, *a: Any, **kw: Any) -> list:
|
||||
return []
|
||||
|
||||
|
||||
def _make_stub_agent(**overrides) -> _StubAgent:
|
||||
"""Create a minimal BaseAgent instance for testing."""
|
||||
defaults = {
|
||||
"role": "Researcher",
|
||||
"goal": "Research things",
|
||||
"backstory": "Expert researcher",
|
||||
"tools": [],
|
||||
}
|
||||
defaults.update(overrides)
|
||||
return _StubAgent(**defaults)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers to build realistic mock objects for event fields
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_mock_task(**overrides):
|
||||
task = MagicMock()
|
||||
task.id = overrides.get("id", uuid.uuid4())
|
||||
task.name = overrides.get("name", "Research Task")
|
||||
task.description = overrides.get("description", "Do research")
|
||||
task.expected_output = overrides.get("expected_output", "Research results")
|
||||
task.async_execution = overrides.get("async_execution", False)
|
||||
task.human_input = overrides.get("human_input", False)
|
||||
task.agent = overrides.get("agent", _make_stub_agent())
|
||||
task.context = overrides.get("context", None)
|
||||
task.crew = MagicMock()
|
||||
task.tools = overrides.get("tools", [MagicMock(), MagicMock()])
|
||||
|
||||
fp = MagicMock()
|
||||
fp.uuid_str = str(uuid.uuid4())
|
||||
fp.metadata = {"name": task.name}
|
||||
task.fingerprint = fp
|
||||
|
||||
return task
|
||||
|
||||
|
||||
def _make_stub_tool(tool_name="web_search") -> Any:
|
||||
"""Create a minimal BaseTool instance for testing."""
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
|
||||
class _StubTool(BaseTool):
|
||||
name: str = "stub"
|
||||
description: str = "stub tool"
|
||||
|
||||
def _run(self, *a: Any, **kw: Any) -> str:
|
||||
return ""
|
||||
|
||||
return _StubTool(name=tool_name, description=f"{tool_name} tool")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Unit tests: trace ref helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestTraceRefHelpers:
|
||||
def test_trace_agent_ref(self):
|
||||
agent = _make_stub_agent(role="Analyst")
|
||||
ref = _trace_agent_ref(agent)
|
||||
assert ref["role"] == "Analyst"
|
||||
assert "id" in ref
|
||||
assert len(ref) == 2 # only id and role
|
||||
|
||||
def test_trace_agent_ref_none(self):
|
||||
assert _trace_agent_ref(None) is None
|
||||
|
||||
def test_trace_task_ref(self):
|
||||
task = _make_mock_task(name="Write Report")
|
||||
ref = _trace_task_ref(task)
|
||||
assert ref["name"] == "Write Report"
|
||||
assert "id" in ref
|
||||
assert len(ref) == 2
|
||||
|
||||
def test_trace_task_ref_falls_back_to_description(self):
|
||||
task = _make_mock_task(name=None, description="Describe the report")
|
||||
ref = _trace_task_ref(task)
|
||||
assert ref["name"] == "Describe the report"
|
||||
|
||||
def test_trace_task_ref_none(self):
|
||||
assert _trace_task_ref(None) is None
|
||||
|
||||
def test_trace_tool_names(self):
|
||||
tools = [_make_stub_tool("search"), _make_stub_tool("read")]
|
||||
names = _trace_tool_names(tools)
|
||||
assert names == ["search", "read"]
|
||||
|
||||
def test_trace_tool_names_empty(self):
|
||||
assert _trace_tool_names([]) is None
|
||||
assert _trace_tool_names(None) is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Integration tests: field serializers on real event classes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestAgentEventFieldSerializers:
|
||||
"""Test that agent event field serializers respond to trace context."""
|
||||
|
||||
def test_agent_execution_started_trace_context(self):
|
||||
from crewai.events.types.agent_events import AgentExecutionStartedEvent
|
||||
|
||||
agent = _make_stub_agent(role="Researcher")
|
||||
task = _make_mock_task(name="Research Task")
|
||||
tools = [_make_stub_tool("search"), _make_stub_tool("read")]
|
||||
|
||||
event = AgentExecutionStartedEvent(
|
||||
agent=agent, task=task, tools=tools, task_prompt="Do research"
|
||||
)
|
||||
|
||||
# With trace context: lightweight refs
|
||||
trace_dump = event.model_dump(context={"trace": True})
|
||||
assert trace_dump["agent"] == {"id": str(agent.id), "role": "Researcher"}
|
||||
assert trace_dump["task"] == {"id": str(task.id), "name": "Research Task"}
|
||||
assert trace_dump["tools"] == ["search", "read"]
|
||||
|
||||
def test_agent_execution_started_no_context(self):
|
||||
from crewai.events.types.agent_events import AgentExecutionStartedEvent
|
||||
|
||||
agent = _make_stub_agent(role="SpecificRole")
|
||||
task = _make_mock_task()
|
||||
|
||||
event = AgentExecutionStartedEvent(
|
||||
agent=agent, task=task, tools=None, task_prompt="Do research"
|
||||
)
|
||||
|
||||
# Without context: full agent dict (Pydantic model_dump expands it)
|
||||
normal_dump = event.model_dump()
|
||||
assert isinstance(normal_dump["agent"], dict)
|
||||
assert normal_dump["agent"]["role"] == "SpecificRole"
|
||||
# Should have ALL agent fields, not just the lightweight ref
|
||||
assert "goal" in normal_dump["agent"]
|
||||
assert "backstory" in normal_dump["agent"]
|
||||
assert "max_iter" in normal_dump["agent"]
|
||||
|
||||
def test_agent_execution_error_preserves_identification(self):
|
||||
from crewai.events.types.agent_events import AgentExecutionErrorEvent
|
||||
|
||||
agent = _make_stub_agent(role="Analyst")
|
||||
task = _make_mock_task(name="Analysis Task")
|
||||
|
||||
event = AgentExecutionErrorEvent(
|
||||
agent=agent, task=task, error="Something went wrong"
|
||||
)
|
||||
|
||||
trace_dump = event.model_dump(context={"trace": True})
|
||||
# Error events should still have agent/task identification as refs
|
||||
assert trace_dump["agent"]["role"] == "Analyst"
|
||||
assert trace_dump["task"]["name"] == "Analysis Task"
|
||||
assert trace_dump["error"] == "Something went wrong"
|
||||
|
||||
def test_agent_execution_completed_trace_context(self):
|
||||
from crewai.events.types.agent_events import AgentExecutionCompletedEvent
|
||||
|
||||
agent = _make_stub_agent(role="Writer")
|
||||
task = _make_mock_task(name="Writing Task")
|
||||
|
||||
event = AgentExecutionCompletedEvent(
|
||||
agent=agent, task=task, output="Final output"
|
||||
)
|
||||
|
||||
trace_dump = event.model_dump(context={"trace": True})
|
||||
assert trace_dump["agent"]["role"] == "Writer"
|
||||
assert trace_dump["task"]["name"] == "Writing Task"
|
||||
assert trace_dump["output"] == "Final output"
|
||||
|
||||
|
||||
class TestTaskEventFieldSerializers:
|
||||
"""Test that task event field serializers respond to trace context."""
|
||||
|
||||
def test_task_started_trace_context(self):
|
||||
from crewai.events.types.task_events import TaskStartedEvent
|
||||
|
||||
task = _make_mock_task(name="Test Task")
|
||||
event = TaskStartedEvent(task=task, context="some context")
|
||||
|
||||
trace_dump = event.model_dump(context={"trace": True})
|
||||
assert trace_dump["task"] == {"id": str(task.id), "name": "Test Task"}
|
||||
assert trace_dump["context"] == "some context"
|
||||
|
||||
def test_task_failed_trace_context(self):
|
||||
from crewai.events.types.task_events import TaskFailedEvent
|
||||
|
||||
task = _make_mock_task(name="Failing Task")
|
||||
event = TaskFailedEvent(task=task, error="Task failed")
|
||||
|
||||
trace_dump = event.model_dump(context={"trace": True})
|
||||
assert trace_dump["task"]["name"] == "Failing Task"
|
||||
assert trace_dump["error"] == "Task failed"
|
||||
|
||||
|
||||
class TestCrewEventFieldSerializers:
|
||||
"""Test that crew event field serializers respond to trace context."""
|
||||
|
||||
def test_crew_kickoff_started_excludes_crew_in_trace(self):
|
||||
from crewai.events.types.crew_events import CrewKickoffStartedEvent
|
||||
|
||||
crew = MagicMock()
|
||||
crew.fingerprint = MagicMock()
|
||||
crew.fingerprint.uuid_str = str(uuid.uuid4())
|
||||
crew.fingerprint.metadata = {}
|
||||
|
||||
event = CrewKickoffStartedEvent(
|
||||
crew=crew, crew_name="TestCrew", inputs={"key": "value"}
|
||||
)
|
||||
|
||||
trace_dump = event.model_dump(context={"trace": True})
|
||||
# crew field should be None in trace context
|
||||
assert trace_dump["crew"] is None
|
||||
# scalar fields preserved
|
||||
assert trace_dump["crew_name"] == "TestCrew"
|
||||
assert trace_dump["inputs"] == {"key": "value"}
|
||||
|
||||
def test_crew_event_no_context_preserves_crew(self):
|
||||
from crewai.events.types.crew_events import CrewKickoffStartedEvent
|
||||
|
||||
crew = MagicMock()
|
||||
crew.fingerprint = MagicMock()
|
||||
crew.fingerprint.uuid_str = str(uuid.uuid4())
|
||||
crew.fingerprint.metadata = {}
|
||||
|
||||
event = CrewKickoffStartedEvent(
|
||||
crew=crew, crew_name="TestCrew", inputs=None
|
||||
)
|
||||
|
||||
normal_dump = event.model_dump()
|
||||
# Without trace context, crew should NOT be None (field serializer didn't fire)
|
||||
assert normal_dump["crew"] is not None
|
||||
|
||||
|
||||
class TestLLMEventFieldSerializers:
|
||||
"""Test that LLM event field serializers respond to trace context."""
|
||||
|
||||
def test_llm_call_started_excludes_callbacks_in_trace(self):
|
||||
from crewai.events.types.llm_events import LLMCallStartedEvent
|
||||
|
||||
event = LLMCallStartedEvent(
|
||||
call_id="test-call",
|
||||
messages=[{"role": "user", "content": "Hello"}],
|
||||
tools=[{"name": "search", "description": "Search tool"}],
|
||||
callbacks=[MagicMock(), MagicMock()],
|
||||
available_functions={"search": MagicMock()},
|
||||
)
|
||||
|
||||
trace_dump = event.model_dump(context={"trace": True})
|
||||
# callbacks and available_functions excluded
|
||||
assert trace_dump["callbacks"] is None
|
||||
assert trace_dump["available_functions"] is None
|
||||
# tools preserved (lightweight list of dicts)
|
||||
assert trace_dump["tools"] == [{"name": "search", "description": "Search tool"}]
|
||||
# messages preserved
|
||||
assert trace_dump["messages"] == [{"role": "user", "content": "Hello"}]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Integration tests: safe_serialize_to_dict with context
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSafeSerializeWithContext:
|
||||
"""Test that safe_serialize_to_dict properly passes context through."""
|
||||
|
||||
def test_context_flows_through_to_field_serializers(self):
|
||||
from crewai.events.types.agent_events import AgentExecutionErrorEvent
|
||||
|
||||
agent = _make_stub_agent(role="Worker")
|
||||
task = _make_mock_task(name="Work Task")
|
||||
|
||||
event = AgentExecutionErrorEvent(
|
||||
agent=agent, task=task, error="error msg"
|
||||
)
|
||||
|
||||
result = safe_serialize_to_dict(event, context={"trace": True})
|
||||
# Field serializers should have fired
|
||||
assert result["agent"] == {"id": str(agent.id), "role": "Worker"}
|
||||
assert result["task"] == {"id": str(task.id), "name": "Work Task"}
|
||||
assert result["error"] == "error msg"
|
||||
|
||||
def test_no_context_preserves_full_serialization(self):
|
||||
from crewai.events.types.task_events import TaskFailedEvent
|
||||
|
||||
task = _make_mock_task(name="Test")
|
||||
event = TaskFailedEvent(task=task, error="fail")
|
||||
|
||||
result = safe_serialize_to_dict(event)
|
||||
# Without context, task should not be a lightweight ref
|
||||
assert result.get("task") is not None
|
||||
# It should be the raw object (model_dump returns it as-is for Any fields)
|
||||
# to_serializable will then repr() or process it further
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Integration tests: TraceCollectionListener._build_event_data
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestBuildEventData:
|
||||
@pytest.fixture
|
||||
def listener(self):
|
||||
from crewai.events.listeners.tracing.trace_listener import (
|
||||
TraceCollectionListener,
|
||||
)
|
||||
TraceCollectionListener._instance = None
|
||||
TraceCollectionListener._initialized = False
|
||||
TraceCollectionListener._listeners_setup = False
|
||||
return TraceCollectionListener()
|
||||
|
||||
def test_crew_kickoff_started_has_crew_structure(self, listener):
|
||||
agent = _make_stub_agent(role="Researcher")
|
||||
agent.tools = [_make_stub_tool("search"), _make_stub_tool("read")]
|
||||
|
||||
task = _make_mock_task(name="Research Task", agent=agent)
|
||||
task.context = None
|
||||
|
||||
crew = MagicMock()
|
||||
crew.agents = [agent]
|
||||
crew.tasks = [task]
|
||||
crew.process = "sequential"
|
||||
crew.verbose = True
|
||||
crew.memory = False
|
||||
crew.fingerprint = MagicMock()
|
||||
crew.fingerprint.uuid_str = str(uuid.uuid4())
|
||||
crew.fingerprint.metadata = {}
|
||||
|
||||
from crewai.events.types.crew_events import CrewKickoffStartedEvent
|
||||
event = CrewKickoffStartedEvent(
|
||||
crew=crew, crew_name="TestCrew", inputs={"key": "value"}
|
||||
)
|
||||
|
||||
result = listener._build_event_data("crew_kickoff_started", event, None)
|
||||
|
||||
assert "crew_structure" in result
|
||||
cs = result["crew_structure"]
|
||||
assert len(cs["agents"]) == 1
|
||||
assert cs["agents"][0]["role"] == "Researcher"
|
||||
assert cs["agents"][0]["tool_names"] == ["search", "read"]
|
||||
assert len(cs["tasks"]) == 1
|
||||
assert cs["tasks"][0]["name"] == "Research Task"
|
||||
assert "agent_ref" in cs["tasks"][0]
|
||||
assert cs["tasks"][0]["agent_ref"]["role"] == "Researcher"
|
||||
|
||||
def test_crew_kickoff_started_context_task_ids(self, listener):
|
||||
agent = _make_stub_agent()
|
||||
task1 = _make_mock_task(name="Task 1", agent=agent)
|
||||
task1.context = None
|
||||
task2 = _make_mock_task(name="Task 2", agent=agent)
|
||||
task2.context = [task1]
|
||||
|
||||
crew = MagicMock()
|
||||
crew.agents = [agent]
|
||||
crew.tasks = [task1, task2]
|
||||
crew.process = "sequential"
|
||||
crew.verbose = False
|
||||
crew.memory = False
|
||||
crew.fingerprint = MagicMock()
|
||||
crew.fingerprint.uuid_str = str(uuid.uuid4())
|
||||
crew.fingerprint.metadata = {}
|
||||
|
||||
from crewai.events.types.crew_events import CrewKickoffStartedEvent
|
||||
event = CrewKickoffStartedEvent(
|
||||
crew=crew, crew_name="TestCrew", inputs=None
|
||||
)
|
||||
|
||||
result = listener._build_event_data("crew_kickoff_started", event, None)
|
||||
task2_data = result["crew_structure"]["tasks"][1]
|
||||
assert "context_task_ids" in task2_data
|
||||
assert str(task1.id) in task2_data["context_task_ids"]
|
||||
|
||||
def test_generic_event_uses_trace_context(self, listener):
|
||||
"""Non-complex events should use context-based serialization."""
|
||||
from crewai.events.types.crew_events import CrewKickoffCompletedEvent
|
||||
|
||||
crew = MagicMock()
|
||||
crew.fingerprint = MagicMock()
|
||||
crew.fingerprint.uuid_str = str(uuid.uuid4())
|
||||
crew.fingerprint.metadata = {}
|
||||
|
||||
event = CrewKickoffCompletedEvent(
|
||||
crew=crew, crew_name="TestCrew", output="Final result", total_tokens=5000
|
||||
)
|
||||
|
||||
result = listener._build_event_data("crew_kickoff_completed", event, None)
|
||||
|
||||
# Scalar fields preserved
|
||||
assert result.get("crew_name") == "TestCrew"
|
||||
assert result.get("total_tokens") == 5000
|
||||
# crew excluded by field serializer
|
||||
assert result.get("crew") is None
|
||||
# No crew_structure (that's only for kickoff_started)
|
||||
assert "crew_structure" not in result
|
||||
|
||||
def test_task_started_custom_projection(self, listener):
|
||||
task = _make_mock_task(name="Test Task")
|
||||
from crewai.events.types.task_events import TaskStartedEvent
|
||||
event = TaskStartedEvent(task=task, context="test context")
|
||||
source = MagicMock()
|
||||
source.agent = _make_stub_agent(role="Worker")
|
||||
|
||||
result = listener._build_event_data("task_started", event, source)
|
||||
|
||||
assert result["task_name"] == "Test Task"
|
||||
assert result["agent_role"] == "Worker"
|
||||
assert result["task_id"] == str(task.id)
|
||||
assert result["context"] == "test context"
|
||||
|
||||
def test_llm_call_started_uses_trace_context(self, listener):
|
||||
from crewai.events.types.llm_events import LLMCallStartedEvent
|
||||
|
||||
event = LLMCallStartedEvent(
|
||||
call_id="test",
|
||||
messages=[{"role": "user", "content": "Hello"}],
|
||||
tools=[{"name": "search"}],
|
||||
callbacks=[MagicMock()],
|
||||
available_functions={"fn": MagicMock()},
|
||||
)
|
||||
|
||||
result = listener._build_event_data("llm_call_started", event, None)
|
||||
|
||||
# callbacks and available_functions excluded via field serializer
|
||||
assert result.get("callbacks") is None
|
||||
assert result.get("available_functions") is None
|
||||
# tools preserved (lightweight schemas)
|
||||
assert result.get("tools") == [{"name": "search"}]
|
||||
|
||||
def test_agent_execution_error_preserves_identification(self, listener):
|
||||
"""Error events should preserve agent/task identification via field serializers."""
|
||||
from crewai.events.types.agent_events import AgentExecutionErrorEvent
|
||||
|
||||
agent = _make_stub_agent(role="Analyst")
|
||||
task = _make_mock_task(name="Analysis")
|
||||
|
||||
event = AgentExecutionErrorEvent(
|
||||
agent=agent, task=task, error="Something broke"
|
||||
)
|
||||
|
||||
result = listener._build_event_data("agent_execution_error", event, None)
|
||||
|
||||
# Field serializers return lightweight refs, not None
|
||||
assert result["agent"] == {"id": str(agent.id), "role": "Analyst"}
|
||||
assert result["task"] == {"id": str(task.id), "name": "Analysis"}
|
||||
assert result["error"] == "Something broke"
|
||||
|
||||
def test_task_failed_preserves_identification(self, listener):
|
||||
from crewai.events.types.task_events import TaskFailedEvent
|
||||
|
||||
task = _make_mock_task(name="Failed Task")
|
||||
event = TaskFailedEvent(task=task, error="Task failed")
|
||||
|
||||
result = listener._build_event_data("task_failed", event, None)
|
||||
|
||||
assert result["task"] == {"id": str(task.id), "name": "Failed Task"}
|
||||
assert result["error"] == "Task failed"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Size reduction verification
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSizeReduction:
|
||||
@pytest.fixture
|
||||
def listener(self):
|
||||
from crewai.events.listeners.tracing.trace_listener import (
|
||||
TraceCollectionListener,
|
||||
)
|
||||
TraceCollectionListener._instance = None
|
||||
TraceCollectionListener._initialized = False
|
||||
TraceCollectionListener._listeners_setup = False
|
||||
return TraceCollectionListener()
|
||||
|
||||
def test_task_started_event_size(self, listener):
|
||||
"""task_started event data should be well under 2KB."""
|
||||
agent = _make_stub_agent(
|
||||
role="Researcher",
|
||||
goal="Research" * 50,
|
||||
backstory="Expert" * 100,
|
||||
)
|
||||
agent.tools = [_make_stub_tool(f"tool_{i}") for i in range(5)]
|
||||
|
||||
task = _make_mock_task(
|
||||
name="Research Task",
|
||||
description="Detailed description" * 20,
|
||||
expected_output="Expected" * 10,
|
||||
agent=agent,
|
||||
)
|
||||
task.context = [_make_mock_task() for _ in range(3)]
|
||||
task.tools = [_make_stub_tool(f"t_{i}") for i in range(3)]
|
||||
|
||||
from crewai.events.types.task_events import TaskStartedEvent
|
||||
event = TaskStartedEvent(task=task, context="test context")
|
||||
source = MagicMock()
|
||||
source.agent = agent
|
||||
|
||||
result = listener._build_event_data("task_started", event, source)
|
||||
serialized = json.dumps(result, default=str)
|
||||
|
||||
assert len(serialized) < 2000, f"task_started too large: {len(serialized)} bytes"
|
||||
assert "task_name" in result
|
||||
assert "agent_role" in result
|
||||
|
||||
def test_error_event_size(self, listener):
|
||||
"""Error events should be small despite having agent/task refs."""
|
||||
from crewai.events.types.agent_events import AgentExecutionErrorEvent
|
||||
|
||||
agent = _make_stub_agent(
|
||||
goal="Very long goal " * 100,
|
||||
backstory="Very long backstory " * 100,
|
||||
)
|
||||
task = _make_mock_task(description="Very long description " * 100)
|
||||
|
||||
event = AgentExecutionErrorEvent(
|
||||
agent=agent, task=task, error="error"
|
||||
)
|
||||
|
||||
result = listener._build_event_data("agent_execution_error", event, None)
|
||||
serialized = json.dumps(result, default=str)
|
||||
|
||||
# Should be small - agent/task are just {id, role/name} refs
|
||||
assert len(serialized) < 5000, f"error event too large: {len(serialized)} bytes"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# to_serializable context threading
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestToSerializableContext:
|
||||
"""Test that context parameter flows through to_serializable correctly."""
|
||||
|
||||
def test_context_passed_to_model_dump(self):
|
||||
from crewai.events.types.agent_events import AgentExecutionErrorEvent
|
||||
|
||||
agent = _make_stub_agent(role="Tester")
|
||||
task = _make_mock_task(name="Test Task")
|
||||
|
||||
event = AgentExecutionErrorEvent(
|
||||
agent=agent, task=task, error="test error"
|
||||
)
|
||||
|
||||
# Directly use to_serializable with context
|
||||
result = to_serializable(event, context={"trace": True})
|
||||
assert isinstance(result, dict)
|
||||
assert result["agent"] == {"id": str(agent.id), "role": "Tester"}
|
||||
assert result["task"] == {"id": str(task.id), "name": "Test Task"}
|
||||
|
||||
def test_no_context_does_not_trigger_serializers(self):
|
||||
from crewai.events.types.crew_events import CrewKickoffStartedEvent
|
||||
|
||||
crew = MagicMock()
|
||||
crew.fingerprint = MagicMock()
|
||||
crew.fingerprint.uuid_str = str(uuid.uuid4())
|
||||
crew.fingerprint.metadata = {}
|
||||
|
||||
event = CrewKickoffStartedEvent(
|
||||
crew=crew, crew_name="Test", inputs=None
|
||||
)
|
||||
|
||||
# Without context, crew should NOT be None
|
||||
result = event.model_dump()
|
||||
assert result["crew"] is not None
|
||||
Reference in New Issue
Block a user