Compare commits


5 Commits

Author SHA1 Message Date
Alex
b753012fc8 fix: preserve agent/task identification in error events and fix exclude propagation
- Add custom handlers for agent_execution_error and task_failed events to
  extract agent_role/agent_id and task_name/task_id before generic serialization
  strips them (these fields are in TRACE_EXCLUDE_FIELDS)
- Remove exclude propagation in __dict__ handler to match Pydantic fallback behavior

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-30 14:18:09 -07:00
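The fix in this commit can be sketched as follows. This is an illustrative stand-in, not the actual listener code: `serialize` represents the generic trace serializer, and since `agent` is in the exclude set, its identifying fields must be copied out before serialization drops them.

```python
def build_error_event_data(event, serialize, exclude):
    """Re-attach agent identification that exclusion-based serialization drops.

    `serialize` is a stand-in for the generic trace serializer; because
    "agent" is in the exclude set, its fields must be extracted first.
    """
    data = serialize(event, exclude)
    agent = getattr(event, "agent", None)
    if agent is not None:
        # Copy scalar identification fields the frontend needs.
        data["agent_role"] = getattr(agent, "role", None)
        data["agent_id"] = str(getattr(agent, "id", ""))
    return data
```

The same pattern applies to `task_failed` with `task_name`/`task_id`.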
Alex
b40098b28e fix: address PR review comments on trace serialization
1. Remove unused import Mock from test_trace_serialization.py
2. Remove unused import safe_serialize_to_dict from test_trace_serialization.py
3. Fix LLM event tools data being silently excluded: Remove 'tools' from
   TRACE_EXCLUDE_FIELDS since LLMCallStartedEvent.tools is a lightweight
   list of tool schemas, not heavy Agent.tools objects. Agent.tools is
   already excluded explicitly in _build_crew_started_data.
4. Remove dead code: complex_events class variable from TraceCollectionListener

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-30 11:00:28 -07:00
Alex
a4f1164812 fix: reduce trace event serialization bloat by excluding redundant nested objects
Each trace event was serializing the ENTIRE Crew/Task/Agent object graph into
event_data JSONB, causing 500GB+ trace tables in production. For a crew with
5 agents and 10 tasks, each event could be 50-100KB because:
- Crew serialized full tasks AND full agents (with all tools, LLM configs)
- Each Task re-serialized its agent (same Agent already in Crew.agents)
- Each Task re-serialized context tasks (same Tasks already in Crew.tasks)

This fix:
1. Adds TRACE_EXCLUDE_FIELDS constant listing back-references and heavy fields
   to exclude (crew, agent, agents, tasks, context, tools, llm, callbacks, etc.)

2. Adds _serialize_for_trace() helper that uses safe_serialize_to_dict with
   the exclusion set, keeping scalar fields (agent_role, task_name, etc.)
   that the AMP frontend actually reads

3. Updates _build_event_data() to use lightweight serialization for all
   events except crew_kickoff_started

4. Adds _build_crew_started_data() that serializes the full crew structure
   ONCE with:
   - Agents with tool_names (list of strings, not full tool objects)
   - Tasks with agent_ref (just {id, role}) instead of full agent
   - Tasks with context_task_ids (just IDs) instead of full context tasks

5. Updates to_serializable() in serialization.py to:
   - Handle callable objects (functions/lambdas) by falling through to repr()
   - Handle regular classes with __dict__ (not just Pydantic models)

Expected size reduction: from 50-100KB down to ~1-2KB per event.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-30 09:25:01 -07:00
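The core idea of this commit can be illustrated with a minimal, hypothetical sketch (field names are taken from the commit message; this is not the real `_serialize_for_trace`): keep cheap scalar fields, and skip the heavy back-references that would otherwise be re-serialized into every event.

```python
# Hypothetical subset of the exclusion set described in the commit message.
TRACE_EXCLUDE_FIELDS = {"crew", "agent", "agents", "tasks", "context", "llm", "callbacks"}

def serialize_for_trace(obj, exclude=TRACE_EXCLUDE_FIELDS):
    """Keep scalar fields; drop heavy back-references that bloat each event."""
    out = {}
    for key, value in vars(obj).items():
        if key in exclude or key.startswith("_"):
            continue  # back-reference, private cache, etc.
        if isinstance(value, (str, int, float, bool, type(None))):
            out[key] = value
        else:
            out[key] = repr(value)  # cheap placeholder instead of a deep dump
    return out
```

A serializer like this turns a 50-100KB nested object graph into a flat dict of the scalars (agent_role, task_name, timestamps) that the frontend actually reads.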
Lucas Gomide
ac14b9127e fix: handle GPT-5.x models not supporting the stop API parameter (#5144)
GPT-5.x models reject the `stop` parameter at the API level with "Unsupported parameter: 'stop' is not supported with this model". This breaks CrewAI executions when routing through LiteLLM (e.g. via
OpenAI-compatible gateways like Asimov), because the LiteLLM fallback path always includes `stop` in the API request params.

The native OpenAI provider was unaffected because it never sends `stop` to the API — it applies stop words client-side via `_apply_stop_words()`. However, when the request goes through LiteLLM (custom endpoints, proxy gateways),
`stop` is sent as an API parameter and GPT-5.x rejects it.

Additionally, the existing retry logic that catches this error only matched the OpenAI API error format ("Unsupported parameter") but missed
LiteLLM's own pre-validation error format ("does not support parameters"), so the self-healing retry never triggered for LiteLLM-routed calls.
2026-03-30 11:36:51 -04:00
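The widened error matching described above (and shown in the diff below) boils down to one predicate covering both error formats:

```python
def is_unsupported_stop_error(exc: Exception) -> bool:
    """Detect a rejected `stop` parameter in both error formats:
    the OpenAI API's "Unsupported parameter" and LiteLLM's
    pre-validation "does not support parameters"."""
    error_str = str(exc)
    return "'stop'" in error_str and (
        "Unsupported parameter" in error_str
        or "does not support parameters" in error_str
    )
```

Requiring `'stop'` in the message keeps the self-healing retry from firing on unrelated parameter errors.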
Thiago Moretto
98b7626784 feat: extract and publish tool metadata to AMP (#4298)
* Exporting tool's metadata to AMP - initial work

* Fix payload (nest under `tools` key)

* Remove debug message + code simplification

* Printing out detected tools

* Extract module name

* fix: address PR review feedback for tool metadata extraction

- Use sha256 instead of md5 for module name hashing (lint S324)
- Filter required list to match filtered properties in JSON schema

* fix: Use sha256 instead of md5 for module name hashing (lint S324)

- Add missing mocks to metadata extraction failure test

* style: fix ruff formatting

* fix: resolve mypy type errors in utils.py

* fix: address bot review feedback on tool metadata

- Use `is not None` instead of truthiness check so empty tools list
  is sent to the API rather than being silently dropped as None
- Strip __init__ suffix from module path for tools in __init__.py files
- Extend _unwrap_schema to handle function-before, function-wrap, and
  definitions wrapper types

* fix: capture env_vars declared with Field(default_factory=...)

When env_vars uses default_factory, pydantic stores a callable in the
schema instead of a static default value. Fall back to calling the
factory when no static default is present.

---------

Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-03-30 09:21:53 -04:00
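The `env_vars` default_factory fallback from the last commit in this PR can be sketched like this. The schema-dict shape here is a simplified assumption for illustration; pydantic-core's real field schemas carry more keys:

```python
def resolve_env_vars_default(field_schema: dict) -> list:
    """Resolve the env_vars default from a pydantic field schema.

    When env_vars is declared with Field(default_factory=...), pydantic
    stores a callable under "default_factory" instead of a static
    "default" value, so fall back to calling the factory.
    """
    default = field_schema.get("default")
    if default is None:
        factory = field_schema.get("default_factory")
        if callable(factory):
            try:
                default = factory()
            except Exception:
                default = []
    # Anything that isn't a list is not a valid env_vars value.
    return default if isinstance(default, list) else []
```

Without the fallback, tools declaring `env_vars: list[EnvVar] = Field(default_factory=...)` would publish with an empty environment-variable list.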
16 changed files with 1836 additions and 139 deletions

View File

@@ -134,10 +134,6 @@ result = flow.kickoff(
)
```
<Note type="info" title="CrewAI Platform Integration">
When deployed on CrewAI Platform, `ImageFile`, `PDFFile`, and other file-typed fields in your flow state automatically get a file upload UI. Users can drag and drop files directly in the Platform interface. Files are stored securely and passed to agents using provider-specific optimizations (inline base64, file upload APIs, or URL references depending on the provider).
</Note>
### With Standalone Agents
Pass files directly to agent kickoff:

View File

@@ -341,87 +341,6 @@ flow.kickoff()
By providing both unstructured and structured state management options, CrewAI Flows empowers developers to build AI workflows that are both flexible and robust, catering to a wide range of application requirements.
### File Inputs
When using structured state, you can include file-typed fields using classes from `crewai-files`. This enables file uploads as part of your flow's input:
```python
from crewai.flow.flow import Flow, start
from crewai_files import ImageFile, PDFFile
from pydantic import BaseModel
class OnboardingState(BaseModel):
document: PDFFile # File upload
cover_image: ImageFile # Image upload
title: str = "" # Text input
class OnboardingFlow(Flow[OnboardingState]):
@start()
def process_upload(self):
# Access files directly from state
print(f"Processing: {self.state.title}")
return self.state.document
```
When deployed on **CrewAI Platform**, file-typed fields automatically render as file upload dropzones in the UI. Users can drag and drop files, which are then passed to your flow.
**Kicking off with files via API:**
The `/kickoff` endpoint auto-detects the request format:
- **JSON body** → normal kickoff
- **multipart/form-data** → file upload + kickoff
API users can also pass URL strings directly to file-typed fields—Pydantic coerces them automatically.
### API Usage
#### Option 1: Multipart kickoff (recommended)
Send files directly with the kickoff request:
```bash
# With files (multipart) — same endpoint
curl -X POST https://your-deployment.crewai.com/kickoff \
-H 'Authorization: Bearer YOUR_TOKEN' \
-F 'inputs={"company_name": "Einstein"}' \
-F 'cnh_image=@/path/to/document.jpg'
```
Files are automatically stored and converted to `FileInput` objects. The agent receives the file with provider-specific optimization (inline base64, file upload API, or URL reference depending on the LLM provider).
#### Option 2: JSON kickoff (no files)
```bash
# Without files (JSON) — same endpoint
curl -X POST https://your-deployment.crewai.com/kickoff \
-H 'Authorization: Bearer YOUR_TOKEN' \
-H 'Content-Type: application/json' \
-d '{"inputs": {"company_name": "Einstein"}}'
```
#### Option 3: Separate upload + kickoff
Upload files first, then reference them:
```bash
# Step 1: Upload
curl -X POST https://your-deployment.crewai.com/files \
-H 'Authorization: Bearer YOUR_TOKEN' \
-F 'file=@/path/to/document.jpg' \
-F 'field_name=cnh_image'
# Returns: {"url": "https://...", "field_name": "cnh_image"}
# Step 2: Kickoff with URL
curl -X POST https://your-deployment.crewai.com/kickoff \
-H 'Authorization: Bearer YOUR_TOKEN' \
-H 'Content-Type: application/json' \
-d '{"inputs": {"company_name": "Einstein"}, "inputFiles": {"cnh_image": "https://..."}}'
```
#### On CrewAI Platform
When using the Platform UI, file-typed fields automatically render as drag-and-drop upload zones. No API calls needed—just drop the file and click Run.
## Flow Persistence
The `@persist` decorator enables automatic state persistence in CrewAI Flows, allowing you to maintain flow state across restarts or different workflow executions. This decorator can be applied at either the class level or method level, providing flexibility in how you manage state persistence.

View File

@@ -73,6 +73,7 @@ class PlusAPI:
description: str | None,
encoded_file: str,
available_exports: list[dict[str, Any]] | None = None,
tools_metadata: list[dict[str, Any]] | None = None,
) -> httpx.Response:
params = {
"handle": handle,
@@ -81,6 +82,9 @@ class PlusAPI:
"file": encoded_file,
"description": description,
"available_exports": available_exports,
"tools_metadata": {"package": handle, "tools": tools_metadata}
if tools_metadata is not None
else None,
}
return self._make_request("POST", f"{self.TOOLS_RESOURCE}", json=params)

View File

@@ -17,6 +17,7 @@ from crewai.cli.constants import DEFAULT_CREWAI_ENTERPRISE_URL
from crewai.cli.utils import (
build_env_with_tool_repository_credentials,
extract_available_exports,
extract_tools_metadata,
get_project_description,
get_project_name,
get_project_version,
@@ -101,6 +102,18 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
console.print(
f"[green]Found these tools to publish: {', '.join([e['name'] for e in available_exports])}[/green]"
)
console.print("[bold blue]Extracting tool metadata...[/bold blue]")
try:
tools_metadata = extract_tools_metadata()
except Exception as e:
console.print(
f"[yellow]Warning: Could not extract tool metadata: {e}[/yellow]\n"
f"Publishing will continue without detailed metadata."
)
tools_metadata = []
self._print_tools_preview(tools_metadata)
self._print_current_organization()
with tempfile.TemporaryDirectory() as temp_build_dir:
@@ -118,7 +131,7 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
"Project build failed. Please ensure that the command `uv build --sdist` completes successfully.",
style="bold red",
)
raise SystemExit
raise SystemExit(1)
tarball_path = os.path.join(temp_build_dir, tarball_filename)
with open(tarball_path, "rb") as file:
@@ -134,6 +147,7 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
description=project_description,
encoded_file=f"data:application/x-gzip;base64,{encoded_tarball}",
available_exports=available_exports,
tools_metadata=tools_metadata,
)
self._validate_response(publish_response)
@@ -246,6 +260,55 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
)
raise SystemExit
def _print_tools_preview(self, tools_metadata: list[dict[str, Any]]) -> None:
if not tools_metadata:
console.print("[yellow]No tool metadata extracted.[/yellow]")
return
console.print(
f"\n[bold]Tools to be published ({len(tools_metadata)}):[/bold]\n"
)
for tool in tools_metadata:
console.print(f" [bold cyan]{tool.get('name', 'Unknown')}[/bold cyan]")
if tool.get("module"):
console.print(f" Module: {tool.get('module')}")
console.print(f" Name: {tool.get('humanized_name', 'N/A')}")
console.print(
f" Description: {tool.get('description', 'N/A')[:80]}{'...' if len(tool.get('description', '')) > 80 else ''}"
)
init_params = tool.get("init_params_schema", {}).get("properties", {})
if init_params:
required = tool.get("init_params_schema", {}).get("required", [])
console.print(" Init parameters:")
for param_name, param_info in init_params.items():
param_type = param_info.get("type", "any")
is_required = param_name in required
req_marker = "[red]*[/red]" if is_required else ""
default = (
f" = {param_info['default']}" if "default" in param_info else ""
)
console.print(
f" - {param_name}: {param_type}{default} {req_marker}"
)
env_vars = tool.get("env_vars", [])
if env_vars:
console.print(" Environment variables:")
for env_var in env_vars:
req_marker = "[red]*[/red]" if env_var.get("required") else ""
default = (
f" (default: {env_var['default']})"
if env_var.get("default")
else ""
)
console.print(
f" - {env_var['name']}: {env_var.get('description', 'N/A')}{default} {req_marker}"
)
console.print()
def _print_current_organization(self) -> None:
settings = Settings()
if settings.org_uuid:

View File

@@ -1,10 +1,15 @@
from functools import reduce
from collections.abc import Generator, Mapping
from contextlib import contextmanager
from functools import lru_cache, reduce
import hashlib
import importlib.util
import inspect
from inspect import getmro, isclass, isfunction, ismethod
import os
from pathlib import Path
import shutil
import sys
import types
from typing import Any, cast, get_type_hints
import click
@@ -544,43 +549,62 @@ def build_env_with_tool_repository_credentials(
return env
@contextmanager
def _load_module_from_file(
init_file: Path, module_name: str | None = None
) -> Generator[types.ModuleType | None, None, None]:
"""
Context manager for loading a module from file with automatic cleanup.
Yields the loaded module or None if loading fails.
"""
if module_name is None:
module_name = (
f"temp_module_{hashlib.sha256(str(init_file).encode()).hexdigest()[:8]}"
)
spec = importlib.util.spec_from_file_location(module_name, init_file)
if not spec or not spec.loader:
yield None
return
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
try:
spec.loader.exec_module(module)
yield module
finally:
sys.modules.pop(module_name, None)
def _load_tools_from_init(init_file: Path) -> list[dict[str, Any]]:
"""
Load and validate tools from a given __init__.py file.
"""
spec = importlib.util.spec_from_file_location("temp_module", init_file)
if not spec or not spec.loader:
return []
module = importlib.util.module_from_spec(spec)
sys.modules["temp_module"] = module
try:
spec.loader.exec_module(module)
with _load_module_from_file(init_file) as module:
if module is None:
return []
if not hasattr(module, "__all__"):
console.print(
f"Warning: No __all__ defined in {init_file}",
style="bold yellow",
)
raise SystemExit(1)
return [
{
"name": name,
}
for name in module.__all__
if hasattr(module, name) and is_valid_tool(getattr(module, name))
]
if not hasattr(module, "__all__"):
console.print(
f"Warning: No __all__ defined in {init_file}",
style="bold yellow",
)
raise SystemExit(1)
return [
{"name": name}
for name in module.__all__
if hasattr(module, name) and is_valid_tool(getattr(module, name))
]
except SystemExit:
raise
except Exception as e:
console.print(f"[red]Warning: Could not load {init_file}: {e!s}[/red]")
raise SystemExit(1) from e
finally:
sys.modules.pop("temp_module", None)
def _print_no_tools_warning() -> None:
"""
@@ -610,3 +634,242 @@ def _print_no_tools_warning() -> None:
" # ... implementation\n"
" return result\n"
)
def extract_tools_metadata(dir_path: str = "src") -> list[dict[str, Any]]:
"""
Extract rich metadata from tool classes in the project.
Returns a list of tool metadata dictionaries containing:
- name: Class name
- humanized_name: From name field default
- description: From description field default
- run_params_schema: JSON Schema for _run() params (from args_schema)
- init_params_schema: JSON Schema for __init__ params (filtered)
- env_vars: List of environment variable dicts
"""
tools_metadata: list[dict[str, Any]] = []
for init_file in Path(dir_path).glob("**/__init__.py"):
tools = _extract_tool_metadata_from_init(init_file)
tools_metadata.extend(tools)
return tools_metadata
def _extract_tool_metadata_from_init(init_file: Path) -> list[dict[str, Any]]:
"""
Load module from init file and extract metadata from valid tool classes.
"""
from crewai.tools.base_tool import BaseTool
try:
with _load_module_from_file(init_file) as module:
if module is None:
return []
exported_names = getattr(module, "__all__", None)
if not exported_names:
return []
tools_metadata = []
for name in exported_names:
obj = getattr(module, name, None)
if obj is None or not (
inspect.isclass(obj) and issubclass(obj, BaseTool)
):
continue
if tool_info := _extract_single_tool_metadata(obj):
tools_metadata.append(tool_info)
return tools_metadata
except Exception as e:
console.print(
f"[yellow]Warning: Could not extract metadata from {init_file}: {e}[/yellow]"
)
return []
def _extract_single_tool_metadata(tool_class: type) -> dict[str, Any] | None:
"""
Extract metadata from a single tool class.
"""
try:
core_schema = cast(Any, tool_class).__pydantic_core_schema__
if not core_schema:
return None
schema = _unwrap_schema(core_schema)
fields = schema.get("schema", {}).get("fields", {})
try:
file_path = inspect.getfile(tool_class)
relative_path = Path(file_path).relative_to(Path.cwd())
module_path = relative_path.with_suffix("")
if module_path.parts[0] == "src":
module_path = Path(*module_path.parts[1:])
if module_path.name == "__init__":
module_path = module_path.parent
module = ".".join(module_path.parts)
except (TypeError, ValueError):
module = tool_class.__module__
return {
"name": tool_class.__name__,
"module": module,
"humanized_name": _extract_field_default(
fields.get("name"), fallback=tool_class.__name__
),
"description": str(
_extract_field_default(fields.get("description"))
).strip(),
"run_params_schema": _extract_run_params_schema(fields.get("args_schema")),
"init_params_schema": _extract_init_params_schema(tool_class),
"env_vars": _extract_env_vars(fields.get("env_vars")),
}
except Exception:
return None
def _unwrap_schema(schema: Mapping[str, Any] | dict[str, Any]) -> dict[str, Any]:
"""
Unwrap nested schema structures to get to the actual schema definition.
"""
result: dict[str, Any] = dict(schema)
while (
result.get("type")
in {"function-after", "function-before", "function-wrap", "default"}
and "schema" in result
):
result = dict(result["schema"])
if result.get("type") == "definitions" and "schema" in result:
result = dict(result["schema"])
return result
def _extract_field_default(
field: dict[str, Any] | None, fallback: str | list[Any] = ""
) -> str | list[Any] | int:
"""
Extract the default value from a field schema.
"""
if not field:
return fallback
schema = field.get("schema", {})
default = schema.get("default")
return default if isinstance(default, (list, str, int)) else fallback
@lru_cache(maxsize=1)
def _get_schema_generator() -> type:
"""Get a SchemaGenerator that omits non-serializable defaults."""
from pydantic.json_schema import GenerateJsonSchema
from pydantic_core import PydanticOmit
class SchemaGenerator(GenerateJsonSchema):
def handle_invalid_for_json_schema(
self, schema: Any, error_info: Any
) -> dict[str, Any]:
raise PydanticOmit
return SchemaGenerator
def _extract_run_params_schema(
args_schema_field: dict[str, Any] | None,
) -> dict[str, Any]:
"""
Extract JSON Schema for the tool's run parameters from args_schema field.
"""
from pydantic import BaseModel
if not args_schema_field:
return {}
args_schema_class = args_schema_field.get("schema", {}).get("default")
if not (
inspect.isclass(args_schema_class) and issubclass(args_schema_class, BaseModel)
):
return {}
try:
return args_schema_class.model_json_schema(
schema_generator=_get_schema_generator()
)
except Exception:
return {}
_IGNORED_INIT_PARAMS = frozenset(
{
"name",
"description",
"env_vars",
"args_schema",
"description_updated",
"cache_function",
"result_as_answer",
"max_usage_count",
"current_usage_count",
"package_dependencies",
}
)
def _extract_init_params_schema(tool_class: type) -> dict[str, Any]:
"""
Extract JSON Schema for the tool's __init__ parameters, filtering out base fields.
"""
try:
json_schema: dict[str, Any] = cast(Any, tool_class).model_json_schema(
schema_generator=_get_schema_generator(), mode="serialization"
)
filtered_properties = {
key: value
for key, value in json_schema.get("properties", {}).items()
if key not in _IGNORED_INIT_PARAMS
}
json_schema["properties"] = filtered_properties
if "required" in json_schema:
json_schema["required"] = [
key for key in json_schema["required"] if key in filtered_properties
]
return json_schema
except Exception:
return {}
def _extract_env_vars(env_vars_field: dict[str, Any] | None) -> list[dict[str, Any]]:
"""
Extract environment variable definitions from env_vars field.
"""
from crewai.tools.base_tool import EnvVar
if not env_vars_field:
return []
schema = env_vars_field.get("schema", {})
default = schema.get("default")
if default is None:
default_factory = schema.get("default_factory")
if callable(default_factory):
try:
default = default_factory()
except Exception:
default = []
if not isinstance(default, list):
return []
return [
{
"name": env_var.name,
"description": env_var.description,
"required": env_var.required,
"default": env_var.default,
}
for env_var in default
if isinstance(env_var, EnvVar)
]

View File

@@ -126,18 +126,58 @@ from crewai.events.types.tool_usage_events import (
from crewai.events.utils.console_formatter import ConsoleFormatter
# Fields to exclude from trace serialization to reduce redundant data.
# These back-references and heavy objects create massive bloat when serialized
# repeatedly across events (crew->agents->tasks->agent creates circular refs).
TRACE_EXCLUDE_FIELDS = {
# Back-references that create redundant/circular data
"crew",
"agent",
"agents",
"tasks",
"context",
# Heavy fields not needed in individual trace events
# NOTE: "tools" intentionally NOT here - LLMCallStartedEvent.tools is lightweight
# (list of tool schemas). Agent.tools is excluded in _build_crew_started_data.
"llm",
"function_calling_llm",
"step_callback",
"task_callback",
"crew_callback",
"callbacks",
"_memory",
"_cache",
"_rpm_controller",
"_request_within_rpm_limit",
"_token_process",
"knowledge_sources",
}
def _serialize_for_trace(
event: Any, extra_exclude: set[str] | None = None
) -> dict[str, Any]:
"""Serialize an event for tracing, excluding redundant back-references.
Keeps all scalar fields (agent_role, task_name, etc.) that the AMP frontend uses.
Replaces heavy nested objects with lightweight ID references to reduce trace bloat.
Args:
event: The event object to serialize.
extra_exclude: Additional fields to exclude beyond TRACE_EXCLUDE_FIELDS.
Returns:
A dictionary with the serialized event data.
"""
exclude = TRACE_EXCLUDE_FIELDS.copy()
if extra_exclude:
exclude.update(extra_exclude)
return safe_serialize_to_dict(event, exclude=exclude)
class TraceCollectionListener(BaseEventListener):
"""Trace collection listener that orchestrates trace collection."""
complex_events: ClassVar[list[str]] = [
"task_started",
"task_completed",
"llm_call_started",
"llm_call_completed",
"agent_execution_started",
"agent_execution_completed",
]
_instance: Self | None = None
_initialized: bool = False
_listeners_setup: bool = False
@@ -810,9 +850,17 @@ class TraceCollectionListener(BaseEventListener):
def _build_event_data(
self, event_type: str, event: Any, source: Any
) -> dict[str, Any]:
"""Build event data"""
if event_type not in self.complex_events:
return safe_serialize_to_dict(event)
"""Build event data with optimized serialization to reduce trace bloat.
For most events, excludes heavy nested objects (crew, agents, tasks, tools)
that would create massive redundant data. Only crew_kickoff_started gets
the full crew structure as a one-time dump.
"""
# crew_kickoff_started is special: include full crew structure ONCE
if event_type == "crew_kickoff_started":
return self._build_crew_started_data(event)
# Complex events have custom handling that already extracts only needed fields
if event_type == "task_started":
task_name = event.task.name or event.task.description
task_display_name = (
@@ -853,19 +901,101 @@ class TraceCollectionListener(BaseEventListener):
"agent_backstory": event.agent.backstory,
}
if event_type == "llm_call_started":
event_data = safe_serialize_to_dict(event)
event_data = _serialize_for_trace(event)
event_data["task_name"] = event.task_name or getattr(
event, "task_description", None
)
return event_data
if event_type == "llm_call_completed":
return safe_serialize_to_dict(event)
return _serialize_for_trace(event)
return {
"event_type": event_type,
"event": safe_serialize_to_dict(event),
"source": source,
}
# Error events need agent/task identification extracted before generic
# serialization strips them (agent/task are in TRACE_EXCLUDE_FIELDS)
if event_type == "agent_execution_error":
event_data = _serialize_for_trace(event)
if event.agent:
event_data["agent_role"] = getattr(event.agent, "role", None)
event_data["agent_id"] = str(getattr(event.agent, "id", ""))
return event_data
if event_type == "task_failed":
event_data = _serialize_for_trace(event)
if event.task:
event_data["task_name"] = getattr(event.task, "name", None) or getattr(
event.task, "description", None
)
event_data["task_id"] = str(getattr(event.task, "id", ""))
return event_data
# For all other events, use lightweight serialization
return _serialize_for_trace(event)
def _build_crew_started_data(self, event: Any) -> dict[str, Any]:
"""Build comprehensive crew structure for crew_kickoff_started event.
This is the ONE place where we serialize the full crew structure.
Subsequent events use lightweight references to avoid redundancy.
"""
event_data = _serialize_for_trace(event)
# Add full crew structure with optimized agent/task serialization
crew = getattr(event, "crew", None)
if crew is not None:
# Serialize agents with tools (first occurrence only)
agents_data = []
for agent in getattr(crew, "agents", []) or []:
agent_data = {
"id": str(getattr(agent, "id", "")),
"role": getattr(agent, "role", ""),
"goal": getattr(agent, "goal", ""),
"backstory": getattr(agent, "backstory", ""),
"verbose": getattr(agent, "verbose", False),
"allow_delegation": getattr(agent, "allow_delegation", False),
"max_iter": getattr(agent, "max_iter", None),
"max_rpm": getattr(agent, "max_rpm", None),
}
# Include tool names (not full tool objects)
tools = getattr(agent, "tools", None)
if tools:
agent_data["tool_names"] = [
getattr(t, "name", str(t)) for t in tools
]
agents_data.append(agent_data)
# Serialize tasks with lightweight agent references
tasks_data = []
for task in getattr(crew, "tasks", []) or []:
task_data = {
"id": str(getattr(task, "id", "")),
"name": getattr(task, "name", None),
"description": getattr(task, "description", ""),
"expected_output": getattr(task, "expected_output", ""),
"async_execution": getattr(task, "async_execution", False),
"human_input": getattr(task, "human_input", False),
}
# Replace full agent with lightweight reference
task_agent = getattr(task, "agent", None)
if task_agent:
task_data["agent_ref"] = {
"id": str(getattr(task_agent, "id", "")),
"role": getattr(task_agent, "role", ""),
}
# Replace context tasks with lightweight references
context_tasks = getattr(task, "context", None)
if context_tasks:
task_data["context_task_ids"] = [
str(getattr(ct, "id", "")) for ct in context_tasks
]
tasks_data.append(task_data)
event_data["crew_structure"] = {
"agents": agents_data,
"tasks": tasks_data,
"process": str(getattr(crew, "process", "")),
"verbose": getattr(crew, "verbose", False),
"memory": getattr(crew, "memory", False),
}
return event_data
def _show_tracing_disabled_message(self) -> None:
"""Show a message when tracing is disabled."""

View File

@@ -753,7 +753,7 @@ class LLM(BaseLLM):
"temperature": self.temperature,
"top_p": self.top_p,
"n": self.n,
"stop": self.stop or None,
"stop": (self.stop or None) if self.supports_stop_words() else None,
"max_tokens": self.max_tokens or self.max_completion_tokens,
"presence_penalty": self.presence_penalty,
"frequency_penalty": self.frequency_penalty,
@@ -1825,9 +1825,11 @@ class LLM(BaseLLM):
# whether to summarize the content or abort based on the respect_context_window flag
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
error_str = str(e)
unsupported_stop = "'stop'" in error_str and (
"Unsupported parameter" in error_str
or "does not support parameters" in error_str
)
if unsupported_stop:
if (
@@ -1961,9 +1963,11 @@ class LLM(BaseLLM):
except LLMContextLengthExceededError:
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
error_str = str(e)
unsupported_stop = "'stop'" in error_str and (
"Unsupported parameter" in error_str
or "does not support parameters" in error_str
)
if unsupported_stop:
if (
@@ -2263,6 +2267,10 @@ class LLM(BaseLLM):
Note: This method is only used by the litellm fallback path.
Native providers override this method with their own implementation.
"""
model_lower = self.model.lower() if self.model else ""
if "gpt-5" in model_lower:
return False
if not LITELLM_AVAILABLE or get_supported_openai_params is None:
# When litellm is not available, assume stop words are supported
return True

View File

@@ -2245,6 +2245,9 @@ class OpenAICompletion(BaseLLM):
def supports_stop_words(self) -> bool:
"""Check if the model supports stop words."""
model_lower = self.model.lower() if self.model else ""
if "gpt-5" in model_lower:
return False
return not self.is_o1_model
def get_context_window_size(self) -> int:

View File

@@ -103,6 +103,28 @@ def to_serializable(
}
except Exception:
return repr(obj)
# Callables (functions, methods, lambdas) should fall through to repr
if callable(obj):
return repr(obj)
# Handle regular classes with __dict__ (non-Pydantic)
# Note: Don't propagate exclude to recursive calls, matching Pydantic fallback behavior
if hasattr(obj, "__dict__"):
try:
return {
_to_serializable_key(k): to_serializable(
v,
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
)
for k, v in obj.__dict__.items()
if k not in exclude and not k.startswith("_")
}
except Exception:
return repr(obj)
return repr(obj)

View File

@@ -0,0 +1,110 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"What is the capital of France?"}],"model":"gpt-5"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '89'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.2
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-DO4LcSpy72yIXCYSIVOQEXWNXydgn\",\n \"object\":
\"chat.completion\",\n \"created\": 1774628956,\n \"model\": \"gpt-5-2025-08-07\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Paris.\",\n \"refusal\": null,\n
\ \"annotations\": []\n },\n \"finish_reason\": \"stop\"\n
\ }\n ],\n \"usage\": {\n \"prompt_tokens\": 13,\n \"completion_tokens\":
11,\n \"total_tokens\": 24,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": null\n}\n"
headers:
CF-Cache-Status:
- DYNAMIC
CF-Ray:
- 9e2fc5dce85582fb-GIG
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Fri, 27 Mar 2026 16:29:17 GMT
Server:
- cloudflare
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
content-length:
- '772'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '1343'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
set-cookie:
- SET-COOKIE-XXX
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -136,6 +136,7 @@ class TestPlusAPI(unittest.TestCase):
"file": encoded_file,
"description": description,
"available_exports": None,
"tools_metadata": None,
}
mock_make_request.assert_called_once_with(
"POST", "/crewai_plus/api/v1/tools", json=params
@@ -173,6 +174,7 @@ class TestPlusAPI(unittest.TestCase):
"file": encoded_file,
"description": description,
"available_exports": None,
"tools_metadata": None,
}
self.assert_request_with_org_id(
@@ -201,6 +203,48 @@ class TestPlusAPI(unittest.TestCase):
"file": encoded_file,
"description": description,
"available_exports": None,
"tools_metadata": None,
}
mock_make_request.assert_called_once_with(
"POST", "/crewai_plus/api/v1/tools", json=params
)
self.assertEqual(response, mock_response)
@patch("crewai.cli.plus_api.PlusAPI._make_request")
def test_publish_tool_with_tools_metadata(self, mock_make_request):
mock_response = MagicMock()
mock_make_request.return_value = mock_response
handle = "test_tool_handle"
public = True
version = "1.0.0"
description = "Test tool description"
encoded_file = "encoded_test_file"
available_exports = [{"name": "MyTool"}]
tools_metadata = [
{
"name": "MyTool",
"humanized_name": "my_tool",
"description": "A test tool",
"run_params_schema": {"type": "object", "properties": {}},
"init_params_schema": {"type": "object", "properties": {}},
"env_vars": [{"name": "API_KEY", "description": "API key", "required": True, "default": None}],
}
]
response = self.api.publish_tool(
handle, public, version, description, encoded_file,
available_exports=available_exports,
tools_metadata=tools_metadata,
)
params = {
"handle": handle,
"public": public,
"version": version,
"file": encoded_file,
"description": description,
"available_exports": available_exports,
"tools_metadata": {"package": handle, "tools": tools_metadata},
}
mock_make_request.assert_called_once_with(
"POST", "/crewai_plus/api/v1/tools", json=params

View File

@@ -363,3 +363,290 @@ def test_get_crews_ignores_template_directories(
utils.get_crews()
assert not template_crew_detected


# Tests for extract_tools_metadata
def test_extract_tools_metadata_empty_project(temp_project_dir):
"""Test that extract_tools_metadata returns empty list for empty project."""
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert metadata == []
def test_extract_tools_metadata_no_init_file(temp_project_dir):
"""Test that extract_tools_metadata returns empty list when no __init__.py exists."""
(temp_project_dir / "some_file.py").write_text("print('hello')")
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert metadata == []
def test_extract_tools_metadata_empty_init_file(temp_project_dir):
"""Test that extract_tools_metadata returns empty list for empty __init__.py."""
create_init_file(temp_project_dir, "")
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert metadata == []
def test_extract_tools_metadata_no_all_variable(temp_project_dir):
"""Test that extract_tools_metadata returns empty list when __all__ is not defined."""
create_init_file(
temp_project_dir,
"from crewai.tools import BaseTool\n\nclass MyTool(BaseTool):\n pass",
)
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert metadata == []
def test_extract_tools_metadata_valid_base_tool_class(temp_project_dir):
"""Test that extract_tools_metadata extracts metadata from a valid BaseTool class."""
create_init_file(
temp_project_dir,
"""from crewai.tools import BaseTool
class MyTool(BaseTool):
name: str = "my_tool"
description: str = "A test tool"
__all__ = ['MyTool']
""",
)
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert len(metadata) == 1
assert metadata[0]["name"] == "MyTool"
assert metadata[0]["humanized_name"] == "my_tool"
assert metadata[0]["description"] == "A test tool"
def test_extract_tools_metadata_with_args_schema(temp_project_dir):
"""Test that extract_tools_metadata extracts run_params_schema from args_schema."""
create_init_file(
temp_project_dir,
"""from crewai.tools import BaseTool
from pydantic import BaseModel
class MyToolInput(BaseModel):
query: str
limit: int = 10
class MyTool(BaseTool):
name: str = "my_tool"
description: str = "A test tool"
args_schema: type[BaseModel] = MyToolInput
__all__ = ['MyTool']
""",
)
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert len(metadata) == 1
assert metadata[0]["name"] == "MyTool"
run_params = metadata[0]["run_params_schema"]
assert "properties" in run_params
assert "query" in run_params["properties"]
assert "limit" in run_params["properties"]
def test_extract_tools_metadata_with_env_vars(temp_project_dir):
"""Test that extract_tools_metadata extracts env_vars."""
create_init_file(
temp_project_dir,
"""from crewai.tools import BaseTool
from crewai.tools.base_tool import EnvVar
class MyTool(BaseTool):
name: str = "my_tool"
description: str = "A test tool"
env_vars: list[EnvVar] = [
EnvVar(name="MY_API_KEY", description="API key for service", required=True),
EnvVar(name="MY_OPTIONAL_VAR", description="Optional var", required=False, default="default_value"),
]
__all__ = ['MyTool']
""",
)
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert len(metadata) == 1
env_vars = metadata[0]["env_vars"]
assert len(env_vars) == 2
assert env_vars[0]["name"] == "MY_API_KEY"
assert env_vars[0]["description"] == "API key for service"
assert env_vars[0]["required"] is True
assert env_vars[1]["name"] == "MY_OPTIONAL_VAR"
assert env_vars[1]["required"] is False
assert env_vars[1]["default"] == "default_value"
def test_extract_tools_metadata_with_env_vars_field_default_factory(temp_project_dir):
"""Test that extract_tools_metadata extracts env_vars declared with Field(default_factory=...)."""
create_init_file(
temp_project_dir,
"""from crewai.tools import BaseTool
from crewai.tools.base_tool import EnvVar
from pydantic import Field
class MyTool(BaseTool):
name: str = "my_tool"
description: str = "A test tool"
env_vars: list[EnvVar] = Field(
default_factory=lambda: [
EnvVar(name="MY_TOOL_API", description="API token for my tool", required=True),
]
)
__all__ = ['MyTool']
""",
)
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert len(metadata) == 1
env_vars = metadata[0]["env_vars"]
assert len(env_vars) == 1
assert env_vars[0]["name"] == "MY_TOOL_API"
assert env_vars[0]["description"] == "API token for my tool"
assert env_vars[0]["required"] is True
def test_extract_tools_metadata_with_custom_init_params(temp_project_dir):
"""Test that extract_tools_metadata extracts init_params_schema with custom params."""
create_init_file(
temp_project_dir,
"""from crewai.tools import BaseTool
class MyTool(BaseTool):
name: str = "my_tool"
description: str = "A test tool"
api_endpoint: str = "https://api.example.com"
timeout: int = 30
__all__ = ['MyTool']
""",
)
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert len(metadata) == 1
init_params = metadata[0]["init_params_schema"]
assert "properties" in init_params
# Custom params should be included
assert "api_endpoint" in init_params["properties"]
assert "timeout" in init_params["properties"]
# Base params should be filtered out
assert "name" not in init_params["properties"]
assert "description" not in init_params["properties"]
def test_extract_tools_metadata_multiple_tools(temp_project_dir):
"""Test that extract_tools_metadata extracts metadata from multiple tools."""
create_init_file(
temp_project_dir,
"""from crewai.tools import BaseTool
class FirstTool(BaseTool):
name: str = "first_tool"
description: str = "First test tool"
class SecondTool(BaseTool):
name: str = "second_tool"
description: str = "Second test tool"
__all__ = ['FirstTool', 'SecondTool']
""",
)
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert len(metadata) == 2
names = [m["name"] for m in metadata]
assert "FirstTool" in names
assert "SecondTool" in names
def test_extract_tools_metadata_multiple_init_files(temp_project_dir):
"""Test that extract_tools_metadata extracts metadata from multiple __init__.py files."""
# Create tool in root __init__.py
create_init_file(
temp_project_dir,
"""from crewai.tools import BaseTool
class RootTool(BaseTool):
name: str = "root_tool"
description: str = "Root tool"
__all__ = ['RootTool']
""",
)
# Create nested package with another tool
nested_dir = temp_project_dir / "nested"
nested_dir.mkdir()
create_init_file(
nested_dir,
"""from crewai.tools import BaseTool
class NestedTool(BaseTool):
name: str = "nested_tool"
description: str = "Nested tool"
__all__ = ['NestedTool']
""",
)
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert len(metadata) == 2
names = [m["name"] for m in metadata]
assert "RootTool" in names
assert "NestedTool" in names
def test_extract_tools_metadata_ignores_non_tool_exports(temp_project_dir):
"""Test that extract_tools_metadata ignores non-BaseTool exports."""
create_init_file(
temp_project_dir,
"""from crewai.tools import BaseTool
class MyTool(BaseTool):
name: str = "my_tool"
description: str = "A test tool"
def not_a_tool():
pass
SOME_CONSTANT = "value"
__all__ = ['MyTool', 'not_a_tool', 'SOME_CONSTANT']
""",
)
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert len(metadata) == 1
assert metadata[0]["name"] == "MyTool"
def test_extract_tools_metadata_import_error_returns_empty(temp_project_dir):
"""Test that extract_tools_metadata returns empty list on import error."""
create_init_file(
temp_project_dir,
"""from nonexistent_module import something
class MyTool(BaseTool):
pass
__all__ = ['MyTool']
""",
)
# Should not raise, just return empty list
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert metadata == []
def test_extract_tools_metadata_syntax_error_returns_empty(temp_project_dir):
"""Test that extract_tools_metadata returns empty list on syntax error."""
create_init_file(
temp_project_dir,
"""from crewai.tools import BaseTool
class MyTool(BaseTool):
# Missing closing parenthesis
def __init__(self, name:
pass
__all__ = ['MyTool']
""",
)
# Should not raise, just return empty list
metadata = utils.extract_tools_metadata(dir_path=str(temp_project_dir))
assert metadata == []
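The per-tool metadata shape these tests assert can be approximated with a small standalone sketch (hypothetical helper and class names; field names are taken from the assertions above, not from the real `extract_tools_metadata` implementation):

```python
# Hypothetical sketch of the metadata dict shape asserted in the tests above.
class FakeTool:
    name = "my_tool"
    description = "A test tool"
    env_vars = [
        {"name": "MY_API_KEY", "description": "API key", "required": True, "default": None},
    ]

def build_tool_metadata(cls):
    """Assemble one tool's metadata entry (schemas left empty in this sketch)."""
    return {
        "name": cls.__name__,        # class name, e.g. "FakeTool"
        "humanized_name": cls.name,  # the tool's declared name attribute
        "description": cls.description,
        "run_params_schema": {"type": "object", "properties": {}},
        "init_params_schema": {"type": "object", "properties": {}},
        "env_vars": cls.env_vars,
    }

meta = build_tool_metadata(FakeTool)
```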

View File

@@ -185,9 +185,14 @@ def test_publish_when_not_in_sync(mock_is_synced, capsys, tool_command):
"crewai.cli.tools.main.extract_available_exports",
return_value=[{"name": "SampleTool"}],
)
@patch(
"crewai.cli.tools.main.extract_tools_metadata",
return_value=[{"name": "SampleTool", "humanized_name": "sample_tool", "description": "A sample tool", "run_params_schema": {}, "init_params_schema": {}, "env_vars": []}],
)
@patch("crewai.cli.tools.main.ToolCommand._print_current_organization")
def test_publish_when_not_in_sync_and_force(
mock_print_org,
mock_tools_metadata,
mock_available_exports,
mock_is_synced,
mock_publish,
@@ -222,6 +227,7 @@ def test_publish_when_not_in_sync_and_force(
description="A sample tool",
encoded_file=unittest.mock.ANY,
available_exports=[{"name": "SampleTool"}],
tools_metadata=[{"name": "SampleTool", "humanized_name": "sample_tool", "description": "A sample tool", "run_params_schema": {}, "init_params_schema": {}, "env_vars": []}],
)
mock_print_org.assert_called_once()
@@ -242,7 +248,12 @@ def test_publish_when_not_in_sync_and_force(
"crewai.cli.tools.main.extract_available_exports",
return_value=[{"name": "SampleTool"}],
)
@patch(
"crewai.cli.tools.main.extract_tools_metadata",
return_value=[{"name": "SampleTool", "humanized_name": "sample_tool", "description": "A sample tool", "run_params_schema": {}, "init_params_schema": {}, "env_vars": []}],
)
def test_publish_success(
mock_tools_metadata,
mock_available_exports,
mock_is_synced,
mock_publish,
@@ -277,6 +288,7 @@ def test_publish_success(
description="A sample tool",
encoded_file=unittest.mock.ANY,
available_exports=[{"name": "SampleTool"}],
tools_metadata=[{"name": "SampleTool", "humanized_name": "sample_tool", "description": "A sample tool", "run_params_schema": {}, "init_params_schema": {}, "env_vars": []}],
)
@@ -295,7 +307,12 @@ def test_publish_success(
"crewai.cli.tools.main.extract_available_exports",
return_value=[{"name": "SampleTool"}],
)
@patch(
"crewai.cli.tools.main.extract_tools_metadata",
return_value=[{"name": "SampleTool", "humanized_name": "sample_tool", "description": "A sample tool", "run_params_schema": {}, "init_params_schema": {}, "env_vars": []}],
)
def test_publish_failure(
mock_tools_metadata,
mock_available_exports,
mock_publish,
mock_open,
@@ -336,7 +353,12 @@ def test_publish_failure(
"crewai.cli.tools.main.extract_available_exports",
return_value=[{"name": "SampleTool"}],
)
@patch(
"crewai.cli.tools.main.extract_tools_metadata",
return_value=[{"name": "SampleTool", "humanized_name": "sample_tool", "description": "A sample tool", "run_params_schema": {}, "init_params_schema": {}, "env_vars": []}],
)
def test_publish_api_error(
mock_tools_metadata,
mock_available_exports,
mock_publish,
mock_open,
@@ -362,6 +384,63 @@ def test_publish_api_error(
mock_publish.assert_called_once()
@patch("crewai.cli.tools.main.get_project_name", return_value="sample-tool")
@patch("crewai.cli.tools.main.get_project_version", return_value="1.0.0")
@patch("crewai.cli.tools.main.get_project_description", return_value="A sample tool")
@patch("crewai.cli.tools.main.subprocess.run")
@patch("crewai.cli.tools.main.os.listdir", return_value=["sample-tool-1.0.0.tar.gz"])
@patch(
"crewai.cli.tools.main.open",
new_callable=unittest.mock.mock_open,
read_data=b"sample tarball content",
)
@patch("crewai.cli.plus_api.PlusAPI.publish_tool")
@patch("crewai.cli.tools.main.git.Repository.is_synced", return_value=True)
@patch(
"crewai.cli.tools.main.extract_available_exports",
return_value=[{"name": "SampleTool"}],
)
@patch(
"crewai.cli.tools.main.extract_tools_metadata",
side_effect=Exception("Failed to extract metadata"),
)
def test_publish_metadata_extraction_failure_continues_with_warning(
mock_tools_metadata,
mock_available_exports,
mock_is_synced,
mock_publish,
mock_open,
mock_listdir,
mock_subprocess_run,
mock_get_project_description,
mock_get_project_version,
mock_get_project_name,
capsys,
tool_command,
):
"""Test that metadata extraction failure shows warning but continues publishing."""
mock_publish_response = MagicMock()
mock_publish_response.status_code = 200
mock_publish_response.json.return_value = {"handle": "sample-tool"}
mock_publish.return_value = mock_publish_response
tool_command.publish(is_public=True)
output = capsys.readouterr().out
assert "Warning: Could not extract tool metadata" in output
assert "Publishing will continue without detailed metadata" in output
assert "No tool metadata extracted" in output
mock_publish.assert_called_once_with(
handle="sample-tool",
is_public=True,
version="1.0.0",
description="A sample tool",
encoded_file=unittest.mock.ANY,
available_exports=[{"name": "SampleTool"}],
tools_metadata=[],
)
@patch("crewai.cli.tools.main.Settings")
def test_print_current_organization_with_org(mock_settings, capsys, tool_command):
mock_settings_instance = MagicMock()

View File

@@ -1523,6 +1523,69 @@ def test_openai_stop_words_not_applied_to_structured_output():
assert "Observation:" in result.observation
def test_openai_gpt5_models_do_not_support_stop_words():
"""
Test that GPT-5 family models do not support stop words via the API.
GPT-5 models reject the 'stop' parameter, so stop words must be
applied client-side only.
"""
gpt5_models = [
"gpt-5",
"gpt-5-mini",
"gpt-5-nano",
"gpt-5-pro",
"gpt-5.1",
"gpt-5.1-chat",
"gpt-5.2",
"gpt-5.2-chat",
]
for model_name in gpt5_models:
    llm = OpenAICompletion(model=model_name)
    assert llm.supports_stop_words() is False, (
        f"Expected {model_name} to NOT support stop words"
    )
def test_openai_non_gpt5_models_support_stop_words():
"""
Test that non-GPT-5 models still support stop words normally.
"""
supported_models = [
"gpt-4o",
"gpt-4o-mini",
"gpt-4.1",
"gpt-4.1-mini",
"gpt-4-turbo",
]
for model_name in supported_models:
    llm = OpenAICompletion(model=model_name)
    assert llm.supports_stop_words() is True, (
        f"Expected {model_name} to support stop words"
    )
def test_openai_gpt5_still_applies_stop_words_client_side():
"""
Test that GPT-5 models still truncate responses at stop words client-side
via _apply_stop_words(), even though they don't send 'stop' to the API.
"""
llm = OpenAICompletion(
model="gpt-5.2",
stop=["Observation:", "Final Answer:"],
)
assert llm.supports_stop_words() is False
response = "I need to search.\n\nAction: search\nObservation: Found results"
result = llm._apply_stop_words(response)
assert "Observation:" not in result
assert "Found results" not in result
assert "I need to search" in result
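The client-side truncation this test exercises can be sketched as a standalone function (a hypothetical simplification, not the actual `_apply_stop_words` implementation):

```python
def apply_stop_words(response: str, stop: list[str]) -> str:
    """Truncate the response at the earliest occurrence of any stop word."""
    cut = len(response)
    for word in stop:
        idx = response.find(word)
        if idx != -1:
            cut = min(cut, idx)
    return response[:cut].rstrip()

text = "I need to search.\n\nAction: search\nObservation: Found results"
truncated = apply_stop_words(text, ["Observation:", "Final Answer:"])
# everything from "Observation:" onward is removed; the reasoning prefix survives
```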
def test_openai_stop_words_still_applied_to_regular_responses():
"""
Test that stop words ARE still applied for regular (non-structured) responses.

View File

@@ -682,6 +682,126 @@ def test_llm_call_when_stop_is_unsupported_when_additional_drop_params_is_provid
assert "Paris" in result
@pytest.mark.vcr()
def test_litellm_gpt5_call_succeeds_without_stop_error():
"""
Integration test: GPT-5 call succeeds when stop words are configured,
because stop is omitted from API params and applied client-side.
"""
llm = LLM(model="gpt-5", stop=["Observation:"], is_litellm=True)
result = llm.call("What is the capital of France?")
assert isinstance(result, str)
assert len(result) > 0
def test_litellm_gpt5_does_not_send_stop_in_params():
"""
Test that the LiteLLM fallback path does not include 'stop' in API params
for GPT-5.x models, since they reject it at the API level.
"""
llm = LLM(model="openai/gpt-5.2", stop=["Observation:"], is_litellm=True)
params = llm._prepare_completion_params(
messages=[{"role": "user", "content": "Hello"}]
)
assert params.get("stop") is None, (
"GPT-5.x models should not have 'stop' in API params"
)
def test_litellm_non_gpt5_sends_stop_in_params():
"""
Test that the LiteLLM fallback path still includes 'stop' in API params
for models that support it.
"""
llm = LLM(model="gpt-4o", stop=["Observation:"], is_litellm=True)
params = llm._prepare_completion_params(
messages=[{"role": "user", "content": "Hello"}]
)
assert params.get("stop") == ["Observation:"], (
"Non-GPT-5 models should have 'stop' in API params"
)
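The parameter gating these two tests assert can be sketched roughly as follows (hypothetical helper names; a minimal model of the behavior, assuming GPT-5-family detection is a simple model-name prefix check):

```python
# Sketch: GPT-5-family models must not receive 'stop'; other models may.
def supports_stop_words(model: str) -> bool:
    name = model.split("/")[-1]  # strip a provider prefix such as "openai/"
    return not name.startswith("gpt-5")

def prepare_params(model: str, stop: list[str]) -> dict:
    params = {"model": model}
    if stop and supports_stop_words(model):
        params["stop"] = stop  # only sent for models that accept it
    return params

with_stop = prepare_params("gpt-4o", ["Observation:"])
without_stop = prepare_params("openai/gpt-5.2", ["Observation:"])
```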
def test_litellm_retry_catches_litellm_unsupported_params_error(caplog):
"""
Test that the retry logic catches LiteLLM's UnsupportedParamsError format
("does not support parameters") in addition to the OpenAI API format.
"""
llm = LLM(model="openai/gpt-5.2", stop=["Observation:"], is_litellm=True)
litellm_error = Exception(
"litellm.UnsupportedParamsError: openai does not support parameters: "
"['stop'], for model=openai/gpt-5.2."
)
call_count = 0
try:
import litellm
except ImportError:
pytest.skip("litellm is not installed; skipping LiteLLM retry test")
def mock_completion(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count == 1:
raise litellm_error
return MagicMock(
choices=[MagicMock(message=MagicMock(content="Paris", tool_calls=None))],
usage=MagicMock(
prompt_tokens=10,
completion_tokens=5,
total_tokens=15,
),
)
with patch("litellm.completion", side_effect=mock_completion):
with caplog.at_level(logging.INFO):
llm.call("What is the capital of France?")
assert "Retrying LLM call without the unsupported 'stop'" in caplog.text
assert "stop" in llm.additional_params.get("additional_drop_params", [])
def test_litellm_retry_catches_openai_api_stop_error(caplog):
"""
Test that the retry logic still catches the OpenAI API error format
("Unsupported parameter: 'stop'").
"""
llm = LLM(model="openai/gpt-5.2", stop=["Observation:"], is_litellm=True)
api_error = Exception(
"Unsupported parameter: 'stop' is not supported with this model."
)
call_count = 0
def mock_completion(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count == 1:
raise api_error
return MagicMock(
choices=[MagicMock(message=MagicMock(content="Paris", tool_calls=None))],
usage=MagicMock(
prompt_tokens=10,
completion_tokens=5,
total_tokens=15,
),
)
with patch("litellm.completion", side_effect=mock_completion):
with caplog.at_level(logging.INFO):
llm.call("What is the capital of France?")
assert "Retrying LLM call without the unsupported 'stop'" in caplog.text
assert "stop" in llm.additional_params.get("additional_drop_params", [])
@pytest.fixture
def ollama_llm():
return LLM(model="ollama/llama3.2:3b", is_litellm=True)

View File

@@ -0,0 +1,586 @@
"""Tests for trace serialization optimization to prevent trace table bloat.
These tests verify that trace events don't contain redundant full crew/task/agent
objects, reducing event sizes from 50-100KB to a few KB per event.
"""
import uuid
from unittest.mock import MagicMock
import pytest
from crewai.events.listeners.tracing.trace_listener import (
TRACE_EXCLUDE_FIELDS,
TraceCollectionListener,
_serialize_for_trace,
)
class TestTraceExcludeFields:
"""Test that TRACE_EXCLUDE_FIELDS contains all the heavy/redundant fields."""
def test_contains_back_references(self):
"""Verify back-reference fields are excluded."""
back_refs = {"crew", "agent", "agents", "tasks", "context"}
assert back_refs.issubset(TRACE_EXCLUDE_FIELDS)
def test_contains_heavy_fields(self):
"""Verify heavy objects are excluded.
Note: 'tools' is NOT in TRACE_EXCLUDE_FIELDS because LLMCallStartedEvent.tools
is a lightweight list of tool schemas. Agent.tools exclusion is handled
explicitly in _build_crew_started_data.
"""
heavy_fields = {
"llm",
"function_calling_llm",
"step_callback",
"task_callback",
"crew_callback",
"callbacks",
"_memory",
"_cache",
"knowledge_sources",
}
assert heavy_fields.issubset(TRACE_EXCLUDE_FIELDS)
# tools is NOT excluded globally - LLM events need it
assert "tools" not in TRACE_EXCLUDE_FIELDS
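Under the behavior these tests encode, `_serialize_for_trace` acts roughly like the following sketch (hypothetical and shallow; the real helper delegates to `safe_serialize_to_dict` with the exclusion set):

```python
# Minimal model of trace serialization: drop back-references and heavy
# fields, keep scalar fields and lightweight tool schemas.
TRACE_EXCLUDE_FIELDS_SKETCH = frozenset({
    "crew", "agent", "agents", "tasks", "context",
    "llm", "function_calling_llm", "callbacks",
})

def serialize_for_trace_sketch(event, extra_exclude=frozenset()):
    exclude = TRACE_EXCLUDE_FIELDS_SKETCH | set(extra_exclude)
    return {
        k: v
        for k, v in vars(event).items()
        if k not in exclude and not k.startswith("_")
    }

class FakeEvent:
    def __init__(self):
        self.agent_role = "Researcher"
        self.tools = [{"name": "search"}]  # lightweight schemas: kept
        self.crew = object()               # back-reference: dropped

data = serialize_for_trace_sketch(FakeEvent())
```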
class TestSerializeForTrace:
"""Test the _serialize_for_trace helper function."""
def test_excludes_crew_field(self):
"""Verify crew field is excluded from serialization."""
event = MagicMock()
event.crew = MagicMock(name="TestCrew")
event.crew_name = "TestCrew"
event.timestamp = None
result = _serialize_for_trace(event)
# crew_name should be present (scalar field)
# crew should be excluded (back-reference)
assert "crew" not in result or result.get("crew") is None
def test_excludes_agent_field(self):
"""Verify agent field is excluded from serialization."""
event = MagicMock()
event.agent = MagicMock(role="TestAgent")
event.agent_role = "TestAgent"
result = _serialize_for_trace(event)
assert "agent" not in result or result.get("agent") is None
def test_preserves_tools_field(self):
"""Verify tools field is preserved for LLM events (lightweight schemas)."""
class EventWithTools:
def __init__(self):
self.tools = [{"name": "search", "description": "Search tool"}]
self.tool_name = "test_tool"
event = EventWithTools()
result = _serialize_for_trace(event)
# tools should be preserved (lightweight for LLM events)
assert "tools" in result
assert result["tools"] == [{"name": "search", "description": "Search tool"}]
def test_preserves_scalar_fields(self):
"""Verify scalar fields needed by AMP frontend are preserved."""
class SimpleEvent:
def __init__(self):
self.agent_role = "Researcher"
self.task_name = "Research Task"
self.task_id = str(uuid.uuid4())
self.duration_ms = 1500
self.tokens_used = 500
event = SimpleEvent()
result = _serialize_for_trace(event)
# Scalar fields should be preserved
assert result.get("agent_role") == "Researcher"
assert result.get("task_name") == "Research Task"
assert result.get("duration_ms") == 1500
assert result.get("tokens_used") == 500
def test_extra_exclude_parameter(self):
"""Verify extra_exclude adds to the default exclusions."""
class EventWithCustomField:
def __init__(self):
self.custom_heavy_field = {"large": "data" * 1000}
self.keep_this = "small"
event = EventWithCustomField()
result = _serialize_for_trace(event, extra_exclude={"custom_heavy_field"})
assert "custom_heavy_field" not in result
assert result.get("keep_this") == "small"
class TestBuildEventData:
"""Test _build_event_data method for different event types."""
@pytest.fixture
def listener(self):
"""Create a trace listener for testing."""
# Reset singleton
TraceCollectionListener._instance = None
TraceCollectionListener._initialized = False
TraceCollectionListener._listeners_setup = False
return TraceCollectionListener()
def test_task_started_no_full_task_object(self, listener):
"""Verify task_started event doesn't include full task object."""
mock_task = MagicMock()
mock_task.name = "Test Task"
mock_task.description = "A test task description"
mock_task.expected_output = "Expected result"
mock_task.id = uuid.uuid4()
# Add heavy fields that should NOT appear in output
mock_task.crew = MagicMock(name="HeavyCrew")
mock_task.agent = MagicMock(role="HeavyAgent")
mock_task.context = [MagicMock(), MagicMock()]
mock_task.tools = [MagicMock(), MagicMock()]
mock_event = MagicMock()
mock_event.task = mock_task
mock_event.context = "test context"
mock_source = MagicMock()
mock_source.agent = MagicMock()
mock_source.agent.role = "Worker"
result = listener._build_event_data("task_started", mock_event, mock_source)
# Should have scalar fields
assert result["task_name"] == "Test Task"
assert result["task_description"] == "A test task description"
assert result["agent_role"] == "Worker"
assert result["task_id"] == str(mock_task.id)
# Should NOT have full objects
assert "crew" not in result
assert "tools" not in result
# task and agent should not be full objects
assert result.get("task") is None or not hasattr(result.get("task"), "crew")
def test_task_completed_no_full_task_object(self, listener):
"""Verify task_completed event doesn't include full task object."""
mock_task = MagicMock()
mock_task.name = "Completed Task"
mock_task.description = "Task description"
mock_task.id = uuid.uuid4()
mock_output = MagicMock()
mock_output.raw = "Task result"
mock_output.output_format = "text"
mock_output.agent = "Worker"
mock_event = MagicMock()
mock_event.task = mock_task
mock_event.output = mock_output
result = listener._build_event_data("task_completed", mock_event, None)
# Should have scalar fields
assert result["task_name"] == "Completed Task"
assert result["output_raw"] == "Task result"
assert result["agent_role"] == "Worker"
# Should NOT have full task object
assert "crew" not in result
assert "tools" not in result
def test_agent_execution_started_no_full_agent(self, listener):
"""Verify agent_execution_started extracts only scalar fields."""
mock_agent = MagicMock()
mock_agent.role = "Analyst"
mock_agent.goal = "Analyze data"
mock_agent.backstory = "Expert analyst"
# Heavy fields
mock_agent.tools = [MagicMock(), MagicMock()]
mock_agent.llm = MagicMock()
mock_agent.crew = MagicMock()
mock_event = MagicMock()
mock_event.agent = mock_agent
result = listener._build_event_data(
"agent_execution_started", mock_event, None
)
# Should have scalar fields
assert result["agent_role"] == "Analyst"
assert result["agent_goal"] == "Analyze data"
assert result["agent_backstory"] == "Expert analyst"
# Should NOT have heavy objects
assert "tools" not in result
assert "llm" not in result
assert "crew" not in result
def test_llm_call_started_excludes_heavy_fields(self, listener):
"""Verify llm_call_started uses lightweight serialization.
LLMCallStartedEvent.tools is a lightweight list of tool schemas (dicts),
not heavy Agent.tools objects, so it should be preserved.
"""
class MockLLMEvent:
def __init__(self):
self.task_name = "LLM Task"
self.model = "gpt-4"
self.tokens = 100
# Heavy fields that should be excluded
self.crew = MagicMock()
self.agent = MagicMock()
# LLM event tools are lightweight schemas (dicts), should be kept
self.tools = [{"name": "search", "description": "Search tool"}]
mock_event = MockLLMEvent()
result = listener._build_event_data("llm_call_started", mock_event, None)
# task_name should be present
assert result["task_name"] == "LLM Task"
# Heavy fields should be excluded
assert "crew" not in result or result.get("crew") is None
assert "agent" not in result or result.get("agent") is None
# LLM tools (lightweight schemas) should be preserved
assert result.get("tools") == [{"name": "search", "description": "Search tool"}]
def test_llm_call_completed_excludes_heavy_fields(self, listener):
"""Verify llm_call_completed uses lightweight serialization."""
class MockLLMCompletedEvent:
def __init__(self):
self.response = "LLM response"
self.tokens_used = 150
self.duration_ms = 500
# Heavy fields
self.crew = MagicMock()
self.agent = MagicMock()
mock_event = MockLLMCompletedEvent()
result = listener._build_event_data("llm_call_completed", mock_event, None)
# Scalar fields preserved
assert result.get("response") == "LLM response"
assert result.get("tokens_used") == 150
# Heavy fields excluded
assert "crew" not in result or result.get("crew") is None
assert "agent" not in result or result.get("agent") is None
class TestCrewKickoffStartedEvent:
"""Test that crew_kickoff_started event has full structure."""
@pytest.fixture
def listener(self):
"""Create a trace listener for testing."""
TraceCollectionListener._instance = None
TraceCollectionListener._initialized = False
TraceCollectionListener._listeners_setup = False
return TraceCollectionListener()
def test_crew_started_has_crew_structure(self, listener):
"""Verify crew_kickoff_started includes the crew_structure field."""
# Create mock crew with agents and tasks
mock_agent1 = MagicMock()
mock_agent1.id = uuid.uuid4()
mock_agent1.role = "Researcher"
mock_agent1.goal = "Research things"
mock_agent1.backstory = "Expert researcher"
mock_agent1.verbose = True
mock_agent1.allow_delegation = False
mock_agent1.max_iter = 10
mock_agent1.max_rpm = None
mock_agent1.tools = [MagicMock(name="search_tool"), MagicMock(name="read_tool")]
        mock_agent2 = MagicMock()
        mock_agent2.id = uuid.uuid4()
        mock_agent2.role = "Writer"
        mock_agent2.goal = "Write content"
        mock_agent2.backstory = "Expert writer"
        mock_agent2.verbose = False
        mock_agent2.allow_delegation = True
        mock_agent2.max_iter = 5
        mock_agent2.max_rpm = 10
        mock_agent2.tools = []

        mock_task1 = MagicMock()
        mock_task1.id = uuid.uuid4()
        mock_task1.name = "Research Task"
        mock_task1.description = "Do research"
        mock_task1.expected_output = "Research results"
        mock_task1.async_execution = False
        mock_task1.human_input = False
        mock_task1.agent = mock_agent1
        mock_task1.context = None

        mock_task2 = MagicMock()
        mock_task2.id = uuid.uuid4()
        mock_task2.name = "Writing Task"
        mock_task2.description = "Write report"
        mock_task2.expected_output = "Written report"
        mock_task2.async_execution = True
        mock_task2.human_input = True
        mock_task2.agent = mock_agent2
        mock_task2.context = [mock_task1]

        mock_crew = MagicMock()
        mock_crew.agents = [mock_agent1, mock_agent2]
        mock_crew.tasks = [mock_task1, mock_task2]
        mock_crew.process = "sequential"
        mock_crew.verbose = True
        mock_crew.memory = False

        mock_event = MagicMock()
        mock_event.crew = mock_crew
        mock_event.crew_name = "TestCrew"
        mock_event.inputs = {"key": "value"}

        result = listener._build_event_data("crew_kickoff_started", mock_event, None)

        # Should have crew_structure
        assert "crew_structure" in result
        crew_structure = result["crew_structure"]

        # Verify agents are serialized with tool names
        assert len(crew_structure["agents"]) == 2
        agent1_data = crew_structure["agents"][0]
        assert agent1_data["role"] == "Researcher"
        assert agent1_data["goal"] == "Research things"
        assert "tool_names" in agent1_data
        assert len(agent1_data["tool_names"]) == 2

        # Verify tasks have lightweight agent references
        assert len(crew_structure["tasks"]) == 2
        task2_data = crew_structure["tasks"][1]
        assert task2_data["name"] == "Writing Task"
        assert "agent_ref" in task2_data
        assert task2_data["agent_ref"]["role"] == "Writer"

        # Verify context uses task IDs
        assert "context_task_ids" in task2_data
        assert str(mock_task1.id) in task2_data["context_task_ids"]

    def test_crew_started_agents_no_full_tools(self, listener):
        """Verify agents in crew_structure have tool_names, not full tool objects."""
        mock_tool = MagicMock()
        mock_tool.name = "web_search"
        mock_tool.description = "Search the web"
        mock_tool.func = lambda x: x  # Heavy callable
        mock_tool.args_schema = {"type": "object"}  # Schema

        mock_agent = MagicMock()
        mock_agent.id = uuid.uuid4()
        mock_agent.role = "Searcher"
        mock_agent.goal = "Search"
        mock_agent.backstory = "Expert"
        mock_agent.verbose = False
        mock_agent.allow_delegation = False
        mock_agent.max_iter = 5
        mock_agent.max_rpm = None
        mock_agent.tools = [mock_tool]

        mock_crew = MagicMock()
        mock_crew.agents = [mock_agent]
        mock_crew.tasks = []
        mock_crew.process = "sequential"
        mock_crew.verbose = False
        mock_crew.memory = False

        mock_event = MagicMock()
        mock_event.crew = mock_crew

        result = listener._build_event_data("crew_kickoff_started", mock_event, None)

        agent_data = result["crew_structure"]["agents"][0]
        # Should have tool_names (list of strings)
        assert "tool_names" in agent_data
        assert agent_data["tool_names"] == ["web_search"]
        # Should NOT have the full tools array
        assert "tools" not in agent_data

    def test_crew_started_tasks_no_full_agent(self, listener):
        """Verify tasks have agent_ref, not a full agent object."""
        mock_agent = MagicMock()
        mock_agent.id = uuid.uuid4()
        mock_agent.role = "Worker"
        mock_agent.goal = "Work hard"
        mock_agent.backstory = "Dedicated worker"
        mock_agent.tools = [MagicMock(), MagicMock()]
        mock_agent.llm = MagicMock()

        mock_task = MagicMock()
        mock_task.id = uuid.uuid4()
        mock_task.name = "Work Task"
        mock_task.description = "Do work"
        mock_task.expected_output = "Work done"
        mock_task.async_execution = False
        mock_task.human_input = False
        mock_task.agent = mock_agent
        mock_task.context = None

        mock_crew = MagicMock()
        mock_crew.agents = [mock_agent]
        mock_crew.tasks = [mock_task]
        mock_crew.process = "sequential"
        mock_crew.verbose = False
        mock_crew.memory = False

        mock_event = MagicMock()
        mock_event.crew = mock_crew

        result = listener._build_event_data("crew_kickoff_started", mock_event, None)

        task_data = result["crew_structure"]["tasks"][0]
        # Should have a lightweight agent_ref
        assert "agent_ref" in task_data
        assert task_data["agent_ref"]["id"] == str(mock_agent.id)
        assert task_data["agent_ref"]["role"] == "Worker"
        # agent_ref should ONLY have id and role (not tools, llm, etc.)
        assert len(task_data["agent_ref"]) == 2
        # Should NOT have the full agent
        assert "agent" not in task_data


class TestNonCrewStartedEvents:
    """Test that non-crew_started events don't carry redundant data."""

    @pytest.fixture
    def listener(self):
        """Create a trace listener for testing."""
        TraceCollectionListener._instance = None
        TraceCollectionListener._initialized = False
        TraceCollectionListener._listeners_setup = False
        return TraceCollectionListener()

    def test_generic_event_no_crew(self, listener):
        """Verify generic events exclude the crew object.

        Note: 'tools' is now preserved since LLMCallStartedEvent.tools is lightweight.
        """

        class GenericEvent:
            def __init__(self):
                self.event_type = "some_event"
                self.data = "some_data"
                # These should be excluded
                self.crew = MagicMock()
                self.agents = [MagicMock()]
                self.tasks = [MagicMock()]
                # tools is now preserved (for LLM events it's lightweight)
                self.tools = [{"name": "search"}]

        mock_event = GenericEvent()
        result = listener._build_event_data("some_event", mock_event, None)

        # Scalar fields preserved
        assert result.get("event_type") == "some_event"
        assert result.get("data") == "some_data"
        # Heavy fields excluded
        assert "crew" not in result or result.get("crew") is None
        assert "agents" not in result or result.get("agents") is None
        assert "tasks" not in result or result.get("tasks") is None
        # tools is now preserved (lightweight for LLM events)
        assert result.get("tools") == [{"name": "search"}]

    def test_crew_kickoff_completed_no_full_crew(self, listener):
        """Verify crew_kickoff_completed doesn't repeat the full crew structure."""

        class CrewCompletedEvent:
            def __init__(self):
                self.crew_name = "TestCrew"
                self.total_tokens = 5000
                self.output = "Final output"
                # Should be excluded
                self.crew = MagicMock()
                self.crew.agents = [MagicMock(), MagicMock()]
                self.crew.tasks = [MagicMock()]

        mock_event = CrewCompletedEvent()
        result = listener._build_event_data("crew_kickoff_completed", mock_event, None)

        # Scalar fields preserved
        assert result.get("crew_name") == "TestCrew"
        assert result.get("total_tokens") == 5000
        # Should NOT have the full crew object
        assert "crew" not in result or result.get("crew") is None
        # Should NOT have crew_structure (that's only for crew_started)
        assert "crew_structure" not in result


class TestSizeReduction:
    """Test that the optimization actually reduces serialized size."""

    @pytest.fixture
    def listener(self):
        """Create a trace listener for testing."""
        TraceCollectionListener._instance = None
        TraceCollectionListener._initialized = False
        TraceCollectionListener._listeners_setup = False
        return TraceCollectionListener()

    def test_task_event_size_reduction(self, listener):
        """Verify task events are much smaller than naive serialization."""
        import json

        # Create a realistic task with many fields
        mock_agent = MagicMock()
        mock_agent.id = uuid.uuid4()
        mock_agent.role = "Researcher"
        mock_agent.goal = "Research" * 50  # Longer goal
        mock_agent.backstory = "Expert" * 100  # Longer backstory
        mock_agent.tools = [MagicMock() for _ in range(5)]
        mock_agent.llm = MagicMock()
        mock_agent.crew = MagicMock()

        mock_task = MagicMock()
        mock_task.name = "Research Task"
        mock_task.description = "Detailed description" * 20
        mock_task.expected_output = "Expected" * 10
        mock_task.id = uuid.uuid4()
        mock_task.agent = mock_agent
        mock_task.context = [MagicMock() for _ in range(3)]
        mock_task.crew = MagicMock()
        mock_task.tools = [MagicMock() for _ in range(3)]

        mock_event = MagicMock()
        mock_event.task = mock_task
        mock_event.context = "test context"

        mock_source = MagicMock()
        mock_source.agent = mock_agent

        result = listener._build_event_data("task_started", mock_event, mock_source)

        # The result should be relatively small
        serialized = json.dumps(result, default=str)
        # Should be under 2KB for task_started (was potentially 50-100KB before)
        assert len(serialized) < 2000, f"task_started too large: {len(serialized)} bytes"
        # Should have the essential fields
        assert "task_name" in result
        assert "task_id" in result
        assert "agent_role" in result