Compare commits

...

9 Commits

Author SHA1 Message Date
Matt Aitchison
0fba8163ca docs: note FileArtifact handles are run-scoped and don't survive across runs 2026-06-04 20:38:52 -05:00
Matt Aitchison
ad798afeca fix: key artifact dedup map by (object, scope) and clear it in tests
Keying the object-identity dedup map on id(artifact) alone could orphan a
mapping if the same instance were ever stored under two scopes (the second
store overwrote the first's entry, and per-handle cleanup then skipped it).
Key on (id(artifact), scope) so each scope keeps its own handle and cleanup is
exact and unconditional. Also clear _handle_by_obj in the test fixture so stale
id->handle mappings don't accumulate across the session.
2026-06-04 20:33:08 -05:00
Matt Aitchison
8d2ca5ef4c fix: store FileArtifact returned by after_tool_call hooks
store_if_artifact ran before the after_tool_call hooks, so a hook that
replaced the result with a FileArtifact put raw bytes / a dataclass repr
into the tool message and events. Re-run store_if_artifact on the final
result after the hook loop in all three native tool paths (no-op for the
normal string case).
2026-06-04 20:06:41 -05:00
Matt Aitchison
000dd41fc3 fix: dedupe artifact storage on repeated cached results
The tool-result cache stores the raw FileArtifact and hands back the same
instance on every cache hit, where store_if_artifact ran again and minted a
fresh handle plus another byte copy. Repeated identical cached calls therefore
stacked duplicate copies of the same payload until scope/TTL cleanup. The store
now reuses an existing handle when the same FileArtifact instance is stored
again under the same scope, keying off object identity (the cache keeps the
object alive, so the id is stable within a run).
2026-06-04 19:54:49 -05:00
Matt Aitchison
b827f7ee11 fix: wire planning-mode native path and harden artifact helpers
- Wire resolve_artifact_handles/store_if_artifact into
  agent_utils.execute_single_native_tool_call, the native tool path used
  by StepExecutor (Plan-and-Execute mode); previously a FileArtifact
  produced or consumed there bypassed the bypass and leaked raw bytes.
- Fold the crew/agent.crew fallback into artifact_scope_id(crew, task,
  agent) so all four execution paths derive the cleanup scope identically
  instead of repeating 'self.crew or getattr(self.agent, ...)'.
- Sanitize ']' and newlines (not just quotes) in the placeholder so an
  odd filename can't break the bracketed attribute line; extend
  _human_size with TB/PB.
2026-06-04 19:46:24 -05:00
Matt Aitchison
db12082ad8 fix: enforce per-entry artifact TTL and align cleanup scope
- Store each artifact's own expiry (expires_at) instead of a shared
  stored_at; prune by per-entry expiry on store and enforce it on
  resolve. Previously a short-TTL store could evict long-TTL entries and
  expired handles stayed resolvable until the next write.
- Derive the artifact scope from the executor's crew or the agent's crew
  so the native and ReAct paths agree with the crew-id Crew passes to
  clear_artifact_scope, preventing crew-bound artifacts from lingering
  under a task scope until TTL.
2026-06-04 19:22:55 -05:00
Matt Aitchison
06b239d8fe fix: resolve file-artifact handles with uppercased uuid hex
The handle regex matches hex case-insensitively, but store keys are
lowercase uuid4 strings. Normalize the captured uuid to lowercase before
lookup so a handle echoed by the model with uppercase hex still resolves
to its bytes instead of leaking the raw token to the downstream tool.
2026-06-04 19:14:56 -05:00
Matt Aitchison
f7667c1f12 style: apply ruff format to file_artifact and tool_usage 2026-06-04 19:12:05 -05:00
Matt Aitchison
f300c1f2a6 fix: add FileArtifact to keep binary tool I/O out of the LLM context
Binary file data (PPTX, PDF, images) returned by one tool and echoed by
the model as a base64 argument to another tool drifts by a few characters,
invalidating the base64 and corrupting the resulting file.

Tools can now return a FileArtifact instead of a base64 string. The agent
executor stores the bytes out-of-band (execution-scoped, TTL-backed) and
shows the model a short, namespaced `crewai+file://` handle, expanding it
back to exact base64 just before the consuming tool runs. Wired into the
native (default AgentExecutor + legacy) and ReAct execution paths with
per-run cleanup.
2026-06-04 19:07:52 -05:00
8 changed files with 761 additions and 7 deletions

View File

@@ -863,6 +863,13 @@ class CrewAgentExecutor(BaseAgentExecutor):
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.tools.file_artifact import (
artifact_scope_id,
resolve_artifact_handles,
store_if_artifact,
)
scope_id = artifact_scope_id(self.crew, self.task, self.agent)
args_dict, parse_error = parse_tool_call_args(
func_args, func_name, call_id, original_tool
@@ -896,6 +903,7 @@ class CrewAgentExecutor(BaseAgentExecutor):
tool=func_name, input=input_str
)
if cached_result is not None:
cached_result = store_if_artifact(cached_result, scope_id)
result = (
str(cached_result)
if not isinstance(cached_result, str)
@@ -960,7 +968,8 @@ class CrewAgentExecutor(BaseAgentExecutor):
result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore."
elif not from_cache and func_name in available_functions:
try:
raw_result = available_functions[func_name](**(args_dict or {}))
invoke_args = resolve_artifact_handles(args_dict) if args_dict else {}
raw_result = available_functions[func_name](**invoke_args)
if self.tools_handler and self.tools_handler.cache:
should_cache = True
@@ -977,6 +986,7 @@ class CrewAgentExecutor(BaseAgentExecutor):
tool=func_name, input=input_str, output=raw_result
)
raw_result = store_if_artifact(raw_result, scope_id)
result = (
str(raw_result) if not isinstance(raw_result, str) else raw_result
)
@@ -1020,6 +1030,10 @@ class CrewAgentExecutor(BaseAgentExecutor):
color="red",
)
# An after_tool_call hook may have replaced the result with a
# FileArtifact; keep those bytes out of the message and events too.
result = store_if_artifact(result, scope_id)
if not error_event_emitted:
crewai_event_bus.emit(
self,

View File

@@ -116,6 +116,7 @@ from crewai.tasks.task_output import TaskOutput
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.agent_tools.read_file_tool import ReadFileTool
from crewai.tools.base_tool import BaseTool
from crewai.tools.file_artifact import clear_artifact_scope
from crewai.types.callback import SerializableCallable
from crewai.types.streaming import CrewStreamingOutput
from crewai.types.usage_metrics import UsageMetrics
@@ -1047,6 +1048,7 @@ class Crew(FlowTrackable, BaseModel):
if self._memory is not None and hasattr(self._memory, "drain_writes"):
self._memory.drain_writes()
clear_files(self.id)
clear_artifact_scope(self.id)
detach(token)
def _post_kickoff(self, result: CrewOutput) -> CrewOutput:
@@ -1255,6 +1257,7 @@ class Crew(FlowTrackable, BaseModel):
raise
finally:
clear_files(self.id)
clear_artifact_scope(self.id)
detach(token)
async def akickoff_for_each(

View File

@@ -70,6 +70,11 @@ from crewai.hooks.types import (
BeforeLLMCallHookType,
)
from crewai.tools.base_tool import BaseTool
from crewai.tools.file_artifact import (
artifact_scope_id,
resolve_artifact_handles,
store_if_artifact,
)
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.utilities.agent_utils import (
_llm_stop_words_applied,
@@ -1762,6 +1767,8 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
return parse_error
args_dict: dict[str, Any] = parsed_args or {}
scope_id = artifact_scope_id(self.crew, self.task, self.agent)
# Get agent_key for event tracking
agent_key = getattr(self.agent, "key", "unknown") if self.agent else "unknown"
@@ -1794,6 +1801,7 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
tool=func_name, input=input_str
)
if cached_result is not None:
cached_result = store_if_artifact(cached_result, scope_id)
result = (
str(cached_result)
if not isinstance(cached_result, str)
@@ -1859,7 +1867,10 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
if func_name in self._available_functions:
try:
tool_func = self._available_functions[func_name]
raw_result = tool_func(**args_dict)
invoke_args = (
resolve_artifact_handles(args_dict) if args_dict else {}
)
raw_result = tool_func(**invoke_args)
# Add to cache after successful execution (before string conversion)
if self.tools_handler and self.tools_handler.cache:
@@ -1874,6 +1885,7 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
)
# Convert to string for message
raw_result = store_if_artifact(raw_result, scope_id)
result = (
str(raw_result)
if not isinstance(raw_result, str)
@@ -1927,6 +1939,10 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
color="red",
)
# An after_tool_call hook may have replaced the result with a
# FileArtifact; keep those bytes out of the message and events too.
result = store_if_artifact(result, scope_id)
if not error_event_emitted:
crewai_event_bus.emit(
self,

View File

@@ -1,8 +1,10 @@
from crewai.tools.base_tool import BaseTool, EnvVar, tool
from crewai.tools.file_artifact import FileArtifact
__all__ = [
"BaseTool",
"EnvVar",
"FileArtifact",
"tool",
]

View File

@@ -0,0 +1,296 @@
"""Out-of-band binary file passing between tools.
LLMs cannot reproduce opaque strings longer than a few kilobytes byte-perfect.
A base64-encoded binary file (PPTX, PDF, image, ...) returned by one tool and
echoed by the model as the argument to another tool drifts by a few characters,
which invalidates the base64 and corrupts the resulting file.
To avoid routing bytes through the model, a tool returns a :class:`FileArtifact`
instead of a base64 string. The agent executor stores the bytes here and shows
the model a short, stable ``crewai+file://<uuid>`` handle in place of the data.
When the model passes that handle as an argument to a later tool, the executor
expands it back to base64 *just before* the tool runs -- the bytes never enter
the model's context, so they cannot be corrupted.
The handle is namespaced (``crewai+file://``) so resolution only ever fires on
tokens this module minted, never on arbitrary user data. Stored bytes are scoped
to a crew/task execution id and cleared when that execution finishes; a TTL prune
is the safety net for runs that never call :func:`clear_artifact_scope`.
Limitation: handles are ephemeral and scoped to a single run. A handle only
resolves while its run's artifacts are live. If a placeholder's text is persisted
(conversation memory, a checkpoint) and a *later* run echoes that handle, it will
no longer resolve and the literal token is passed through unchanged -- so binary
producer->consumer chains must complete within one run.
"""
from __future__ import annotations
import base64
from dataclasses import dataclass
import re
import threading
import time
from typing import Any, Final
from uuid import uuid4
__all__ = [
"FileArtifact",
"artifact_scope_id",
"clear_artifact_scope",
"resolve_artifact_handles",
"store_artifact",
"store_if_artifact",
]
_HANDLE_SCHEME: Final[str] = "crewai+file"
# A minted handle: crewai+file://<uuid4>. Matched case-insensitively because
# uuid hex may arrive upper- or lower-cased after a model round-trip.
_HANDLE_RE: Final[re.Pattern[str]] = re.compile(
r"crewai\+file://([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-"
r"[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
)
DEFAULT_ARTIFACT_TTL: Final[int] = 3600
@dataclass
class FileArtifact:
"""Binary file produced or consumed by a tool, kept out of the LLM context.
Return this from a tool's ``_run`` instead of a base64 string when the output
is binary. The executor stores the bytes and substitutes a short handle in the
text the model sees, so the model never has to reproduce the data verbatim.
Attributes:
data: Raw file bytes.
filename: Human-readable name, surfaced to the model and useful as a
default for downstream ``file_name`` arguments.
mime_type: MIME type of the content.
"""
data: bytes
filename: str = "file"
mime_type: str = "application/octet-stream"
@property
def size_bytes(self) -> int:
return len(self.data)
def as_base64(self) -> str:
"""Return the bytes as an ASCII base64 string (what connectors expect)."""
return base64.b64encode(self.data).decode("ascii")
def _placeholder(self, handle: str) -> str:
"""Build the model-facing text that stands in for the bytes."""
# Neutralize characters that would break the single-line bracketed
# attribute list (quotes, the closing bracket, newlines).
filename = _sanitize_attr(self.filename)
mime_type = _sanitize_attr(self.mime_type)
return (
f'[FileArtifact filename="{filename}" '
f'mime_type="{mime_type}" size={_human_size(self.size_bytes)} '
f"handle={handle}]\n"
"The binary content is stored out-of-band to keep it from being "
"corrupted in transit. To use this file, pass the handle string "
f"({handle}) as the value of the content/file argument when calling "
"another tool -- it is expanded to the real data before that tool runs."
)
@dataclass
class _Entry:
artifact: FileArtifact
scope_id: str | None
expires_at: float | None
obj_id: int
class _ArtifactStore:
"""Process-local, execution-scoped store keyed by minted handle id.
Entries are keyed by an opaque uuid (never by user-supplied content), so
concurrent crews cannot collide. Cleanup is per-scope -- clearing one crew's
artifacts never touches another's -- with a TTL prune as a backstop.
Storing the same :class:`FileArtifact` instance again under the same scope
reuses its handle rather than minting a duplicate. The tool-result cache
hands back the same object on every cache hit, so this keeps repeated cached
calls from stacking identical byte copies in memory.
"""
def __init__(self) -> None:
self._lock = threading.Lock()
self._entries: dict[str, _Entry] = {}
# (id(artifact), scope) -> handle, so re-storing the same instance under
# the same scope reuses its handle. Keying on the scope too means storing
# an object under a different scope gets its own handle and its own
# cleanup entry rather than overwriting the first.
self._handle_by_obj: dict[tuple[int, str | None], str] = {}
def store(
self,
artifact: FileArtifact,
scope_id: str | None = None,
ttl: int = DEFAULT_ARTIFACT_TTL,
) -> str:
norm_scope = str(scope_id) if scope_id is not None else None
obj_key = (id(artifact), norm_scope)
expires_at = (time.monotonic() + ttl) if ttl > 0 else None
with self._lock:
self._prune_locked()
existing = self._handle_by_obj.get(obj_key)
if existing is not None:
entry = self._entries.get(existing)
if entry is not None and entry.artifact is artifact:
entry.expires_at = expires_at
return f"{_HANDLE_SCHEME}://{existing}"
handle_id = str(uuid4())
self._entries[handle_id] = _Entry(
artifact=artifact,
scope_id=norm_scope,
expires_at=expires_at,
obj_id=id(artifact),
)
self._handle_by_obj[obj_key] = handle_id
return f"{_HANDLE_SCHEME}://{handle_id}"
def resolve(self, handle_id: str) -> FileArtifact | None:
with self._lock:
entry = self._entries.get(handle_id)
if entry is None:
return None
if entry.expires_at is not None and entry.expires_at <= time.monotonic():
self._delete_locked(handle_id)
return None
return entry.artifact
def clear_scope(self, scope_id: str) -> None:
scope = str(scope_id)
with self._lock:
for handle_id in [
hid for hid, entry in self._entries.items() if entry.scope_id == scope
]:
self._delete_locked(handle_id)
def _prune_locked(self) -> None:
"""Drop entries whose per-entry TTL has elapsed. Caller holds the lock."""
now = time.monotonic()
for handle_id in [
hid
for hid, entry in self._entries.items()
if entry.expires_at is not None and entry.expires_at <= now
]:
self._delete_locked(handle_id)
def _delete_locked(self, handle_id: str) -> None:
"""Remove an entry and its object-identity mapping. Caller holds lock."""
entry = self._entries.pop(handle_id, None)
if entry is not None:
self._handle_by_obj.pop((entry.obj_id, entry.scope_id), None)
_store: Final[_ArtifactStore] = _ArtifactStore()
def store_artifact(
artifact: FileArtifact,
scope_id: Any | None = None,
ttl: int = DEFAULT_ARTIFACT_TTL,
) -> str:
"""Store a :class:`FileArtifact` and return its model-facing placeholder text.
Args:
artifact: The binary artifact to keep out of the model context.
scope_id: Execution id (crew or task) used to group the artifact for
cleanup. ``None`` means it is only reclaimed by the TTL prune.
ttl: Seconds after which an unreferenced artifact may be pruned.
Returns:
The placeholder string to surface to the model in place of the bytes.
"""
handle = _store.store(artifact, scope_id=scope_id, ttl=ttl)
return artifact._placeholder(handle)
def resolve_artifact_handles(value: Any) -> Any:
"""Recursively replace stored handles in tool arguments with base64 data.
Walks strings, dicts, and lists. Any ``crewai+file://<uuid>`` token that
resolves to a stored artifact is replaced with that artifact's base64 string;
unknown tokens and all other values are returned unchanged. A new container is
returned so the caller's original arguments (used for events, caching, and
logs) keep the short handle.
"""
if isinstance(value, str):
if _HANDLE_SCHEME not in value:
return value
def _sub(match: re.Match[str]) -> str:
# Store keys are lowercase uuid4 strings; the regex matches hex
# case-insensitively, so normalize before lookup in case the model
# echoed the handle with uppercase hex.
artifact = _store.resolve(match.group(1).lower())
return artifact.as_base64() if artifact is not None else match.group(0)
return _HANDLE_RE.sub(_sub, value)
if isinstance(value, dict):
return {key: resolve_artifact_handles(val) for key, val in value.items()}
if isinstance(value, list):
return [resolve_artifact_handles(item) for item in value]
return value
def store_if_artifact(result: Any, scope_id: Any | None = None) -> Any:
"""Store ``result`` and return its placeholder if it is a :class:`FileArtifact`.
Any other value is returned unchanged. This is the single funnel both the
native and ReAct executor paths route tool output through, so fresh and
cached results are handled identically.
"""
if isinstance(result, FileArtifact):
return store_artifact(result, scope_id=scope_id)
return result
def clear_artifact_scope(scope_id: Any) -> None:
"""Drop every artifact stored under ``scope_id`` (called when a run ends)."""
_store.clear_scope(scope_id)
def artifact_scope_id(
crew: Any | None = None,
task: Any | None = None,
agent: Any | None = None,
) -> Any | None:
"""Pick the execution id used to scope a tool's file artifacts for cleanup.
Prefer the crew id -- it matches the id ``Crew`` passes to
:func:`clear_artifact_scope` when a run ends -- falling back to the agent's
crew, then the task id, then ``None`` (TTL-only cleanup). Centralized, and
given the agent fallback, so every tool-execution path derives the scope the
same way and can't drift.
"""
if crew is None:
crew = getattr(agent, "crew", None)
crew_id = getattr(crew, "id", None)
if crew_id is not None:
return crew_id
return getattr(task, "id", None)
def _sanitize_attr(text: str) -> str:
"""Strip characters that would break the bracketed placeholder display."""
return (
text.replace('"', "'").replace("]", ")").replace("\n", " ").replace("\r", " ")
)
def _human_size(size_bytes: int) -> str:
size = float(size_bytes)
for unit in ("B", "KB", "MB", "GB", "TB", "PB"):
if size < 1024 or unit == "PB":
return f"{int(size)} {unit}" if unit == "B" else f"{size:.1f} {unit}"
size /= 1024
return f"{size:.1f} PB"

View File

@@ -22,6 +22,7 @@ from crewai.events.types.tool_usage_events import (
ToolValidateInputErrorEvent,
)
from crewai.telemetry.telemetry import Telemetry
from crewai.tools.file_artifact import artifact_scope_id, resolve_artifact_handles
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling
from crewai.utilities.agent_utils import (
@@ -327,12 +328,14 @@ class ToolUsage:
if k in acceptable_args
}
result = await tool.ainvoke(
input=arguments, config=fingerprint_config
input=resolve_artifact_handles(arguments),
config=fingerprint_config,
)
except Exception:
arguments = calling.arguments
result = await tool.ainvoke(
input=arguments, config=fingerprint_config
input=resolve_artifact_handles(arguments),
config=fingerprint_config,
)
else:
result = await tool.ainvoke(input={}, config=fingerprint_config)
@@ -558,12 +561,14 @@ class ToolUsage:
if k in acceptable_args
}
result = tool.invoke(
input=arguments, config=fingerprint_config
input=resolve_artifact_handles(arguments),
config=fingerprint_config,
)
except Exception:
arguments = calling.arguments
result = tool.invoke(
input=arguments, config=fingerprint_config
input=resolve_artifact_handles(arguments),
config=fingerprint_config,
)
else:
result = tool.invoke(input={}, config=fingerprint_config)
@@ -679,9 +684,17 @@ class ToolUsage:
return result
@property
def _artifact_scope_id(self) -> Any | None:
"""Execution id used to scope out-of-band file artifacts for cleanup."""
return artifact_scope_id(task=self.task, agent=self.agent)
def _format_result(self, result: Any) -> str:
from crewai.tools.file_artifact import store_if_artifact
if self.task:
self.task.used_tools += 1
result = store_if_artifact(result, self._artifact_scope_id)
if self._should_remember_format():
result = self._remember_format(result=result)
return str(result)

View File

@@ -27,6 +27,11 @@ from crewai.agents.parser import (
from crewai.llms.base_llm import BaseLLM, call_stop_override
from crewai.tools import BaseTool as CrewAITool
from crewai.tools.base_tool import BaseTool
from crewai.tools.file_artifact import (
artifact_scope_id,
resolve_artifact_handles,
store_if_artifact,
)
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_types import ToolResult
from crewai.utilities.errors import AgentRepositoryError
@@ -1416,6 +1421,7 @@ def execute_single_native_tool_call(
args_dict = func_args
agent_key = getattr(agent, "key", "unknown") if agent else "unknown"
scope_id = artifact_scope_id(crew, task, agent)
original_tool: BaseTool | None = None
for tool in original_tools:
@@ -1430,6 +1436,7 @@ def execute_single_native_tool_call(
if tools_handler and tools_handler.cache:
cached_result = tools_handler.cache.read(tool=func_name, input=input_str)
if cached_result is not None:
cached_result = store_if_artifact(cached_result, scope_id)
result = (
str(cached_result)
if not isinstance(cached_result, str)
@@ -1481,7 +1488,8 @@ def execute_single_native_tool_call(
if func_name in available_functions:
try:
tool_func = available_functions[func_name]
raw_result = tool_func(**args_dict)
invoke_args = resolve_artifact_handles(args_dict) if args_dict else {}
raw_result = tool_func(**invoke_args)
if tools_handler and tools_handler.cache:
should_cache = True
@@ -1494,6 +1502,7 @@ def execute_single_native_tool_call(
tool=func_name, input=input_str, output=raw_result
)
raw_result = store_if_artifact(raw_result, scope_id)
result = (
str(raw_result) if not isinstance(raw_result, str) else raw_result
)
@@ -1532,6 +1541,10 @@ def execute_single_native_tool_call(
except Exception: # noqa: S110
pass
# An after_tool_call hook may have replaced the result with a FileArtifact;
# keep those bytes out of the message and events too.
result = store_if_artifact(result, scope_id)
if not error_event_emitted:
crewai_event_bus.emit(
event_source,

View File

@@ -0,0 +1,397 @@
"""Tests for out-of-band binary file passing between tools."""
from __future__ import annotations
import base64
import re
import time
import pytest
from crewai.tools import FileArtifact
from crewai.tools.file_artifact import (
_store,
artifact_scope_id,
clear_artifact_scope,
resolve_artifact_handles,
store_artifact,
store_if_artifact,
)
_HANDLE = re.compile(r"crewai\+file://[0-9a-fA-F-]{36}")
@pytest.fixture(autouse=True)
def _clear_store():
"""Keep the process-local store empty between tests."""
_store._entries.clear()
_store._handle_by_obj.clear()
yield
_store._entries.clear()
_store._handle_by_obj.clear()
def _handle_in(text: str) -> str:
match = _HANDLE.search(text)
assert match is not None, f"no handle in: {text!r}"
return match.group(0)
class TestFileArtifact:
def test_as_base64_round_trips(self) -> None:
data = bytes(range(256))
artifact = FileArtifact(data=data, filename="x.bin")
assert base64.b64decode(artifact.as_base64()) == data
def test_size_bytes(self) -> None:
assert FileArtifact(data=b"abc").size_bytes == 3
def test_defaults(self) -> None:
artifact = FileArtifact(data=b"")
assert artifact.filename == "file"
assert artifact.mime_type == "application/octet-stream"
class TestStoreArtifact:
def test_placeholder_contains_metadata_and_handle(self) -> None:
artifact = FileArtifact(
data=b"\x00" * 30045, filename="deck.pptx", mime_type="application/pptx"
)
placeholder = store_artifact(artifact, scope_id="crew-1")
assert 'filename="deck.pptx"' in placeholder
assert 'mime_type="application/pptx"' in placeholder
assert "29.3 KB" in placeholder
assert _HANDLE.search(placeholder) is not None
def test_each_store_gets_a_unique_handle(self) -> None:
h1 = _handle_in(store_artifact(FileArtifact(data=b"a")))
h2 = _handle_in(store_artifact(FileArtifact(data=b"a")))
assert h1 != h2
def test_restoring_same_instance_reuses_handle(self) -> None:
# The tool-result cache hands back the same FileArtifact on every cache
# hit; re-storing it must reuse the handle, not stack duplicate copies.
artifact = FileArtifact(data=b"payload" * 1000)
h1 = _handle_in(store_artifact(artifact, scope_id="s"))
h2 = _handle_in(store_artifact(artifact, scope_id="s"))
assert h1 == h2
assert len(_store._entries) == 1
def test_same_instance_different_scope_gets_own_handle_and_cleans_up(self) -> None:
# Storing one instance under two scopes must not orphan a mapping:
# each scope keeps its own handle, and clearing one leaves the other.
artifact = FileArtifact(data=b"x" * 100)
h_a = _handle_in(store_artifact(artifact, scope_id="A"))
h_b = _handle_in(store_artifact(artifact, scope_id="B"))
assert h_a != h_b
clear_artifact_scope("A")
assert resolve_artifact_handles(h_a) == h_a # A cleared
assert base64.b64decode(resolve_artifact_handles(h_b)) == b"x" * 100
# No dangling object-identity mapping for the cleared scope.
assert (id(artifact), "A") not in _store._handle_by_obj
clear_artifact_scope("B")
assert _store._handle_by_obj == {}
def test_placeholder_escapes_quotes_in_metadata(self) -> None:
artifact = FileArtifact(data=b"x", filename='a".pptx', mime_type='m"/x')
placeholder = store_artifact(artifact)
# The bracketed attribute list must not be broken by an embedded quote,
# and the handle must still be recoverable.
assert 'filename="a\'.pptx"' in placeholder
assert _HANDLE.search(placeholder) is not None
def test_placeholder_neutralizes_bracket_and_newlines(self) -> None:
artifact = FileArtifact(data=b"x", filename="a]b\nc.bin")
placeholder = store_artifact(artifact)
first_line = placeholder.splitlines()[0]
# The closing bracket and newline can't appear inside the attributes,
# so the bracketed segment stays a single, well-formed line.
assert first_line.count("]") == 1 and first_line.endswith("]")
assert _HANDLE.search(placeholder) is not None
class TestArtifactScopeId:
class _Obj:
def __init__(self, id_):
self.id = id_
def test_prefers_crew_id(self) -> None:
assert artifact_scope_id(self._Obj("crew"), self._Obj("task")) == "crew"
def test_falls_back_to_task_when_no_crew(self) -> None:
assert artifact_scope_id(None, self._Obj("task")) == "task"
def test_falls_back_to_task_when_crew_id_is_none(self) -> None:
assert artifact_scope_id(self._Obj(None), self._Obj("task")) == "task"
def test_none_when_neither_present(self) -> None:
assert artifact_scope_id(None, None) is None
def test_falls_back_to_agent_crew(self) -> None:
# Native executors may have crew=None while the agent carries the crew;
# the helper must still resolve the crew id so cleanup scopes align.
agent = self._Obj(None)
agent.crew = self._Obj("crew-from-agent")
assert artifact_scope_id(None, self._Obj("task"), agent) == "crew-from-agent"
def test_explicit_crew_beats_agent_crew(self) -> None:
agent = self._Obj(None)
agent.crew = self._Obj("agent-crew")
assert artifact_scope_id(self._Obj("direct-crew"), None, agent) == "direct-crew"
class TestResolveArtifactHandles:
def test_exact_handle_resolves_to_base64(self) -> None:
data = bytes(range(256)) * 100
handle = _handle_in(store_artifact(FileArtifact(data=data)))
resolved = resolve_artifact_handles(handle)
assert base64.b64decode(resolved) == data
def test_resolves_handle_with_uppercased_hex(self) -> None:
# A model may echo the handle with uppercase uuid hex; lookup must still
# hit the lowercase-keyed store.
data = b"upper-case-payload" * 100
handle = _handle_in(store_artifact(FileArtifact(data=data)))
scheme, _, hex_part = handle.rpartition("/")
upper = f"{scheme}/{hex_part.upper()}"
assert upper != handle
assert base64.b64decode(resolve_artifact_handles(upper)) == data
def test_resolves_handle_inside_dict(self) -> None:
data = b"binary-payload" * 1000
handle = _handle_in(store_artifact(FileArtifact(data=data)))
args = {"file_name": "a.bin", "content": handle}
resolved = resolve_artifact_handles(args)
assert base64.b64decode(resolved["content"]) == data
assert resolved["file_name"] == "a.bin"
def test_resolves_handle_nested_in_list_and_dict(self) -> None:
handle = _handle_in(store_artifact(FileArtifact(data=b"xyz")))
resolved = resolve_artifact_handles({"items": [{"c": handle}]})
assert base64.b64decode(resolved["items"][0]["c"]) == b"xyz"
def test_does_not_mutate_original_arguments(self) -> None:
handle = _handle_in(store_artifact(FileArtifact(data=b"data")))
args = {"content": handle}
resolve_artifact_handles(args)
assert args["content"] == handle
def test_unknown_handle_is_left_unchanged(self) -> None:
token = "crewai+file://00000000-0000-0000-0000-000000000000"
assert resolve_artifact_handles(token) == token
def test_non_handle_strings_pass_through(self) -> None:
assert resolve_artifact_handles("just text") == "just text"
assert resolve_artifact_handles({"k": "v"}) == {"k": "v"}
def test_non_string_values_pass_through(self) -> None:
assert resolve_artifact_handles(42) == 42
assert resolve_artifact_handles(None) is None
assert resolve_artifact_handles([1, 2]) == [1, 2]
class TestStoreIfArtifact:
def test_artifact_becomes_placeholder(self) -> None:
result = store_if_artifact(FileArtifact(data=b"a" * 100), scope_id="s")
assert isinstance(result, str)
assert _HANDLE.search(result) is not None
def test_other_values_unchanged(self) -> None:
assert store_if_artifact("hello") == "hello"
assert store_if_artifact(7) == 7
class TestScoping:
def test_clear_scope_only_drops_its_own_artifacts(self) -> None:
h_a = _handle_in(store_artifact(FileArtifact(data=b"a"), scope_id="A"))
h_b = _handle_in(store_artifact(FileArtifact(data=b"b"), scope_id="B"))
clear_artifact_scope("A")
# A's handle no longer resolves; B's still does.
assert resolve_artifact_handles(h_a) == h_a
assert base64.b64decode(resolve_artifact_handles(h_b)) == b"b"
def test_unscoped_artifact_survives_other_scope_clears(self) -> None:
handle = _handle_in(store_artifact(FileArtifact(data=b"x")))
clear_artifact_scope("some-crew")
assert base64.b64decode(resolve_artifact_handles(handle)) == b"x"
def _legacy_executor_runner(tools):
"""Return a `(func_name, args) -> result_dict` driver for the legacy executor."""
from unittest.mock import Mock
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.tools.base_tool import to_langchain
from crewai.utilities.agent_utils import convert_tools_to_openai_schema
executor = CrewAgentExecutor(tools=to_langchain(tools), original_tools=tools)
agent = Mock(key="agent", role="tester", verbose=False, fingerprint=None)
agent.tools_results = []
executor.agent = agent
task = Mock(description="t", id="scope-legacy")
task.name = "t" # `name=` is a reserved Mock ctor kwarg, so assign explicitly
executor.task = task
_, available_functions, _ = convert_tools_to_openai_schema(tools)
def run(func_name, args):
return executor._execute_single_native_tool_call(
call_id="c",
func_name=func_name,
func_args=args,
available_functions=available_functions,
)
return run
def _experimental_executor_runner(tools):
"""Return a `(func_name, args) -> result_dict` driver for the default executor."""
import json
from types import SimpleNamespace
from unittest.mock import Mock
from crewai.experimental.agent_executor import AgentExecutor
executor = AgentExecutor.model_construct()
for key, value in {
"original_tools": tools,
"tools": [],
"tools_handler": None,
"crew": None,
}.items():
object.__setattr__(executor, key, value)
agent = Mock(key="agent", role="tester", verbose=False, fingerprint=None)
agent.tools_results = []
object.__setattr__(executor, "agent", agent)
task = Mock(id="scope-exp", description="t")
task.name = "t" # `name=` is a reserved Mock ctor kwarg, so assign explicitly
object.__setattr__(executor, "task", task)
executor._setup_native_tools()
def run(func_name, args):
tool_call = SimpleNamespace(
id="c",
function=SimpleNamespace(
name=func_name, arguments=args if isinstance(args, str) else json.dumps(args)
),
)
return executor._execute_single_native_tool_call(tool_call)
return run
@pytest.mark.parametrize(
"make_runner",
[_experimental_executor_runner, _legacy_executor_runner],
ids=["experimental", "legacy"],
)
class TestNativeExecutorWiring:
"""Guard producer/consumer wiring on both the default and legacy executors."""
def test_artifact_output_is_replaced_by_handle_and_resolves_downstream(
self, make_runner
) -> None:
from crewai.tools import BaseTool, FileArtifact
payload = bytes(range(256)) * 200 # ~51 KB, far past the LLM round-trip limit
class Generate(BaseTool):
name: str = "generate_file"
description: str = "Generate a binary file"
def _run(self) -> FileArtifact:
return FileArtifact(
data=payload, filename="deck.pptx", mime_type="application/pptx"
)
captured: dict[str, str] = {}
class Upload(BaseTool):
name: str = "upload_file"
description: str = "Upload base64 content"
def _run(self, content: str) -> str:
captured["content"] = content
return "uploaded"
run = make_runner([Generate(), Upload()])
# Producer: the 51 KB payload must NOT appear in the model-facing result.
gen_result = run("generate_file", "{}")["result"]
assert "deck.pptx" in gen_result
assert base64.b64encode(payload).decode() not in gen_result
handle = _handle_in(gen_result)
# Consumer: the handle the model echoes is expanded to exact bytes.
up_result = run("upload_file", {"content": handle})["result"]
assert up_result == "uploaded"
assert base64.b64decode(captured["content"]) == payload
class TestAfterHookArtifact:
"""An after_tool_call hook that returns a FileArtifact must still be stored."""
def test_hook_returned_artifact_is_replaced_by_handle(self) -> None:
from crewai.hooks.tool_hooks import (
register_after_tool_call_hook,
unregister_after_tool_call_hook,
)
from crewai.tools import BaseTool, FileArtifact
payload = bytes(range(256)) * 50
class Echo(BaseTool):
name: str = "echo"
description: str = "Echo"
def _run(self) -> str:
return "plain text"
def hook(_context):
return FileArtifact(data=payload, filename="hook.bin")
register_after_tool_call_hook(hook)
try:
run = _experimental_executor_runner([Echo()])
result = run("echo", "{}")["result"]
finally:
unregister_after_tool_call_hook(hook)
assert base64.b64encode(payload).decode() not in result
assert _HANDLE.search(result) is not None
class TestTtlPrune:
@staticmethod
def _expire(handle: str) -> None:
"""Force a stored handle's per-entry TTL into the past."""
entry = _store._entries[handle.rsplit("/", 1)[-1]]
entry.expires_at = time.monotonic() - 1
def test_expired_handle_does_not_resolve(self) -> None:
handle = _handle_in(store_artifact(FileArtifact(data=b"old"), ttl=3600))
self._expire(handle)
# An expired handle is enforced on lookup, not just on the next write.
assert resolve_artifact_handles(handle) == handle
def test_short_ttl_store_does_not_evict_long_ttl_entries(self) -> None:
keep = _handle_in(store_artifact(FileArtifact(data=b"keep"), ttl=3600))
# A later short-TTL store must prune only by each entry's own expiry,
# never by the current call's ttl.
store_artifact(FileArtifact(data=b"tiny"), ttl=1)
assert base64.b64decode(resolve_artifact_handles(keep)) == b"keep"
def test_expired_entries_are_pruned_on_next_store(self) -> None:
stale = _handle_in(store_artifact(FileArtifact(data=b"old"), ttl=3600))
self._expire(stale)
store_artifact(FileArtifact(data=b"new"), ttl=3600)
assert stale.rsplit("/", 1)[-1] not in _store._entries
def test_ttl_zero_never_expires(self) -> None:
handle = _handle_in(store_artifact(FileArtifact(data=b"keep"), ttl=0))
assert _store._entries[handle.rsplit("/", 1)[-1]].expires_at is None
store_artifact(FileArtifact(data=b"another"), ttl=0)
assert base64.b64decode(resolve_artifact_handles(handle)) == b"keep"