Compare commits

..

14 Commits

Author SHA1 Message Date
Greyson LaLonde
2eec12b828 Merge branch 'main' into fix/trace-serialization-pydantic-context 2026-04-15 04:12:06 +08:00
Greyson LaLonde
0cd27790fd Merge branch 'main' into fix/trace-serialization-pydantic-context 2026-04-01 00:25:22 +08:00
Greyson LaLonde
8388169a56 Merge branch 'main' into fix/trace-serialization-pydantic-context 2026-03-31 23:45:34 +08:00
Greyson LaLonde
5de23b867c chore: remove unnecessary comments in _build_event_data 2026-03-31 16:41:57 +08:00
Greyson LaLonde
8edd8b3355 Merge branch 'fix/trace-serialization-pydantic-context' of https://github.com/crewAIInc/crewAI into fix/trace-serialization-pydantic-context 2026-03-31 16:36:00 +08:00
Greyson LaLonde
2af6a531f5 fix: serialize Enum via .value and add trace serializer for tool usage events 2026-03-31 16:34:59 +08:00
Greyson LaLonde
c0d6d2b63f chore: removed additional unused import in test
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
2026-03-31 16:33:41 +08:00
Greyson LaLonde
3e0c750f51 Merge branch 'fix/trace-serialization-pydantic-context' of https://github.com/crewAIInc/crewAI into fix/trace-serialization-pydantic-context 2026-03-31 16:26:43 +08:00
Greyson LaLonde
416f01fe23 fix: add trace field serializer for agent on tool usage events 2026-03-31 16:25:14 +08:00
Greyson LaLonde
da65ca2502 chore: remove unused import in test
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
2026-03-31 16:23:21 +08:00
Greyson LaLonde
47f192e112 fix: handle plain classes and callables in to_serializable
Add __dict__ handler for non-Pydantic classes so their attributes are
serialized rather than falling through to repr(). Guard with a callable
check so functions/lambdas still get repr().
2026-03-31 15:59:43 +08:00
Greyson LaLonde
19d1088bab chore: remove redundant callable check in to_serializable 2026-03-31 15:49:37 +08:00
Greyson LaLonde
1faee0c684 refactor: type trace ref helpers as TypedDict 2026-03-31 15:45:34 +08:00
Greyson LaLonde
6da1c5f964 fix: reduce trace event serialization bloat
Use context-aware field serializers so event models control their own
trace representation. Heavy nested objects become lightweight refs.
2026-03-31 15:40:25 +08:00
25 changed files with 922 additions and 572 deletions

View File

@@ -4,22 +4,6 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
icon: "clock"
mode: "wide"
---
<Update label="15 أبريل 2026">
## v1.14.2a5
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.2a5)
## ما الذي تغير
### الوثائق
- تحديث سجل التغييرات والإصدار لـ v1.14.2a4
## المساهمون
@greysonlalonde
</Update>
<Update label="15 أبريل 2026">
## v1.14.2a4

View File

@@ -4,22 +4,6 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="Apr 15, 2026">
## v1.14.2a5
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.2a5)
## What's Changed
### Documentation
- Update changelog and version for v1.14.2a4
## Contributors
@greysonlalonde
</Update>
<Update label="Apr 15, 2026">
## v1.14.2a4

View File

@@ -4,22 +4,6 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 4월 15일">
## v1.14.2a5
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.2a5)
## 변경 사항
### 문서
- v1.14.2a4의 변경 로그 및 버전 업데이트
## 기여자
@greysonlalonde
</Update>
<Update label="2026년 4월 15일">
## v1.14.2a4

View File

@@ -4,22 +4,6 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="15 abr 2026">
## v1.14.2a5
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.2a5)
## O que Mudou
### Documentação
- Atualizar changelog e versão para v1.14.2a4
## Contribuidores
@greysonlalonde
</Update>
<Update label="15 abr 2026">
## v1.14.2a4

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.14.2a5"
__version__ = "1.14.2a4"

View File

@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
dependencies = [
"pytube~=15.0.0",
"requests>=2.33.0,<3",
"crewai==1.14.2a5",
"crewai==1.14.2a4",
"tiktoken~=0.8.0",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",

View File

@@ -305,4 +305,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.14.2a5"
__version__ = "1.14.2a4"

View File

@@ -55,7 +55,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.14.2a5",
"crewai-tools==1.14.2a4",
]
embeddings = [
"tiktoken~=0.8.0"

View File

@@ -46,7 +46,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.14.2a5"
__version__ = "1.14.2a4"
_telemetry_submitted = False

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.2a5"
"crewai[tools]==1.14.2a4"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.2a5"
"crewai[tools]==1.14.2a4"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.2a5"
"crewai[tools]==1.14.2a4"
]
[tool.crewai]

View File

@@ -2,14 +2,56 @@ from collections.abc import Iterator
import contextvars
from datetime import datetime, timezone
import itertools
from typing import Any
from typing import Any, TypedDict
import uuid
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, SerializationInfo
from crewai.utilities.serialization import Serializable, to_serializable
def _is_trace_context(info: SerializationInfo) -> bool:
    """Return True when ``model_dump`` was invoked with ``context={"trace": True}``."""
    ctx = info.context
    if not ctx:
        return False
    return bool(ctx.get("trace"))
class AgentRef(TypedDict):
id: str
role: str
class TaskRef(TypedDict):
id: str
name: str
def _trace_agent_ref(agent: Any) -> AgentRef | None:
"""Return a lightweight agent reference for trace serialization."""
if agent is None:
return None
return AgentRef(
id=str(getattr(agent, "id", "")),
role=getattr(agent, "role", ""),
)
def _trace_task_ref(task: Any) -> TaskRef | None:
"""Return a lightweight task reference for trace serialization."""
if task is None:
return None
return TaskRef(
id=str(getattr(task, "id", "")),
name=str(getattr(task, "name", None) or getattr(task, "description", "")),
)
def _trace_tool_names(tools: Any) -> list[str] | None:
"""Return a list of tool names for trace serialization."""
if not tools:
return None
return [getattr(t, "name", str(t)) for t in tools]
# ContextVar holding an iterator of ints used to number event emissions.
# Declared without a default, so each execution context must set its own
# counter before use (presumably an itertools.count — confirm at the
# initialization site, which is outside this view).
_emission_counter: contextvars.ContextVar[Iterator[int]] = contextvars.ContextVar(
    "_emission_counter"
)

View File

@@ -1,7 +1,7 @@
"""Trace collection listener for orchestrating trace collection."""
import os
from typing import Any, ClassVar
from typing import Any
import uuid
from typing_extensions import Self
@@ -129,18 +129,13 @@ from crewai.events.utils.console_formatter import ConsoleFormatter
from crewai.utilities.version import get_crewai_version
_TRACE_CONTEXT: dict[str, bool] = {"trace": True}
"""Serialization context that triggers lightweight field serializers on event models."""
class TraceCollectionListener(BaseEventListener):
"""Trace collection listener that orchestrates trace collection."""
complex_events: ClassVar[list[str]] = [
"task_started",
"task_completed",
"llm_call_started",
"llm_call_completed",
"agent_execution_started",
"agent_execution_completed",
]
_instance: Self | None = None
_initialized: bool = False
_listeners_setup: bool = False
@@ -824,9 +819,19 @@ class TraceCollectionListener(BaseEventListener):
def _build_event_data(
self, event_type: str, event: Any, source: Any
) -> dict[str, Any]:
"""Build event data"""
if event_type not in self.complex_events:
return safe_serialize_to_dict(event)
"""Build event data with context-based serialization to reduce trace bloat.
Field serializers on event models check for context={"trace": True} and
return lightweight references instead of full nested objects. This replaces
the old denylist approach with Pydantic v2's native context mechanism.
Only crew_kickoff_started gets a full crew structure (built separately).
Complex events (task_started, etc.) use custom projections for specific shapes.
All other events get context-aware serialization automatically.
"""
if event_type == "crew_kickoff_started":
return self._build_crew_started_data(event)
if event_type == "task_started":
task_name = event.task.name or event.task.description
task_display_name = (
@@ -867,19 +872,77 @@ class TraceCollectionListener(BaseEventListener):
"agent_backstory": event.agent.backstory,
}
if event_type == "llm_call_started":
event_data = safe_serialize_to_dict(event)
event_data = safe_serialize_to_dict(event, context=_TRACE_CONTEXT)
event_data["task_name"] = event.task_name or getattr(
event, "task_description", None
)
return event_data
if event_type == "llm_call_completed":
return safe_serialize_to_dict(event)
return safe_serialize_to_dict(event, context=_TRACE_CONTEXT)
return {
"event_type": event_type,
"event": safe_serialize_to_dict(event),
"source": source,
}
return safe_serialize_to_dict(event, context=_TRACE_CONTEXT)
def _build_crew_started_data(self, event: Any) -> dict[str, Any]:
"""Build comprehensive crew structure for crew_kickoff_started event.
This is the ONE place where we serialize the full crew structure.
Subsequent events use lightweight references via field serializers.
"""
event_data = safe_serialize_to_dict(event, context=_TRACE_CONTEXT)
crew = getattr(event, "crew", None)
if crew is not None:
agents_data = []
for agent in getattr(crew, "agents", []) or []:
agent_data = {
"id": str(getattr(agent, "id", "")),
"role": getattr(agent, "role", ""),
"goal": getattr(agent, "goal", ""),
"backstory": getattr(agent, "backstory", ""),
"verbose": getattr(agent, "verbose", False),
"allow_delegation": getattr(agent, "allow_delegation", False),
"max_iter": getattr(agent, "max_iter", None),
"max_rpm": getattr(agent, "max_rpm", None),
}
tools = getattr(agent, "tools", None)
if tools:
agent_data["tool_names"] = [
getattr(t, "name", str(t)) for t in tools
]
agents_data.append(agent_data)
tasks_data = []
for task in getattr(crew, "tasks", []) or []:
task_data = {
"id": str(getattr(task, "id", "")),
"name": getattr(task, "name", None),
"description": getattr(task, "description", ""),
"expected_output": getattr(task, "expected_output", ""),
"async_execution": getattr(task, "async_execution", False),
"human_input": getattr(task, "human_input", False),
}
task_agent = getattr(task, "agent", None)
if task_agent:
task_data["agent_ref"] = {
"id": str(getattr(task_agent, "id", "")),
"role": getattr(task_agent, "role", ""),
}
context_tasks = getattr(task, "context", None)
if context_tasks:
task_data["context_task_ids"] = [
str(getattr(ct, "id", "")) for ct in context_tasks
]
tasks_data.append(task_data)
event_data["crew_structure"] = {
"agents": agents_data,
"tasks": tasks_data,
"process": str(getattr(crew, "process", "")),
"verbose": getattr(crew, "verbose", False),
"memory": getattr(crew, "memory", False),
}
return event_data
def _show_tracing_disabled_message(self) -> None:
"""Show a message when tracing is disabled."""

View File

@@ -429,10 +429,22 @@ def mark_first_execution_done(user_consented: bool = False) -> None:
p.write_text(json.dumps(data, indent=2))
def safe_serialize_to_dict(obj: Any, exclude: set[str] | None = None) -> dict[str, Any]:
"""Safely serialize an object to a dictionary for event data."""
def safe_serialize_to_dict(
obj: Any,
exclude: set[str] | None = None,
context: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Safely serialize an object to a dictionary for event data.
Args:
obj: Object to serialize.
exclude: Set of keys to exclude from the result.
context: Optional context dict passed through to Pydantic's model_dump().
Field serializers can inspect this to customize output
(e.g. context={"trace": True} for lightweight trace serialization).
"""
try:
serialized = to_serializable(obj, exclude)
serialized = to_serializable(obj, exclude, context=context)
if isinstance(serialized, dict):
return serialized
return {"serialized_data": serialized}

View File

@@ -5,11 +5,17 @@ from __future__ import annotations
from collections.abc import Sequence
from typing import Any, Literal
from pydantic import ConfigDict, model_validator
from pydantic import ConfigDict, SerializationInfo, field_serializer, model_validator
from typing_extensions import Self
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.events.base_events import BaseEvent
from crewai.events.base_events import (
BaseEvent,
_is_trace_context,
_trace_agent_ref,
_trace_task_ref,
_trace_tool_names,
)
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
@@ -31,6 +37,21 @@ class AgentExecutionStartedEvent(BaseEvent):
_set_agent_fingerprint(self, self.agent)
return self
@field_serializer("agent")
@classmethod
def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_agent_ref(v) if _is_trace_context(info) else v
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v
@field_serializer("tools")
@classmethod
def _serialize_tools(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_tool_names(v) if _is_trace_context(info) else v
class AgentExecutionCompletedEvent(BaseEvent):
"""Event emitted when an agent completes executing a task"""
@@ -48,6 +69,16 @@ class AgentExecutionCompletedEvent(BaseEvent):
_set_agent_fingerprint(self, self.agent)
return self
@field_serializer("agent")
@classmethod
def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_agent_ref(v) if _is_trace_context(info) else v
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v
class AgentExecutionErrorEvent(BaseEvent):
"""Event emitted when an agent encounters an error during execution"""
@@ -65,6 +96,16 @@ class AgentExecutionErrorEvent(BaseEvent):
_set_agent_fingerprint(self, self.agent)
return self
@field_serializer("agent")
@classmethod
def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_agent_ref(v) if _is_trace_context(info) else v
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v
# New event classes for LiteAgent
class LiteAgentExecutionStartedEvent(BaseEvent):
@@ -77,6 +118,11 @@ class LiteAgentExecutionStartedEvent(BaseEvent):
model_config = ConfigDict(arbitrary_types_allowed=True)
@field_serializer("tools")
@classmethod
def _serialize_tools(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_tool_names(v) if _is_trace_context(info) else v
class LiteAgentExecutionCompletedEvent(BaseEvent):
"""Event emitted when a LiteAgent completes execution"""

View File

@@ -1,6 +1,8 @@
from typing import TYPE_CHECKING, Any, Literal
from crewai.events.base_events import BaseEvent
from pydantic import SerializationInfo, field_serializer
from crewai.events.base_events import BaseEvent, _is_trace_context
if TYPE_CHECKING:
@@ -26,6 +28,14 @@ class CrewBaseEvent(BaseEvent):
if self.crew.fingerprint.metadata:
self.fingerprint_metadata = self.crew.fingerprint.metadata
@field_serializer("crew")
@classmethod
def _serialize_crew(cls, v: Any, info: SerializationInfo) -> Any:
"""Exclude crew in trace context — crew_kickoff_started builds structure separately."""
if _is_trace_context(info):
return None
return v
def to_json(self, exclude: set[str] | None = None) -> Any:
if exclude is None:
exclude = set()

View File

@@ -1,9 +1,9 @@
from enum import Enum
from typing import Any, Literal
from pydantic import BaseModel
from pydantic import BaseModel, SerializationInfo, field_serializer
from crewai.events.base_events import BaseEvent
from crewai.events.base_events import BaseEvent, _is_trace_context
class LLMEventBase(BaseEvent):
@@ -49,6 +49,16 @@ class LLMCallStartedEvent(LLMEventBase):
callbacks: list[Any] | None = None
available_functions: dict[str, Any] | None = None
@field_serializer("callbacks")
@classmethod
def _serialize_callbacks(cls, v: Any, info: SerializationInfo) -> Any:
return None if _is_trace_context(info) else v
@field_serializer("available_functions")
@classmethod
def _serialize_available_functions(cls, v: Any, info: SerializationInfo) -> Any:
return None if _is_trace_context(info) else v
class LLMCallCompletedEvent(LLMEventBase):
"""Event emitted when a LLM call completes"""

View File

@@ -1,6 +1,8 @@
from typing import Any, Literal
from crewai.events.base_events import BaseEvent
from pydantic import SerializationInfo, field_serializer
from crewai.events.base_events import BaseEvent, _is_trace_context, _trace_task_ref
from crewai.tasks.task_output import TaskOutput
@@ -32,6 +34,11 @@ class TaskStartedEvent(BaseEvent):
super().__init__(**data)
_set_task_fingerprint(self, self.task)
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v
class TaskCompletedEvent(BaseEvent):
"""Event emitted when a task completes"""
@@ -44,6 +51,11 @@ class TaskCompletedEvent(BaseEvent):
super().__init__(**data)
_set_task_fingerprint(self, self.task)
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v
class TaskFailedEvent(BaseEvent):
"""Event emitted when a task fails"""
@@ -56,6 +68,11 @@ class TaskFailedEvent(BaseEvent):
super().__init__(**data)
_set_task_fingerprint(self, self.task)
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v
class TaskEvaluationEvent(BaseEvent):
"""Event emitted when a task evaluation is completed"""
@@ -67,3 +84,8 @@ class TaskEvaluationEvent(BaseEvent):
def __init__(self, **data: Any) -> None:
super().__init__(**data)
_set_task_fingerprint(self, self.task)
@field_serializer("task")
@classmethod
def _serialize_task(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_task_ref(v) if _is_trace_context(info) else v

View File

@@ -2,9 +2,9 @@ from collections.abc import Callable
from datetime import datetime
from typing import Any, Literal
from pydantic import ConfigDict
from pydantic import ConfigDict, SerializationInfo, field_serializer
from crewai.events.base_events import BaseEvent
from crewai.events.base_events import BaseEvent, _is_trace_context, _trace_agent_ref
class ToolUsageEvent(BaseEvent):
@@ -26,6 +26,11 @@ class ToolUsageEvent(BaseEvent):
model_config = ConfigDict(arbitrary_types_allowed=True)
@field_serializer("agent")
@classmethod
def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_agent_ref(v) if _is_trace_context(info) else v
def __init__(self, **data: Any) -> None:
if data.get("from_task"):
task = data["from_task"]
@@ -99,6 +104,11 @@ class ToolExecutionErrorEvent(BaseEvent):
tool_class: Callable[..., Any]
agent: Any | None = None
@field_serializer("agent")
@classmethod
def _serialize_agent(cls, v: Any, info: SerializationInfo) -> Any:
return _trace_agent_ref(v) if _is_trace_context(info) else v
def __init__(self, **data: Any) -> None:
super().__init__(**data)
# Set fingerprint data from the agent

View File

@@ -19,7 +19,6 @@ from collections.abc import Callable
from copy import deepcopy
import datetime
import logging
import threading
from typing import TYPE_CHECKING, Annotated, Any, Final, Literal, TypedDict, Union, cast
import uuid
@@ -92,9 +91,6 @@ def resolve_refs(schema: dict[str, Any]) -> dict[str, Any]:
This is needed because Pydantic generates $ref-based schemas that
some consumers (e.g. LLMs, tool frameworks) don't handle well.
Circular references are detected and replaced with a plain
``{"type": "object"}`` stub to prevent infinite recursion.
Args:
schema: JSON Schema dict that may contain "$refs" and "$defs".
@@ -104,23 +100,18 @@ def resolve_refs(schema: dict[str, Any]) -> dict[str, Any]:
defs = schema.get("$defs", {})
schema_copy = deepcopy(schema)
def _resolve(node: Any, resolving: frozenset[str] = frozenset()) -> Any:
def _resolve(node: Any) -> Any:
if isinstance(node, dict):
ref = node.get("$ref")
if isinstance(ref, str) and ref.startswith("#/$defs/"):
def_name = ref.replace("#/$defs/", "")
if def_name in resolving:
return {"type": "object"}
if def_name in defs:
return _resolve(
deepcopy(defs[def_name]),
resolving | {def_name},
)
return _resolve(deepcopy(defs[def_name]))
raise KeyError(f"Definition '{def_name}' not found in $defs.")
return {k: _resolve(v, resolving) for k, v in node.items()}
return {k: _resolve(v) for k, v in node.items()}
if isinstance(node, list):
return [_resolve(i, resolving) for i in node]
return [_resolve(i) for i in node]
return node
@@ -667,104 +658,6 @@ def build_rich_field_description(prop_schema: dict[str, Any]) -> str:
return ". ".join(parts) if parts else ""
# Thread-local storage tracking which ``$ref`` paths are currently being
# resolved. Used by ``_json_schema_to_pydantic_type`` to detect circular
# ``$ref`` chains and break the recursion with a ``dict`` fallback.
# Each thread gets its own independent set so concurrent schema conversions
# (e.g. via ThreadPoolExecutor in MCP tool resolution) don't interfere.
_resolving_refs_local = threading.local()
def _get_resolving_refs() -> set[str]:
"""Return the per-thread resolving-refs set, creating it on first access."""
refs: set[str] | None = getattr(_resolving_refs_local, "refs", None)
if refs is None:
refs = set()
object.__setattr__(_resolving_refs_local, "refs", refs)
return refs
def _safe_replace_refs(json_schema: dict[str, Any]) -> dict[str, Any]:
"""Resolve ``$ref`` pointers in *json_schema*, tolerating circular refs.
``jsonref.replace_refs(proxies=False)`` performs eager, recursive
inlining. When a definition refers back to itself (directly or
transitively) this blows the Python call stack and also produces
Python dicts with circular object references that break all
downstream recursive visitors.
Strategy: always break circular ``$ref`` chains *before* handing the
schema to ``jsonref`` so the library never encounters a cycle.
"""
schema_copy = deepcopy(json_schema)
defs = schema_copy.get("$defs", {})
if defs and _has_circular_refs(schema_copy, defs):
_break_circular_refs(schema_copy, defs, set())
try:
return dict(jsonref.replace_refs(schema_copy, proxies=False))
except RecursionError:
# Last resort - return the manually patched copy as-is.
return schema_copy
def _has_circular_refs(
node: Any,
defs: dict[str, Any],
visiting: set[str] | None = None,
) -> bool:
"""Return ``True`` if *node* contains any circular ``$ref`` chain."""
if visiting is None:
visiting = set()
if isinstance(node, dict):
ref = node.get("$ref")
if isinstance(ref, str) and ref.startswith("#/$defs/"):
def_name = ref.removeprefix("#/$defs/")
if def_name in visiting:
return True
if def_name in defs:
visiting.add(def_name)
if _has_circular_refs(defs[def_name], defs, visiting):
return True
visiting.discard(def_name)
for value in node.values():
if _has_circular_refs(value, defs, visiting):
return True
elif isinstance(node, list):
for item in node:
if _has_circular_refs(item, defs, visiting):
return True
return False
def _break_circular_refs(
node: Any,
defs: dict[str, Any],
visiting: set[str],
) -> None:
"""Walk *node* in-place and replace circular ``$ref`` pointers with stubs."""
if isinstance(node, dict):
ref = node.get("$ref")
if isinstance(ref, str) and ref.startswith("#/$defs/"):
def_name = ref.removeprefix("#/$defs/")
if def_name in visiting:
# Circular - replace the *whole* node content with a stub.
node.clear()
node["type"] = "object"
return
if def_name in defs:
visiting.add(def_name)
_break_circular_refs(defs[def_name], defs, visiting)
visiting.discard(def_name)
for value in node.values():
_break_circular_refs(value, defs, visiting)
elif isinstance(node, list):
for item in node:
_break_circular_refs(item, defs, visiting)
def create_model_from_schema( # type: ignore[no-any-unimported]
json_schema: dict[str, Any],
*,
@@ -784,10 +677,6 @@ def create_model_from_schema( # type: ignore[no-any-unimported]
as nested objects, referenced definitions ($ref), arrays with typed items,
union types (anyOf/oneOf), and string formats.
Circular ``$ref`` chains (common in complex MCP tool schemas) are detected
and broken automatically so that deeply-nested or self-referential schemas
never trigger a ``RecursionError``.
Args:
json_schema: A dictionary representing the JSON schema.
root_schema: The root schema containing $defs. If not provided, the
@@ -823,7 +712,7 @@ def create_model_from_schema( # type: ignore[no-any-unimported]
>>> person.name
'John'
"""
json_schema = _safe_replace_refs(json_schema)
json_schema = dict(jsonref.replace_refs(json_schema, proxies=False))
effective_root = root_schema or json_schema
@@ -1031,22 +920,13 @@ def _json_schema_to_pydantic_type(
"""
ref = json_schema.get("$ref")
if ref:
# Detect circular $ref chains - if we are already resolving this
# ref higher up the call stack, break the cycle by returning dict.
resolving = _get_resolving_refs()
if ref in resolving:
return dict
resolving.add(ref)
try:
ref_schema = _resolve_ref(ref, root_schema)
return _json_schema_to_pydantic_type(
ref_schema,
root_schema,
name_=name_,
enrich_descriptions=enrich_descriptions,
)
finally:
resolving.discard(ref)
ref_schema = _resolve_ref(ref, root_schema)
return _json_schema_to_pydantic_type(
ref_schema,
root_schema,
name_=name_,
enrich_descriptions=enrich_descriptions,
)
enum_values = json_schema.get("enum")
if enum_values:

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from datetime import date, datetime
from enum import Enum
import json
from typing import Any, TypeAlias
import uuid
@@ -20,6 +21,7 @@ def to_serializable(
max_depth: int = 5,
_current_depth: int = 0,
_ancestors: set[int] | None = None,
context: dict[str, Any] | None = None,
) -> Serializable:
"""Converts a Python object into a JSON-compatible representation.
@@ -33,6 +35,9 @@ def to_serializable(
max_depth: Maximum recursion depth. Defaults to 5.
_current_depth: Current recursion depth (for internal use).
_ancestors: Set of ancestor object ids for cycle detection (for internal use).
context: Optional context dict passed to Pydantic's model_dump(context=...).
Field serializers on the model can inspect this to customize output
(e.g. context={"trace": True} for lightweight trace serialization).
Returns:
Serializable: A JSON-compatible structure.
@@ -48,6 +53,15 @@ def to_serializable(
if isinstance(obj, (str, int, float, bool, type(None))):
return obj
if isinstance(obj, Enum):
return to_serializable(
obj.value,
exclude=exclude,
max_depth=max_depth,
_current_depth=_current_depth,
_ancestors=_ancestors,
context=context,
)
if isinstance(obj, uuid.UUID):
return str(obj)
if isinstance(obj, (date, datetime)):
@@ -66,6 +80,7 @@ def to_serializable(
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
context=context,
)
for item in obj
]
@@ -77,17 +92,24 @@ def to_serializable(
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
context=context,
)
for key, value in obj.items()
if key not in exclude
}
if isinstance(obj, BaseModel):
try:
dump_kwargs: dict[str, Any] = {}
if exclude:
dump_kwargs["exclude"] = exclude
if context is not None:
dump_kwargs["context"] = context
return to_serializable(
obj=obj.model_dump(exclude=exclude),
obj=obj.model_dump(**dump_kwargs),
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
context=context,
)
except Exception:
try:
@@ -97,12 +119,30 @@ def to_serializable(
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
context=context,
)
for k, v in obj.__dict__.items()
if k not in (exclude or set())
}
except Exception:
return repr(obj)
if callable(obj):
return repr(obj)
if hasattr(obj, "__dict__"):
try:
return {
_to_serializable_key(k): to_serializable(
v,
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
context=context,
)
for k, v in obj.__dict__.items()
if not k.startswith("_")
}
except Exception:
return repr(obj)
return repr(obj)

View File

@@ -0,0 +1,612 @@
"""Tests for trace serialization optimization using Pydantic v2 context-based serialization.
These tests verify that trace events use @field_serializer with SerializationInfo.context
to produce lightweight representations, reducing event sizes from 50-100KB to a few KB.
"""
import json
import uuid
from typing import Any
from unittest.mock import MagicMock
import pytest
from pydantic import ConfigDict
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.events.base_events import _trace_agent_ref, _trace_task_ref, _trace_tool_names
from crewai.events.listeners.tracing.utils import safe_serialize_to_dict
from crewai.utilities.serialization import to_serializable
# ---------------------------------------------------------------------------
# Lightweight BaseAgent subclass for tests (avoids heavy dependencies)
# ---------------------------------------------------------------------------
class _StubAgent(BaseAgent):
    """Minimal BaseAgent subclass that satisfies validation without heavy deps."""

    # Allow MagicMock values (and other arbitrary types) in model fields.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # All abstract-method stubs below return inert values; these tests never
    # exercise real agent behavior through this class.
    def execute_task(self, *a: Any, **kw: Any) -> str:
        return ""

    def create_agent_executor(self, *a: Any, **kw: Any) -> None:
        pass

    def _parse_tools(self, *a: Any, **kw: Any) -> list:
        return []

    def get_delegation_tools(self, *a: Any, **kw: Any) -> list:
        return []

    def get_output_converter(self, *a: Any, **kw: Any) -> Any:
        return None

    def get_multimodal_tools(self, *a: Any, **kw: Any) -> list:
        return []

    async def aexecute_task(self, *a: Any, **kw: Any) -> str:
        return ""

    def get_mcp_tools(self, *a: Any, **kw: Any) -> list:
        return []

    def get_platform_tools(self, *a: Any, **kw: Any) -> list:
        return []
def _make_stub_agent(**overrides) -> _StubAgent:
    """Create a minimal BaseAgent instance for testing.

    Keyword arguments override the built-in defaults below.
    """
    kwargs = {
        "role": "Researcher",
        "goal": "Research things",
        "backstory": "Expert researcher",
        "tools": [],
        **overrides,
    }
    return _StubAgent(**kwargs)
# ---------------------------------------------------------------------------
# Helpers to build realistic mock objects for event fields
# ---------------------------------------------------------------------------
def _make_mock_task(**overrides):
    """Build a MagicMock task carrying realistic attribute values.

    Any attribute can be replaced via keyword overrides; unspecified ones
    get sensible defaults (including a stub agent and a mock fingerprint).
    """
    get = overrides.get
    task = MagicMock()
    task.id = get("id", uuid.uuid4())
    task.name = get("name", "Research Task")
    task.description = get("description", "Do research")
    task.expected_output = get("expected_output", "Research results")
    task.async_execution = get("async_execution", False)
    task.human_input = get("human_input", False)
    # NOTE: .get evaluates its default eagerly, so a stub agent is built even
    # when an "agent" override is supplied (matches the original behavior).
    task.agent = get("agent", _make_stub_agent())
    task.context = get("context", None)
    task.crew = MagicMock()
    task.tools = get("tools", [MagicMock(), MagicMock()])
    fingerprint = MagicMock()
    fingerprint.uuid_str = str(uuid.uuid4())
    fingerprint.metadata = {"name": task.name}
    task.fingerprint = fingerprint
    return task
def _make_stub_tool(tool_name="web_search") -> Any:
    """Create a minimal BaseTool instance for testing."""
    # BaseTool is imported lazily inside the helper rather than at module top.
    from crewai.tools.base_tool import BaseTool

    class _StubTool(BaseTool):
        # Placeholder field values; both are overwritten by the constructor
        # call below.
        name: str = "stub"
        description: str = "stub tool"

        def _run(self, *a: Any, **kw: Any) -> str:
            return ""

    return _StubTool(name=tool_name, description=f"{tool_name} tool")
# ---------------------------------------------------------------------------
# Unit tests: trace ref helpers
# ---------------------------------------------------------------------------
class TestTraceRefHelpers:
    """Unit tests for the lightweight trace-ref helper functions."""

    def test_trace_agent_ref(self):
        analyst = _make_stub_agent(role="Analyst")
        ref = _trace_agent_ref(analyst)
        assert "id" in ref
        assert ref["role"] == "Analyst"
        # The ref must stay lightweight: exactly {id, role}.
        assert len(ref) == 2

    def test_trace_agent_ref_none(self):
        assert _trace_agent_ref(None) is None

    def test_trace_task_ref(self):
        report_task = _make_mock_task(name="Write Report")
        ref = _trace_task_ref(report_task)
        assert "id" in ref
        assert ref["name"] == "Write Report"
        assert len(ref) == 2

    def test_trace_task_ref_falls_back_to_description(self):
        unnamed = _make_mock_task(name=None, description="Describe the report")
        assert _trace_task_ref(unnamed)["name"] == "Describe the report"

    def test_trace_task_ref_none(self):
        assert _trace_task_ref(None) is None

    def test_trace_tool_names(self):
        tools = [_make_stub_tool("search"), _make_stub_tool("read")]
        assert _trace_tool_names(tools) == ["search", "read"]

    def test_trace_tool_names_empty(self):
        assert _trace_tool_names(None) is None
        assert _trace_tool_names([]) is None
# ---------------------------------------------------------------------------
# Integration tests: field serializers on real event classes
# ---------------------------------------------------------------------------
class TestAgentEventFieldSerializers:
    """Agent event field serializers should honor the trace context flag."""

    def test_agent_execution_started_trace_context(self):
        from crewai.events.types.agent_events import AgentExecutionStartedEvent

        researcher = _make_stub_agent(role="Researcher")
        research_task = _make_mock_task(name="Research Task")
        event = AgentExecutionStartedEvent(
            agent=researcher,
            task=research_task,
            tools=[_make_stub_tool("search"), _make_stub_tool("read")],
            task_prompt="Do research",
        )
        # Trace context collapses agent/task/tools into lightweight refs.
        dumped = event.model_dump(context={"trace": True})
        assert dumped["agent"] == {"id": str(researcher.id), "role": "Researcher"}
        assert dumped["task"] == {"id": str(research_task.id), "name": "Research Task"}
        assert dumped["tools"] == ["search", "read"]

    def test_agent_execution_started_no_context(self):
        from crewai.events.types.agent_events import AgentExecutionStartedEvent

        event = AgentExecutionStartedEvent(
            agent=_make_stub_agent(role="SpecificRole"),
            task=_make_mock_task(),
            tools=None,
            task_prompt="Do research",
        )
        # Without context, Pydantic expands the agent to its full dict.
        dumped = event.model_dump()
        assert isinstance(dumped["agent"], dict)
        assert dumped["agent"]["role"] == "SpecificRole"
        # All agent fields should be present, not just the lightweight ref.
        for field in ("goal", "backstory", "max_iter"):
            assert field in dumped["agent"]

    def test_agent_execution_error_preserves_identification(self):
        from crewai.events.types.agent_events import AgentExecutionErrorEvent

        event = AgentExecutionErrorEvent(
            agent=_make_stub_agent(role="Analyst"),
            task=_make_mock_task(name="Analysis Task"),
            error="Something went wrong",
        )
        dumped = event.model_dump(context={"trace": True})
        # Even on error, agent/task identification survives as refs.
        assert dumped["agent"]["role"] == "Analyst"
        assert dumped["task"]["name"] == "Analysis Task"
        assert dumped["error"] == "Something went wrong"

    def test_agent_execution_completed_trace_context(self):
        from crewai.events.types.agent_events import AgentExecutionCompletedEvent

        event = AgentExecutionCompletedEvent(
            agent=_make_stub_agent(role="Writer"),
            task=_make_mock_task(name="Writing Task"),
            output="Final output",
        )
        dumped = event.model_dump(context={"trace": True})
        assert dumped["agent"]["role"] == "Writer"
        assert dumped["task"]["name"] == "Writing Task"
        assert dumped["output"] == "Final output"
class TestTaskEventFieldSerializers:
    """Task event field serializers should honor the trace context flag."""

    def test_task_started_trace_context(self):
        from crewai.events.types.task_events import TaskStartedEvent

        started_task = _make_mock_task(name="Test Task")
        event = TaskStartedEvent(task=started_task, context="some context")
        dumped = event.model_dump(context={"trace": True})
        # Task collapses to a lightweight {id, name} ref; context passes through.
        assert dumped["task"] == {"id": str(started_task.id), "name": "Test Task"}
        assert dumped["context"] == "some context"

    def test_task_failed_trace_context(self):
        from crewai.events.types.task_events import TaskFailedEvent

        event = TaskFailedEvent(
            task=_make_mock_task(name="Failing Task"), error="Task failed"
        )
        dumped = event.model_dump(context={"trace": True})
        assert dumped["task"]["name"] == "Failing Task"
        assert dumped["error"] == "Task failed"
class TestCrewEventFieldSerializers:
    """Crew event field serializers should honor the trace context flag."""

    @staticmethod
    def _mock_crew():
        # Minimal crew mock: only the fingerprint shape the events need.
        crew = MagicMock()
        crew.fingerprint = MagicMock()
        crew.fingerprint.uuid_str = str(uuid.uuid4())
        crew.fingerprint.metadata = {}
        return crew

    def test_crew_kickoff_started_excludes_crew_in_trace(self):
        from crewai.events.types.crew_events import CrewKickoffStartedEvent

        event = CrewKickoffStartedEvent(
            crew=self._mock_crew(), crew_name="TestCrew", inputs={"key": "value"}
        )
        dumped = event.model_dump(context={"trace": True})
        # In trace context the heavyweight crew object is dropped entirely.
        assert dumped["crew"] is None
        # Scalar fields ride along untouched.
        assert dumped["crew_name"] == "TestCrew"
        assert dumped["inputs"] == {"key": "value"}

    def test_crew_event_no_context_preserves_crew(self):
        from crewai.events.types.crew_events import CrewKickoffStartedEvent

        event = CrewKickoffStartedEvent(
            crew=self._mock_crew(), crew_name="TestCrew", inputs=None
        )
        # Without trace context the crew field serializer never fires.
        assert event.model_dump()["crew"] is not None
class TestLLMEventFieldSerializers:
    """LLM event field serializers should honor the trace context flag."""

    def test_llm_call_started_excludes_callbacks_in_trace(self):
        from crewai.events.types.llm_events import LLMCallStartedEvent

        event = LLMCallStartedEvent(
            call_id="test-call",
            messages=[{"role": "user", "content": "Hello"}],
            tools=[{"name": "search", "description": "Search tool"}],
            callbacks=[MagicMock(), MagicMock()],
            available_functions={"search": MagicMock()},
        )
        dumped = event.model_dump(context={"trace": True})
        # Non-serializable callables are stripped in trace context.
        assert dumped["callbacks"] is None
        assert dumped["available_functions"] is None
        # Lightweight tool schemas and messages survive intact.
        assert dumped["tools"] == [{"name": "search", "description": "Search tool"}]
        assert dumped["messages"] == [{"role": "user", "content": "Hello"}]
# ---------------------------------------------------------------------------
# Integration tests: safe_serialize_to_dict with context
# ---------------------------------------------------------------------------
class TestSafeSerializeWithContext:
    """safe_serialize_to_dict must thread the context into model_dump."""

    def test_context_flows_through_to_field_serializers(self):
        from crewai.events.types.agent_events import AgentExecutionErrorEvent

        worker = _make_stub_agent(role="Worker")
        work_task = _make_mock_task(name="Work Task")
        event = AgentExecutionErrorEvent(
            agent=worker, task=work_task, error="error msg"
        )
        serialized = safe_serialize_to_dict(event, context={"trace": True})
        # The trace-aware field serializers must have fired.
        assert serialized["agent"] == {"id": str(worker.id), "role": "Worker"}
        assert serialized["task"] == {"id": str(work_task.id), "name": "Work Task"}
        assert serialized["error"] == "error msg"

    def test_no_context_preserves_full_serialization(self):
        from crewai.events.types.task_events import TaskFailedEvent

        event = TaskFailedEvent(task=_make_mock_task(name="Test"), error="fail")
        serialized = safe_serialize_to_dict(event)
        # Without context the task is not reduced to a lightweight ref.
        # It stays the raw object (model_dump passes Any fields through);
        # to_serializable will repr() or process it further.
        assert serialized.get("task") is not None
# ---------------------------------------------------------------------------
# Integration tests: TraceCollectionListener._build_event_data
# ---------------------------------------------------------------------------
class TestBuildEventData:
    """Integration tests for TraceCollectionListener._build_event_data."""

    @pytest.fixture
    def listener(self):
        from crewai.events.listeners.tracing.trace_listener import (
            TraceCollectionListener,
        )

        # Reset singleton state so each test gets a fresh listener.
        TraceCollectionListener._instance = None
        TraceCollectionListener._initialized = False
        TraceCollectionListener._listeners_setup = False
        return TraceCollectionListener()

    @staticmethod
    def _mock_crew(agents, tasks, *, verbose=False):
        # Crew mock with the attributes crew_structure projection reads.
        crew = MagicMock()
        crew.agents = agents
        crew.tasks = tasks
        crew.process = "sequential"
        crew.verbose = verbose
        crew.memory = False
        crew.fingerprint = MagicMock()
        crew.fingerprint.uuid_str = str(uuid.uuid4())
        crew.fingerprint.metadata = {}
        return crew

    def test_crew_kickoff_started_has_crew_structure(self, listener):
        from crewai.events.types.crew_events import CrewKickoffStartedEvent

        researcher = _make_stub_agent(role="Researcher")
        researcher.tools = [_make_stub_tool("search"), _make_stub_tool("read")]
        research_task = _make_mock_task(name="Research Task", agent=researcher)
        research_task.context = None
        crew = self._mock_crew([researcher], [research_task], verbose=True)

        event = CrewKickoffStartedEvent(
            crew=crew, crew_name="TestCrew", inputs={"key": "value"}
        )
        data = listener._build_event_data("crew_kickoff_started", event, None)

        assert "crew_structure" in data
        structure = data["crew_structure"]
        assert len(structure["agents"]) == 1
        assert structure["agents"][0]["role"] == "Researcher"
        assert structure["agents"][0]["tool_names"] == ["search", "read"]
        assert len(structure["tasks"]) == 1
        assert structure["tasks"][0]["name"] == "Research Task"
        assert "agent_ref" in structure["tasks"][0]
        assert structure["tasks"][0]["agent_ref"]["role"] == "Researcher"

    def test_crew_kickoff_started_context_task_ids(self, listener):
        from crewai.events.types.crew_events import CrewKickoffStartedEvent

        worker = _make_stub_agent()
        first = _make_mock_task(name="Task 1", agent=worker)
        first.context = None
        second = _make_mock_task(name="Task 2", agent=worker)
        second.context = [first]
        crew = self._mock_crew([worker], [first, second])

        event = CrewKickoffStartedEvent(
            crew=crew, crew_name="TestCrew", inputs=None
        )
        data = listener._build_event_data("crew_kickoff_started", event, None)

        second_data = data["crew_structure"]["tasks"][1]
        assert "context_task_ids" in second_data
        assert str(first.id) in second_data["context_task_ids"]

    def test_generic_event_uses_trace_context(self, listener):
        """Non-complex events should use context-based serialization."""
        from crewai.events.types.crew_events import CrewKickoffCompletedEvent

        crew = MagicMock()
        crew.fingerprint = MagicMock()
        crew.fingerprint.uuid_str = str(uuid.uuid4())
        crew.fingerprint.metadata = {}
        event = CrewKickoffCompletedEvent(
            crew=crew, crew_name="TestCrew", output="Final result", total_tokens=5000
        )
        data = listener._build_event_data("crew_kickoff_completed", event, None)

        # Scalars survive; the crew object is dropped by its field serializer.
        assert data.get("crew_name") == "TestCrew"
        assert data.get("total_tokens") == 5000
        assert data.get("crew") is None
        # crew_structure is built only for kickoff_started events.
        assert "crew_structure" not in data

    def test_task_started_custom_projection(self, listener):
        from crewai.events.types.task_events import TaskStartedEvent

        started_task = _make_mock_task(name="Test Task")
        event = TaskStartedEvent(task=started_task, context="test context")
        source = MagicMock()
        source.agent = _make_stub_agent(role="Worker")

        data = listener._build_event_data("task_started", event, source)
        assert data["task_name"] == "Test Task"
        assert data["agent_role"] == "Worker"
        assert data["task_id"] == str(started_task.id)
        assert data["context"] == "test context"

    def test_llm_call_started_uses_trace_context(self, listener):
        from crewai.events.types.llm_events import LLMCallStartedEvent

        event = LLMCallStartedEvent(
            call_id="test",
            messages=[{"role": "user", "content": "Hello"}],
            tools=[{"name": "search"}],
            callbacks=[MagicMock()],
            available_functions={"fn": MagicMock()},
        )
        data = listener._build_event_data("llm_call_started", event, None)

        # Callables are excluded by the field serializers.
        assert data.get("callbacks") is None
        assert data.get("available_functions") is None
        # Lightweight tool schemas survive.
        assert data.get("tools") == [{"name": "search"}]

    def test_agent_execution_error_preserves_identification(self, listener):
        """Error events should preserve agent/task identification via field serializers."""
        from crewai.events.types.agent_events import AgentExecutionErrorEvent

        analyst = _make_stub_agent(role="Analyst")
        analysis_task = _make_mock_task(name="Analysis")
        event = AgentExecutionErrorEvent(
            agent=analyst, task=analysis_task, error="Something broke"
        )
        data = listener._build_event_data("agent_execution_error", event, None)

        # Field serializers return lightweight refs, not None.
        assert data["agent"] == {"id": str(analyst.id), "role": "Analyst"}
        assert data["task"] == {"id": str(analysis_task.id), "name": "Analysis"}
        assert data["error"] == "Something broke"

    def test_task_failed_preserves_identification(self, listener):
        from crewai.events.types.task_events import TaskFailedEvent

        failed_task = _make_mock_task(name="Failed Task")
        event = TaskFailedEvent(task=failed_task, error="Task failed")
        data = listener._build_event_data("task_failed", event, None)

        assert data["task"] == {"id": str(failed_task.id), "name": "Failed Task"}
        assert data["error"] == "Task failed"
# ---------------------------------------------------------------------------
# Size reduction verification
# ---------------------------------------------------------------------------
class TestSizeReduction:
    """Verify serialized trace payloads stay small despite verbose inputs."""

    @pytest.fixture
    def listener(self):
        from crewai.events.listeners.tracing.trace_listener import (
            TraceCollectionListener,
        )

        # Reset singleton state so each test gets a fresh listener.
        TraceCollectionListener._instance = None
        TraceCollectionListener._initialized = False
        TraceCollectionListener._listeners_setup = False
        return TraceCollectionListener()

    def test_task_started_event_size(self, listener):
        """task_started event data should be well under 2KB."""
        from crewai.events.types.task_events import TaskStartedEvent

        chatty_agent = _make_stub_agent(
            role="Researcher",
            goal="Research" * 50,
            backstory="Expert" * 100,
        )
        chatty_agent.tools = [_make_stub_tool(f"tool_{i}") for i in range(5)]
        big_task = _make_mock_task(
            name="Research Task",
            description="Detailed description" * 20,
            expected_output="Expected" * 10,
            agent=chatty_agent,
        )
        big_task.context = [_make_mock_task() for _ in range(3)]
        big_task.tools = [_make_stub_tool(f"t_{i}") for i in range(3)]

        event = TaskStartedEvent(task=big_task, context="test context")
        source = MagicMock()
        source.agent = chatty_agent

        data = listener._build_event_data("task_started", event, source)
        payload = json.dumps(data, default=str)
        assert len(payload) < 2000, f"task_started too large: {len(payload)} bytes"
        assert "task_name" in data
        assert "agent_role" in data

    def test_error_event_size(self, listener):
        """Error events should be small despite having agent/task refs."""
        from crewai.events.types.agent_events import AgentExecutionErrorEvent

        verbose_agent = _make_stub_agent(
            goal="Very long goal " * 100,
            backstory="Very long backstory " * 100,
        )
        verbose_task = _make_mock_task(description="Very long description " * 100)
        event = AgentExecutionErrorEvent(
            agent=verbose_agent, task=verbose_task, error="error"
        )
        data = listener._build_event_data("agent_execution_error", event, None)
        payload = json.dumps(data, default=str)
        # Agent/task collapse to {id, role/name} refs, so this stays small.
        assert len(payload) < 5000, f"error event too large: {len(payload)} bytes"
# ---------------------------------------------------------------------------
# to_serializable context threading
# ---------------------------------------------------------------------------
class TestToSerializableContext:
    """The context parameter must flow through to_serializable correctly."""

    def test_context_passed_to_model_dump(self):
        from crewai.events.types.agent_events import AgentExecutionErrorEvent

        tester = _make_stub_agent(role="Tester")
        test_task = _make_mock_task(name="Test Task")
        event = AgentExecutionErrorEvent(
            agent=tester, task=test_task, error="test error"
        )
        # Call to_serializable directly with a trace context.
        serialized = to_serializable(event, context={"trace": True})
        assert isinstance(serialized, dict)
        assert serialized["agent"] == {"id": str(tester.id), "role": "Tester"}
        assert serialized["task"] == {"id": str(test_task.id), "name": "Test Task"}

    def test_no_context_does_not_trigger_serializers(self):
        from crewai.events.types.crew_events import CrewKickoffStartedEvent

        crew = MagicMock()
        crew.fingerprint = MagicMock()
        crew.fingerprint.uuid_str = str(uuid.uuid4())
        crew.fingerprint.metadata = {}
        event = CrewKickoffStartedEvent(crew=crew, crew_name="Test", inputs=None)
        # Without trace context the crew field must survive serialization.
        assert event.model_dump()["crew"] is not None

View File

@@ -19,9 +19,6 @@ import pytest
from pydantic import BaseModel
from crewai.utilities.pydantic_schema_utils import (
_break_circular_refs,
_has_circular_refs,
_safe_replace_refs,
build_rich_field_description,
convert_oneof_to_anyof,
create_model_from_schema,
@@ -885,333 +882,3 @@ class TestEndToEndMCPSchema:
)
assert obj.filters.date_from == datetime.date(2025, 1, 1)
assert obj.filters.categories == ["news", "tech"]
# ---------------------------------------------------------------------------
# Circular $ref handling (issue #5474)
# ---------------------------------------------------------------------------
class TestCircularRefDetection:
    """Tests for the _has_circular_refs helper."""

    def test_detects_direct_self_reference(self) -> None:
        # Node's children array items refer back to Node itself.
        node_def = {
            "type": "object",
            "properties": {
                "children": {
                    "type": "array",
                    "items": {"$ref": "#/$defs/Node"},
                },
            },
        }
        schema: dict[str, Any] = {
            "type": "object",
            "properties": {"child": {"$ref": "#/$defs/Node"}},
            "$defs": {"Node": node_def},
        }
        assert _has_circular_refs(schema, schema["$defs"]) is True

    def test_detects_indirect_circular_reference(self) -> None:
        # A -> B -> A forms a two-step cycle.
        defs = {
            "A": {
                "type": "object",
                "properties": {"b": {"$ref": "#/$defs/B"}},
            },
            "B": {
                "type": "object",
                "properties": {"a": {"$ref": "#/$defs/A"}},
            },
        }
        schema: dict[str, Any] = {
            "type": "object",
            "properties": {"a": {"$ref": "#/$defs/A"}},
            "$defs": defs,
        }
        assert _has_circular_refs(schema, schema["$defs"]) is True

    def test_no_circular_ref(self) -> None:
        item_def = {
            "type": "object",
            "properties": {"name": {"type": "string"}},
        }
        schema: dict[str, Any] = {
            "type": "object",
            "properties": {"item": {"$ref": "#/$defs/Item"}},
            "$defs": {"Item": item_def},
        }
        assert _has_circular_refs(schema, schema["$defs"]) is False
class TestBreakCircularRefs:
    """Tests for the _break_circular_refs helper."""

    def test_breaks_direct_self_reference(self) -> None:
        node_def = {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "children": {
                    "type": "array",
                    "items": {"$ref": "#/$defs/Node"},
                },
            },
        }
        schema: dict[str, Any] = {
            "type": "object",
            "properties": {"child": {"$ref": "#/$defs/Node"}},
            "$defs": {"Node": node_def},
        }
        _break_circular_refs(schema, schema["$defs"], set())
        # The self-referential $ref inside Node's items is replaced by a stub.
        stub = schema["$defs"]["Node"]["properties"]["children"]["items"]
        assert stub == {"type": "object"}
        assert "$ref" not in stub

    def test_preserves_non_circular_refs(self) -> None:
        schema: dict[str, Any] = {
            "type": "object",
            "properties": {"item": {"$ref": "#/$defs/Item"}},
            "$defs": {
                "Item": {
                    "type": "object",
                    "properties": {"name": {"type": "string"}},
                },
            },
        }
        snapshot = deepcopy(schema)
        _break_circular_refs(schema, schema["$defs"], set())
        # A schema without cycles must come through untouched.
        assert schema == snapshot
class TestSafeReplaceRefs:
    """Tests for _safe_replace_refs."""

    def test_resolves_non_circular_schema(self) -> None:
        schema: dict[str, Any] = {
            "type": "object",
            "properties": {"item": {"$ref": "#/$defs/Item"}},
            "$defs": {
                "Item": {
                    "type": "object",
                    "properties": {"id": {"type": "integer"}},
                },
            },
        }
        resolved = _safe_replace_refs(schema)
        item = resolved.get("properties", {}).get("item", {})
        # The $ref must be fully inlined.
        assert "$ref" not in item
        assert resolved["properties"]["item"]["type"] == "object"

    def test_handles_circular_schema_without_recursion_error(self) -> None:
        schema: dict[str, Any] = {
            "type": "object",
            "properties": {"root": {"$ref": "#/$defs/TreeNode"}},
            "$defs": {
                "TreeNode": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "children": {
                            "type": "array",
                            "items": {"$ref": "#/$defs/TreeNode"},
                        },
                    },
                },
            },
        }
        # Must not raise RecursionError on the self-referential schema.
        resolved = _safe_replace_refs(schema)
        assert isinstance(resolved, dict)
class TestResolveRefsCircular:
    """Tests that resolve_refs handles circular references."""

    def test_circular_ref_does_not_recurse(self) -> None:
        schema: dict[str, Any] = {
            "type": "object",
            "properties": {"root": {"$ref": "#/$defs/Node"}},
            "$defs": {
                "Node": {
                    "type": "object",
                    "properties": {
                        "child": {"$ref": "#/$defs/Node"},
                    },
                },
            },
        }
        resolved = resolve_refs(schema)
        # The circular ref collapses to a {"type": "object"} stub.
        child = resolved["properties"]["root"]["properties"]["child"]
        assert child == {"type": "object"}

    def test_indirect_circular_ref(self) -> None:
        defs = {
            "A": {
                "type": "object",
                "properties": {"b": {"$ref": "#/$defs/B"}},
            },
            "B": {
                "type": "object",
                "properties": {"a": {"$ref": "#/$defs/A"}},
            },
        }
        schema: dict[str, Any] = {
            "type": "object",
            "properties": {"a": {"$ref": "#/$defs/A"}},
            "$defs": defs,
        }
        resolved = resolve_refs(schema)
        # A -> B -> A(cycle): the second occurrence of A becomes a stub.
        inner = resolved["properties"]["a"]["properties"]["b"]
        assert inner["properties"]["a"] == {"type": "object"}
class TestCreateModelCircularRef:
    """End-to-end tests for create_model_from_schema with circular $ref schemas.

    Regression tests for GitHub issue #5474: MCP servers with >10 tools
    that expose self-referential JSON schemas caused
    ``RecursionError: maximum recursion depth exceeded``.
    """

    def test_direct_self_referential_schema(self) -> None:
        """A type that references itself (tree-like structure)."""
        schema: dict[str, Any] = {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "children": {
                    "type": "array",
                    "items": {"$ref": "#/$defs/TreeNode"},
                },
            },
            "required": ["name"],
            "$defs": {
                "TreeNode": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "children": {
                            "type": "array",
                            "items": {"$ref": "#/$defs/TreeNode"},
                        },
                    },
                    "required": ["name"],
                },
            },
        }
        Model = create_model_from_schema(schema, model_name="TreeSchema")
        assert Model.__name__ == "TreeSchema"
        root = Model(name="root")
        assert root.name == "root"

    def test_indirect_circular_reference(self) -> None:
        """Two types that reference each other (A -> B -> A)."""
        schema: dict[str, Any] = {
            "type": "object",
            "properties": {"node": {"$ref": "#/$defs/NodeA"}},
            "required": ["node"],
            "$defs": {
                "NodeA": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "linked": {"$ref": "#/$defs/NodeB"},
                    },
                    "required": ["name"],
                },
                "NodeB": {
                    "type": "object",
                    "properties": {
                        "value": {"type": "integer"},
                        "back": {"$ref": "#/$defs/NodeA"},
                    },
                    "required": ["value"],
                },
            },
        }
        Model = create_model_from_schema(schema, model_name="MutualRef")
        instance = Model(node={"name": "hello", "linked": {"value": 42}})
        assert instance.node.name == "hello"

    def test_many_tools_with_complex_schemas(self) -> None:
        """Simulate an MCP server exposing >10 tools (issue #5474 trigger)."""
        for i in range(15):
            # Each iteration builds a fresh schema so no state is shared.
            tool_schema: dict[str, Any] = {
                "type": "object",
                "properties": {
                    "query": {"type": "string"},
                    "options": {
                        "type": "object",
                        "properties": {
                            "limit": {"type": "integer"},
                            "filter": {"type": "string"},
                        },
                    },
                },
                "required": ["query"],
            }
            Model = create_model_from_schema(
                tool_schema, model_name=f"Tool{i}Schema"
            )
            instance = Model(query=f"test_{i}")
            assert instance.query == f"test_{i}"

    def test_circular_ref_with_enrich_descriptions(self) -> None:
        """Circular schema + enrich_descriptions should not blow up."""
        schema: dict[str, Any] = {
            "type": "object",
            "properties": {
                "name": {"type": "string", "description": "Node name"},
                "child": {"$ref": "#/$defs/Recursive"},
            },
            "required": ["name"],
            "$defs": {
                "Recursive": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string", "description": "Name"},
                        "child": {"$ref": "#/$defs/Recursive"},
                    },
                },
            },
        }
        Model = create_model_from_schema(
            schema,
            model_name="EnrichedCircular",
            enrich_descriptions=True,
        )
        assert Model.__name__ == "EnrichedCircular"
        top = Model(name="top")
        assert top.name == "top"

    def test_deeply_nested_non_circular_still_works(self) -> None:
        """A deep but non-circular chain of $refs should still resolve."""
        schema: dict[str, Any] = {
            "type": "object",
            "properties": {"l1": {"$ref": "#/$defs/Level1"}},
            "required": ["l1"],
            "$defs": {
                "Level1": {
                    "type": "object",
                    "properties": {"l2": {"$ref": "#/$defs/Level2"}},
                    "required": ["l2"],
                },
                "Level2": {
                    "type": "object",
                    "properties": {"l3": {"$ref": "#/$defs/Level3"}},
                    "required": ["l3"],
                },
                "Level3": {
                    "type": "object",
                    "properties": {"value": {"type": "string"}},
                    "required": ["value"],
                },
            },
        }
        Model = create_model_from_schema(schema, model_name="DeepChain")
        deep = Model(l1={"l2": {"l3": {"value": "deep"}}})
        assert deep.l1.l2.l3.value == "deep"

View File

@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.14.2a5"
__version__ = "1.14.2a4"