mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-06-15 13:18:09 +00:00
Compare commits
9 Commits
flow-itera
...
matcha/age
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0fba8163ca | ||
|
|
ad798afeca | ||
|
|
8d2ca5ef4c | ||
|
|
000dd41fc3 | ||
|
|
b827f7ee11 | ||
|
|
db12082ad8 | ||
|
|
06b239d8fe | ||
|
|
f7667c1f12 | ||
|
|
f300c1f2a6 |
@@ -863,6 +863,13 @@ class CrewAgentExecutor(BaseAgentExecutor):
|
||||
ToolUsageFinishedEvent,
|
||||
ToolUsageStartedEvent,
|
||||
)
|
||||
from crewai.tools.file_artifact import (
|
||||
artifact_scope_id,
|
||||
resolve_artifact_handles,
|
||||
store_if_artifact,
|
||||
)
|
||||
|
||||
scope_id = artifact_scope_id(self.crew, self.task, self.agent)
|
||||
|
||||
args_dict, parse_error = parse_tool_call_args(
|
||||
func_args, func_name, call_id, original_tool
|
||||
@@ -896,6 +903,7 @@ class CrewAgentExecutor(BaseAgentExecutor):
|
||||
tool=func_name, input=input_str
|
||||
)
|
||||
if cached_result is not None:
|
||||
cached_result = store_if_artifact(cached_result, scope_id)
|
||||
result = (
|
||||
str(cached_result)
|
||||
if not isinstance(cached_result, str)
|
||||
@@ -960,7 +968,8 @@ class CrewAgentExecutor(BaseAgentExecutor):
|
||||
result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore."
|
||||
elif not from_cache and func_name in available_functions:
|
||||
try:
|
||||
raw_result = available_functions[func_name](**(args_dict or {}))
|
||||
invoke_args = resolve_artifact_handles(args_dict) if args_dict else {}
|
||||
raw_result = available_functions[func_name](**invoke_args)
|
||||
|
||||
if self.tools_handler and self.tools_handler.cache:
|
||||
should_cache = True
|
||||
@@ -977,6 +986,7 @@ class CrewAgentExecutor(BaseAgentExecutor):
|
||||
tool=func_name, input=input_str, output=raw_result
|
||||
)
|
||||
|
||||
raw_result = store_if_artifact(raw_result, scope_id)
|
||||
result = (
|
||||
str(raw_result) if not isinstance(raw_result, str) else raw_result
|
||||
)
|
||||
@@ -1020,6 +1030,10 @@ class CrewAgentExecutor(BaseAgentExecutor):
|
||||
color="red",
|
||||
)
|
||||
|
||||
# An after_tool_call hook may have replaced the result with a
|
||||
# FileArtifact; keep those bytes out of the message and events too.
|
||||
result = store_if_artifact(result, scope_id)
|
||||
|
||||
if not error_event_emitted:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
|
||||
@@ -116,6 +116,7 @@ from crewai.tasks.task_output import TaskOutput
|
||||
from crewai.tools.agent_tools.agent_tools import AgentTools
|
||||
from crewai.tools.agent_tools.read_file_tool import ReadFileTool
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
from crewai.tools.file_artifact import clear_artifact_scope
|
||||
from crewai.types.callback import SerializableCallable
|
||||
from crewai.types.streaming import CrewStreamingOutput
|
||||
from crewai.types.usage_metrics import UsageMetrics
|
||||
@@ -1047,6 +1048,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
if self._memory is not None and hasattr(self._memory, "drain_writes"):
|
||||
self._memory.drain_writes()
|
||||
clear_files(self.id)
|
||||
clear_artifact_scope(self.id)
|
||||
detach(token)
|
||||
|
||||
def _post_kickoff(self, result: CrewOutput) -> CrewOutput:
|
||||
@@ -1255,6 +1257,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
raise
|
||||
finally:
|
||||
clear_files(self.id)
|
||||
clear_artifact_scope(self.id)
|
||||
detach(token)
|
||||
|
||||
async def akickoff_for_each(
|
||||
|
||||
@@ -70,6 +70,11 @@ from crewai.hooks.types import (
|
||||
BeforeLLMCallHookType,
|
||||
)
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
from crewai.tools.file_artifact import (
|
||||
artifact_scope_id,
|
||||
resolve_artifact_handles,
|
||||
store_if_artifact,
|
||||
)
|
||||
from crewai.tools.structured_tool import CrewStructuredTool
|
||||
from crewai.utilities.agent_utils import (
|
||||
_llm_stop_words_applied,
|
||||
@@ -1762,6 +1767,8 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
|
||||
return parse_error
|
||||
args_dict: dict[str, Any] = parsed_args or {}
|
||||
|
||||
scope_id = artifact_scope_id(self.crew, self.task, self.agent)
|
||||
|
||||
# Get agent_key for event tracking
|
||||
agent_key = getattr(self.agent, "key", "unknown") if self.agent else "unknown"
|
||||
|
||||
@@ -1794,6 +1801,7 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
|
||||
tool=func_name, input=input_str
|
||||
)
|
||||
if cached_result is not None:
|
||||
cached_result = store_if_artifact(cached_result, scope_id)
|
||||
result = (
|
||||
str(cached_result)
|
||||
if not isinstance(cached_result, str)
|
||||
@@ -1859,7 +1867,10 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
|
||||
if func_name in self._available_functions:
|
||||
try:
|
||||
tool_func = self._available_functions[func_name]
|
||||
raw_result = tool_func(**args_dict)
|
||||
invoke_args = (
|
||||
resolve_artifact_handles(args_dict) if args_dict else {}
|
||||
)
|
||||
raw_result = tool_func(**invoke_args)
|
||||
|
||||
# Add to cache after successful execution (before string conversion)
|
||||
if self.tools_handler and self.tools_handler.cache:
|
||||
@@ -1874,6 +1885,7 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
|
||||
)
|
||||
|
||||
# Convert to string for message
|
||||
raw_result = store_if_artifact(raw_result, scope_id)
|
||||
result = (
|
||||
str(raw_result)
|
||||
if not isinstance(raw_result, str)
|
||||
@@ -1927,6 +1939,10 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
|
||||
color="red",
|
||||
)
|
||||
|
||||
# An after_tool_call hook may have replaced the result with a
|
||||
# FileArtifact; keep those bytes out of the message and events too.
|
||||
result = store_if_artifact(result, scope_id)
|
||||
|
||||
if not error_event_emitted:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
from crewai.tools.base_tool import BaseTool, EnvVar, tool
|
||||
from crewai.tools.file_artifact import FileArtifact
|
||||
|
||||
|
||||
__all__ = [
|
||||
"BaseTool",
|
||||
"EnvVar",
|
||||
"FileArtifact",
|
||||
"tool",
|
||||
]
|
||||
|
||||
296
lib/crewai/src/crewai/tools/file_artifact.py
Normal file
296
lib/crewai/src/crewai/tools/file_artifact.py
Normal file
@@ -0,0 +1,296 @@
|
||||
"""Out-of-band binary file passing between tools.
|
||||
|
||||
LLMs cannot reproduce opaque strings longer than a few kilobytes byte-perfect.
|
||||
A base64-encoded binary file (PPTX, PDF, image, ...) returned by one tool and
|
||||
echoed by the model as the argument to another tool drifts by a few characters,
|
||||
which invalidates the base64 and corrupts the resulting file.
|
||||
|
||||
To avoid routing bytes through the model, a tool returns a :class:`FileArtifact`
|
||||
instead of a base64 string. The agent executor stores the bytes here and shows
|
||||
the model a short, stable ``crewai+file://<uuid>`` handle in place of the data.
|
||||
When the model passes that handle as an argument to a later tool, the executor
|
||||
expands it back to base64 *just before* the tool runs -- the bytes never enter
|
||||
the model's context, so they cannot be corrupted.
|
||||
|
||||
The handle is namespaced (``crewai+file://``) so resolution only ever fires on
|
||||
tokens this module minted, never on arbitrary user data. Stored bytes are scoped
|
||||
to a crew/task execution id and cleared when that execution finishes; a TTL prune
|
||||
is the safety net for runs that never call :func:`clear_artifact_scope`.
|
||||
|
||||
Limitation: handles are ephemeral and scoped to a single run. A handle only
|
||||
resolves while its run's artifacts are live. If a placeholder's text is persisted
|
||||
(conversation memory, a checkpoint) and a *later* run echoes that handle, it will
|
||||
no longer resolve and the literal token is passed through unchanged -- so binary
|
||||
producer->consumer chains must complete within one run.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
from dataclasses import dataclass
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
from typing import Any, Final
|
||||
from uuid import uuid4
|
||||
|
||||
|
||||
__all__ = [
|
||||
"FileArtifact",
|
||||
"artifact_scope_id",
|
||||
"clear_artifact_scope",
|
||||
"resolve_artifact_handles",
|
||||
"store_artifact",
|
||||
"store_if_artifact",
|
||||
]
|
||||
|
||||
_HANDLE_SCHEME: Final[str] = "crewai+file"
|
||||
# A minted handle: crewai+file://<uuid4>. Matched case-insensitively because
|
||||
# uuid hex may arrive upper- or lower-cased after a model round-trip.
|
||||
_HANDLE_RE: Final[re.Pattern[str]] = re.compile(
|
||||
r"crewai\+file://([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-"
|
||||
r"[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
|
||||
)
|
||||
|
||||
DEFAULT_ARTIFACT_TTL: Final[int] = 3600
|
||||
|
||||
|
||||
@dataclass
|
||||
class FileArtifact:
|
||||
"""Binary file produced or consumed by a tool, kept out of the LLM context.
|
||||
|
||||
Return this from a tool's ``_run`` instead of a base64 string when the output
|
||||
is binary. The executor stores the bytes and substitutes a short handle in the
|
||||
text the model sees, so the model never has to reproduce the data verbatim.
|
||||
|
||||
Attributes:
|
||||
data: Raw file bytes.
|
||||
filename: Human-readable name, surfaced to the model and useful as a
|
||||
default for downstream ``file_name`` arguments.
|
||||
mime_type: MIME type of the content.
|
||||
"""
|
||||
|
||||
data: bytes
|
||||
filename: str = "file"
|
||||
mime_type: str = "application/octet-stream"
|
||||
|
||||
@property
|
||||
def size_bytes(self) -> int:
|
||||
return len(self.data)
|
||||
|
||||
def as_base64(self) -> str:
|
||||
"""Return the bytes as an ASCII base64 string (what connectors expect)."""
|
||||
return base64.b64encode(self.data).decode("ascii")
|
||||
|
||||
def _placeholder(self, handle: str) -> str:
|
||||
"""Build the model-facing text that stands in for the bytes."""
|
||||
# Neutralize characters that would break the single-line bracketed
|
||||
# attribute list (quotes, the closing bracket, newlines).
|
||||
filename = _sanitize_attr(self.filename)
|
||||
mime_type = _sanitize_attr(self.mime_type)
|
||||
return (
|
||||
f'[FileArtifact filename="{filename}" '
|
||||
f'mime_type="{mime_type}" size={_human_size(self.size_bytes)} '
|
||||
f"handle={handle}]\n"
|
||||
"The binary content is stored out-of-band to keep it from being "
|
||||
"corrupted in transit. To use this file, pass the handle string "
|
||||
f"({handle}) as the value of the content/file argument when calling "
|
||||
"another tool -- it is expanded to the real data before that tool runs."
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class _Entry:
|
||||
artifact: FileArtifact
|
||||
scope_id: str | None
|
||||
expires_at: float | None
|
||||
obj_id: int
|
||||
|
||||
|
||||
class _ArtifactStore:
|
||||
"""Process-local, execution-scoped store keyed by minted handle id.
|
||||
|
||||
Entries are keyed by an opaque uuid (never by user-supplied content), so
|
||||
concurrent crews cannot collide. Cleanup is per-scope -- clearing one crew's
|
||||
artifacts never touches another's -- with a TTL prune as a backstop.
|
||||
|
||||
Storing the same :class:`FileArtifact` instance again under the same scope
|
||||
reuses its handle rather than minting a duplicate. The tool-result cache
|
||||
hands back the same object on every cache hit, so this keeps repeated cached
|
||||
calls from stacking identical byte copies in memory.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._lock = threading.Lock()
|
||||
self._entries: dict[str, _Entry] = {}
|
||||
# (id(artifact), scope) -> handle, so re-storing the same instance under
|
||||
# the same scope reuses its handle. Keying on the scope too means storing
|
||||
# an object under a different scope gets its own handle and its own
|
||||
# cleanup entry rather than overwriting the first.
|
||||
self._handle_by_obj: dict[tuple[int, str | None], str] = {}
|
||||
|
||||
def store(
|
||||
self,
|
||||
artifact: FileArtifact,
|
||||
scope_id: str | None = None,
|
||||
ttl: int = DEFAULT_ARTIFACT_TTL,
|
||||
) -> str:
|
||||
norm_scope = str(scope_id) if scope_id is not None else None
|
||||
obj_key = (id(artifact), norm_scope)
|
||||
expires_at = (time.monotonic() + ttl) if ttl > 0 else None
|
||||
with self._lock:
|
||||
self._prune_locked()
|
||||
existing = self._handle_by_obj.get(obj_key)
|
||||
if existing is not None:
|
||||
entry = self._entries.get(existing)
|
||||
if entry is not None and entry.artifact is artifact:
|
||||
entry.expires_at = expires_at
|
||||
return f"{_HANDLE_SCHEME}://{existing}"
|
||||
handle_id = str(uuid4())
|
||||
self._entries[handle_id] = _Entry(
|
||||
artifact=artifact,
|
||||
scope_id=norm_scope,
|
||||
expires_at=expires_at,
|
||||
obj_id=id(artifact),
|
||||
)
|
||||
self._handle_by_obj[obj_key] = handle_id
|
||||
return f"{_HANDLE_SCHEME}://{handle_id}"
|
||||
|
||||
def resolve(self, handle_id: str) -> FileArtifact | None:
|
||||
with self._lock:
|
||||
entry = self._entries.get(handle_id)
|
||||
if entry is None:
|
||||
return None
|
||||
if entry.expires_at is not None and entry.expires_at <= time.monotonic():
|
||||
self._delete_locked(handle_id)
|
||||
return None
|
||||
return entry.artifact
|
||||
|
||||
def clear_scope(self, scope_id: str) -> None:
|
||||
scope = str(scope_id)
|
||||
with self._lock:
|
||||
for handle_id in [
|
||||
hid for hid, entry in self._entries.items() if entry.scope_id == scope
|
||||
]:
|
||||
self._delete_locked(handle_id)
|
||||
|
||||
def _prune_locked(self) -> None:
|
||||
"""Drop entries whose per-entry TTL has elapsed. Caller holds the lock."""
|
||||
now = time.monotonic()
|
||||
for handle_id in [
|
||||
hid
|
||||
for hid, entry in self._entries.items()
|
||||
if entry.expires_at is not None and entry.expires_at <= now
|
||||
]:
|
||||
self._delete_locked(handle_id)
|
||||
|
||||
def _delete_locked(self, handle_id: str) -> None:
|
||||
"""Remove an entry and its object-identity mapping. Caller holds lock."""
|
||||
entry = self._entries.pop(handle_id, None)
|
||||
if entry is not None:
|
||||
self._handle_by_obj.pop((entry.obj_id, entry.scope_id), None)
|
||||
|
||||
|
||||
_store: Final[_ArtifactStore] = _ArtifactStore()
|
||||
|
||||
|
||||
def store_artifact(
|
||||
artifact: FileArtifact,
|
||||
scope_id: Any | None = None,
|
||||
ttl: int = DEFAULT_ARTIFACT_TTL,
|
||||
) -> str:
|
||||
"""Store a :class:`FileArtifact` and return its model-facing placeholder text.
|
||||
|
||||
Args:
|
||||
artifact: The binary artifact to keep out of the model context.
|
||||
scope_id: Execution id (crew or task) used to group the artifact for
|
||||
cleanup. ``None`` means it is only reclaimed by the TTL prune.
|
||||
ttl: Seconds after which an unreferenced artifact may be pruned.
|
||||
|
||||
Returns:
|
||||
The placeholder string to surface to the model in place of the bytes.
|
||||
"""
|
||||
handle = _store.store(artifact, scope_id=scope_id, ttl=ttl)
|
||||
return artifact._placeholder(handle)
|
||||
|
||||
|
||||
def resolve_artifact_handles(value: Any) -> Any:
|
||||
"""Recursively replace stored handles in tool arguments with base64 data.
|
||||
|
||||
Walks strings, dicts, and lists. Any ``crewai+file://<uuid>`` token that
|
||||
resolves to a stored artifact is replaced with that artifact's base64 string;
|
||||
unknown tokens and all other values are returned unchanged. A new container is
|
||||
returned so the caller's original arguments (used for events, caching, and
|
||||
logs) keep the short handle.
|
||||
"""
|
||||
if isinstance(value, str):
|
||||
if _HANDLE_SCHEME not in value:
|
||||
return value
|
||||
|
||||
def _sub(match: re.Match[str]) -> str:
|
||||
# Store keys are lowercase uuid4 strings; the regex matches hex
|
||||
# case-insensitively, so normalize before lookup in case the model
|
||||
# echoed the handle with uppercase hex.
|
||||
artifact = _store.resolve(match.group(1).lower())
|
||||
return artifact.as_base64() if artifact is not None else match.group(0)
|
||||
|
||||
return _HANDLE_RE.sub(_sub, value)
|
||||
if isinstance(value, dict):
|
||||
return {key: resolve_artifact_handles(val) for key, val in value.items()}
|
||||
if isinstance(value, list):
|
||||
return [resolve_artifact_handles(item) for item in value]
|
||||
return value
|
||||
|
||||
|
||||
def store_if_artifact(result: Any, scope_id: Any | None = None) -> Any:
|
||||
"""Store ``result`` and return its placeholder if it is a :class:`FileArtifact`.
|
||||
|
||||
Any other value is returned unchanged. This is the single funnel both the
|
||||
native and ReAct executor paths route tool output through, so fresh and
|
||||
cached results are handled identically.
|
||||
"""
|
||||
if isinstance(result, FileArtifact):
|
||||
return store_artifact(result, scope_id=scope_id)
|
||||
return result
|
||||
|
||||
|
||||
def clear_artifact_scope(scope_id: Any) -> None:
|
||||
"""Drop every artifact stored under ``scope_id`` (called when a run ends)."""
|
||||
_store.clear_scope(scope_id)
|
||||
|
||||
|
||||
def artifact_scope_id(
|
||||
crew: Any | None = None,
|
||||
task: Any | None = None,
|
||||
agent: Any | None = None,
|
||||
) -> Any | None:
|
||||
"""Pick the execution id used to scope a tool's file artifacts for cleanup.
|
||||
|
||||
Prefer the crew id -- it matches the id ``Crew`` passes to
|
||||
:func:`clear_artifact_scope` when a run ends -- falling back to the agent's
|
||||
crew, then the task id, then ``None`` (TTL-only cleanup). Centralized, and
|
||||
given the agent fallback, so every tool-execution path derives the scope the
|
||||
same way and can't drift.
|
||||
"""
|
||||
if crew is None:
|
||||
crew = getattr(agent, "crew", None)
|
||||
crew_id = getattr(crew, "id", None)
|
||||
if crew_id is not None:
|
||||
return crew_id
|
||||
return getattr(task, "id", None)
|
||||
|
||||
|
||||
def _sanitize_attr(text: str) -> str:
|
||||
"""Strip characters that would break the bracketed placeholder display."""
|
||||
return (
|
||||
text.replace('"', "'").replace("]", ")").replace("\n", " ").replace("\r", " ")
|
||||
)
|
||||
|
||||
|
||||
def _human_size(size_bytes: int) -> str:
|
||||
size = float(size_bytes)
|
||||
for unit in ("B", "KB", "MB", "GB", "TB", "PB"):
|
||||
if size < 1024 or unit == "PB":
|
||||
return f"{int(size)} {unit}" if unit == "B" else f"{size:.1f} {unit}"
|
||||
size /= 1024
|
||||
return f"{size:.1f} PB"
|
||||
@@ -22,6 +22,7 @@ from crewai.events.types.tool_usage_events import (
|
||||
ToolValidateInputErrorEvent,
|
||||
)
|
||||
from crewai.telemetry.telemetry import Telemetry
|
||||
from crewai.tools.file_artifact import artifact_scope_id, resolve_artifact_handles
|
||||
from crewai.tools.structured_tool import CrewStructuredTool
|
||||
from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling
|
||||
from crewai.utilities.agent_utils import (
|
||||
@@ -327,12 +328,14 @@ class ToolUsage:
|
||||
if k in acceptable_args
|
||||
}
|
||||
result = await tool.ainvoke(
|
||||
input=arguments, config=fingerprint_config
|
||||
input=resolve_artifact_handles(arguments),
|
||||
config=fingerprint_config,
|
||||
)
|
||||
except Exception:
|
||||
arguments = calling.arguments
|
||||
result = await tool.ainvoke(
|
||||
input=arguments, config=fingerprint_config
|
||||
input=resolve_artifact_handles(arguments),
|
||||
config=fingerprint_config,
|
||||
)
|
||||
else:
|
||||
result = await tool.ainvoke(input={}, config=fingerprint_config)
|
||||
@@ -558,12 +561,14 @@ class ToolUsage:
|
||||
if k in acceptable_args
|
||||
}
|
||||
result = tool.invoke(
|
||||
input=arguments, config=fingerprint_config
|
||||
input=resolve_artifact_handles(arguments),
|
||||
config=fingerprint_config,
|
||||
)
|
||||
except Exception:
|
||||
arguments = calling.arguments
|
||||
result = tool.invoke(
|
||||
input=arguments, config=fingerprint_config
|
||||
input=resolve_artifact_handles(arguments),
|
||||
config=fingerprint_config,
|
||||
)
|
||||
else:
|
||||
result = tool.invoke(input={}, config=fingerprint_config)
|
||||
@@ -679,9 +684,17 @@ class ToolUsage:
|
||||
|
||||
return result
|
||||
|
||||
@property
|
||||
def _artifact_scope_id(self) -> Any | None:
|
||||
"""Execution id used to scope out-of-band file artifacts for cleanup."""
|
||||
return artifact_scope_id(task=self.task, agent=self.agent)
|
||||
|
||||
def _format_result(self, result: Any) -> str:
|
||||
from crewai.tools.file_artifact import store_if_artifact
|
||||
|
||||
if self.task:
|
||||
self.task.used_tools += 1
|
||||
result = store_if_artifact(result, self._artifact_scope_id)
|
||||
if self._should_remember_format():
|
||||
result = self._remember_format(result=result)
|
||||
return str(result)
|
||||
|
||||
@@ -27,6 +27,11 @@ from crewai.agents.parser import (
|
||||
from crewai.llms.base_llm import BaseLLM, call_stop_override
|
||||
from crewai.tools import BaseTool as CrewAITool
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
from crewai.tools.file_artifact import (
|
||||
artifact_scope_id,
|
||||
resolve_artifact_handles,
|
||||
store_if_artifact,
|
||||
)
|
||||
from crewai.tools.structured_tool import CrewStructuredTool
|
||||
from crewai.tools.tool_types import ToolResult
|
||||
from crewai.utilities.errors import AgentRepositoryError
|
||||
@@ -1416,6 +1421,7 @@ def execute_single_native_tool_call(
|
||||
args_dict = func_args
|
||||
|
||||
agent_key = getattr(agent, "key", "unknown") if agent else "unknown"
|
||||
scope_id = artifact_scope_id(crew, task, agent)
|
||||
|
||||
original_tool: BaseTool | None = None
|
||||
for tool in original_tools:
|
||||
@@ -1430,6 +1436,7 @@ def execute_single_native_tool_call(
|
||||
if tools_handler and tools_handler.cache:
|
||||
cached_result = tools_handler.cache.read(tool=func_name, input=input_str)
|
||||
if cached_result is not None:
|
||||
cached_result = store_if_artifact(cached_result, scope_id)
|
||||
result = (
|
||||
str(cached_result)
|
||||
if not isinstance(cached_result, str)
|
||||
@@ -1481,7 +1488,8 @@ def execute_single_native_tool_call(
|
||||
if func_name in available_functions:
|
||||
try:
|
||||
tool_func = available_functions[func_name]
|
||||
raw_result = tool_func(**args_dict)
|
||||
invoke_args = resolve_artifact_handles(args_dict) if args_dict else {}
|
||||
raw_result = tool_func(**invoke_args)
|
||||
|
||||
if tools_handler and tools_handler.cache:
|
||||
should_cache = True
|
||||
@@ -1494,6 +1502,7 @@ def execute_single_native_tool_call(
|
||||
tool=func_name, input=input_str, output=raw_result
|
||||
)
|
||||
|
||||
raw_result = store_if_artifact(raw_result, scope_id)
|
||||
result = (
|
||||
str(raw_result) if not isinstance(raw_result, str) else raw_result
|
||||
)
|
||||
@@ -1532,6 +1541,10 @@ def execute_single_native_tool_call(
|
||||
except Exception: # noqa: S110
|
||||
pass
|
||||
|
||||
# An after_tool_call hook may have replaced the result with a FileArtifact;
|
||||
# keep those bytes out of the message and events too.
|
||||
result = store_if_artifact(result, scope_id)
|
||||
|
||||
if not error_event_emitted:
|
||||
crewai_event_bus.emit(
|
||||
event_source,
|
||||
|
||||
397
lib/crewai/tests/tools/test_file_artifact.py
Normal file
397
lib/crewai/tests/tools/test_file_artifact.py
Normal file
@@ -0,0 +1,397 @@
|
||||
"""Tests for out-of-band binary file passing between tools."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import re
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.tools import FileArtifact
|
||||
from crewai.tools.file_artifact import (
|
||||
_store,
|
||||
artifact_scope_id,
|
||||
clear_artifact_scope,
|
||||
resolve_artifact_handles,
|
||||
store_artifact,
|
||||
store_if_artifact,
|
||||
)
|
||||
|
||||
|
||||
_HANDLE = re.compile(r"crewai\+file://[0-9a-fA-F-]{36}")
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _clear_store():
|
||||
"""Keep the process-local store empty between tests."""
|
||||
_store._entries.clear()
|
||||
_store._handle_by_obj.clear()
|
||||
yield
|
||||
_store._entries.clear()
|
||||
_store._handle_by_obj.clear()
|
||||
|
||||
|
||||
def _handle_in(text: str) -> str:
|
||||
match = _HANDLE.search(text)
|
||||
assert match is not None, f"no handle in: {text!r}"
|
||||
return match.group(0)
|
||||
|
||||
|
||||
class TestFileArtifact:
|
||||
def test_as_base64_round_trips(self) -> None:
|
||||
data = bytes(range(256))
|
||||
artifact = FileArtifact(data=data, filename="x.bin")
|
||||
assert base64.b64decode(artifact.as_base64()) == data
|
||||
|
||||
def test_size_bytes(self) -> None:
|
||||
assert FileArtifact(data=b"abc").size_bytes == 3
|
||||
|
||||
def test_defaults(self) -> None:
|
||||
artifact = FileArtifact(data=b"")
|
||||
assert artifact.filename == "file"
|
||||
assert artifact.mime_type == "application/octet-stream"
|
||||
|
||||
|
||||
class TestStoreArtifact:
|
||||
def test_placeholder_contains_metadata_and_handle(self) -> None:
|
||||
artifact = FileArtifact(
|
||||
data=b"\x00" * 30045, filename="deck.pptx", mime_type="application/pptx"
|
||||
)
|
||||
placeholder = store_artifact(artifact, scope_id="crew-1")
|
||||
assert 'filename="deck.pptx"' in placeholder
|
||||
assert 'mime_type="application/pptx"' in placeholder
|
||||
assert "29.3 KB" in placeholder
|
||||
assert _HANDLE.search(placeholder) is not None
|
||||
|
||||
def test_each_store_gets_a_unique_handle(self) -> None:
|
||||
h1 = _handle_in(store_artifact(FileArtifact(data=b"a")))
|
||||
h2 = _handle_in(store_artifact(FileArtifact(data=b"a")))
|
||||
assert h1 != h2
|
||||
|
||||
def test_restoring_same_instance_reuses_handle(self) -> None:
|
||||
# The tool-result cache hands back the same FileArtifact on every cache
|
||||
# hit; re-storing it must reuse the handle, not stack duplicate copies.
|
||||
artifact = FileArtifact(data=b"payload" * 1000)
|
||||
h1 = _handle_in(store_artifact(artifact, scope_id="s"))
|
||||
h2 = _handle_in(store_artifact(artifact, scope_id="s"))
|
||||
assert h1 == h2
|
||||
assert len(_store._entries) == 1
|
||||
|
||||
def test_same_instance_different_scope_gets_own_handle_and_cleans_up(self) -> None:
|
||||
# Storing one instance under two scopes must not orphan a mapping:
|
||||
# each scope keeps its own handle, and clearing one leaves the other.
|
||||
artifact = FileArtifact(data=b"x" * 100)
|
||||
h_a = _handle_in(store_artifact(artifact, scope_id="A"))
|
||||
h_b = _handle_in(store_artifact(artifact, scope_id="B"))
|
||||
assert h_a != h_b
|
||||
clear_artifact_scope("A")
|
||||
assert resolve_artifact_handles(h_a) == h_a # A cleared
|
||||
assert base64.b64decode(resolve_artifact_handles(h_b)) == b"x" * 100
|
||||
# No dangling object-identity mapping for the cleared scope.
|
||||
assert (id(artifact), "A") not in _store._handle_by_obj
|
||||
clear_artifact_scope("B")
|
||||
assert _store._handle_by_obj == {}
|
||||
|
||||
def test_placeholder_escapes_quotes_in_metadata(self) -> None:
|
||||
artifact = FileArtifact(data=b"x", filename='a".pptx', mime_type='m"/x')
|
||||
placeholder = store_artifact(artifact)
|
||||
# The bracketed attribute list must not be broken by an embedded quote,
|
||||
# and the handle must still be recoverable.
|
||||
assert 'filename="a\'.pptx"' in placeholder
|
||||
assert _HANDLE.search(placeholder) is not None
|
||||
|
||||
def test_placeholder_neutralizes_bracket_and_newlines(self) -> None:
|
||||
artifact = FileArtifact(data=b"x", filename="a]b\nc.bin")
|
||||
placeholder = store_artifact(artifact)
|
||||
first_line = placeholder.splitlines()[0]
|
||||
# The closing bracket and newline can't appear inside the attributes,
|
||||
# so the bracketed segment stays a single, well-formed line.
|
||||
assert first_line.count("]") == 1 and first_line.endswith("]")
|
||||
assert _HANDLE.search(placeholder) is not None
|
||||
|
||||
|
||||
class TestArtifactScopeId:
|
||||
class _Obj:
|
||||
def __init__(self, id_):
|
||||
self.id = id_
|
||||
|
||||
def test_prefers_crew_id(self) -> None:
|
||||
assert artifact_scope_id(self._Obj("crew"), self._Obj("task")) == "crew"
|
||||
|
||||
def test_falls_back_to_task_when_no_crew(self) -> None:
|
||||
assert artifact_scope_id(None, self._Obj("task")) == "task"
|
||||
|
||||
def test_falls_back_to_task_when_crew_id_is_none(self) -> None:
|
||||
assert artifact_scope_id(self._Obj(None), self._Obj("task")) == "task"
|
||||
|
||||
def test_none_when_neither_present(self) -> None:
|
||||
assert artifact_scope_id(None, None) is None
|
||||
|
||||
def test_falls_back_to_agent_crew(self) -> None:
|
||||
# Native executors may have crew=None while the agent carries the crew;
|
||||
# the helper must still resolve the crew id so cleanup scopes align.
|
||||
agent = self._Obj(None)
|
||||
agent.crew = self._Obj("crew-from-agent")
|
||||
assert artifact_scope_id(None, self._Obj("task"), agent) == "crew-from-agent"
|
||||
|
||||
def test_explicit_crew_beats_agent_crew(self) -> None:
|
||||
agent = self._Obj(None)
|
||||
agent.crew = self._Obj("agent-crew")
|
||||
assert artifact_scope_id(self._Obj("direct-crew"), None, agent) == "direct-crew"
|
||||
|
||||
|
||||
class TestResolveArtifactHandles:
|
||||
def test_exact_handle_resolves_to_base64(self) -> None:
|
||||
data = bytes(range(256)) * 100
|
||||
handle = _handle_in(store_artifact(FileArtifact(data=data)))
|
||||
resolved = resolve_artifact_handles(handle)
|
||||
assert base64.b64decode(resolved) == data
|
||||
|
||||
def test_resolves_handle_with_uppercased_hex(self) -> None:
|
||||
# A model may echo the handle with uppercase uuid hex; lookup must still
|
||||
# hit the lowercase-keyed store.
|
||||
data = b"upper-case-payload" * 100
|
||||
handle = _handle_in(store_artifact(FileArtifact(data=data)))
|
||||
scheme, _, hex_part = handle.rpartition("/")
|
||||
upper = f"{scheme}/{hex_part.upper()}"
|
||||
assert upper != handle
|
||||
assert base64.b64decode(resolve_artifact_handles(upper)) == data
|
||||
|
||||
def test_resolves_handle_inside_dict(self) -> None:
|
||||
data = b"binary-payload" * 1000
|
||||
handle = _handle_in(store_artifact(FileArtifact(data=data)))
|
||||
args = {"file_name": "a.bin", "content": handle}
|
||||
resolved = resolve_artifact_handles(args)
|
||||
assert base64.b64decode(resolved["content"]) == data
|
||||
assert resolved["file_name"] == "a.bin"
|
||||
|
||||
def test_resolves_handle_nested_in_list_and_dict(self) -> None:
|
||||
handle = _handle_in(store_artifact(FileArtifact(data=b"xyz")))
|
||||
resolved = resolve_artifact_handles({"items": [{"c": handle}]})
|
||||
assert base64.b64decode(resolved["items"][0]["c"]) == b"xyz"
|
||||
|
||||
def test_does_not_mutate_original_arguments(self) -> None:
|
||||
handle = _handle_in(store_artifact(FileArtifact(data=b"data")))
|
||||
args = {"content": handle}
|
||||
resolve_artifact_handles(args)
|
||||
assert args["content"] == handle
|
||||
|
||||
def test_unknown_handle_is_left_unchanged(self) -> None:
|
||||
token = "crewai+file://00000000-0000-0000-0000-000000000000"
|
||||
assert resolve_artifact_handles(token) == token
|
||||
|
||||
def test_non_handle_strings_pass_through(self) -> None:
|
||||
assert resolve_artifact_handles("just text") == "just text"
|
||||
assert resolve_artifact_handles({"k": "v"}) == {"k": "v"}
|
||||
|
||||
def test_non_string_values_pass_through(self) -> None:
|
||||
assert resolve_artifact_handles(42) == 42
|
||||
assert resolve_artifact_handles(None) is None
|
||||
assert resolve_artifact_handles([1, 2]) == [1, 2]
|
||||
|
||||
|
||||
class TestStoreIfArtifact:
|
||||
def test_artifact_becomes_placeholder(self) -> None:
|
||||
result = store_if_artifact(FileArtifact(data=b"a" * 100), scope_id="s")
|
||||
assert isinstance(result, str)
|
||||
assert _HANDLE.search(result) is not None
|
||||
|
||||
def test_other_values_unchanged(self) -> None:
|
||||
assert store_if_artifact("hello") == "hello"
|
||||
assert store_if_artifact(7) == 7
|
||||
|
||||
|
||||
class TestScoping:
|
||||
def test_clear_scope_only_drops_its_own_artifacts(self) -> None:
|
||||
h_a = _handle_in(store_artifact(FileArtifact(data=b"a"), scope_id="A"))
|
||||
h_b = _handle_in(store_artifact(FileArtifact(data=b"b"), scope_id="B"))
|
||||
|
||||
clear_artifact_scope("A")
|
||||
|
||||
# A's handle no longer resolves; B's still does.
|
||||
assert resolve_artifact_handles(h_a) == h_a
|
||||
assert base64.b64decode(resolve_artifact_handles(h_b)) == b"b"
|
||||
|
||||
def test_unscoped_artifact_survives_other_scope_clears(self) -> None:
|
||||
handle = _handle_in(store_artifact(FileArtifact(data=b"x")))
|
||||
clear_artifact_scope("some-crew")
|
||||
assert base64.b64decode(resolve_artifact_handles(handle)) == b"x"
|
||||
|
||||
|
||||
def _legacy_executor_runner(tools):
|
||||
"""Return a `(func_name, args) -> result_dict` driver for the legacy executor."""
|
||||
from unittest.mock import Mock
|
||||
|
||||
from crewai.agents.crew_agent_executor import CrewAgentExecutor
|
||||
from crewai.tools.base_tool import to_langchain
|
||||
from crewai.utilities.agent_utils import convert_tools_to_openai_schema
|
||||
|
||||
executor = CrewAgentExecutor(tools=to_langchain(tools), original_tools=tools)
|
||||
agent = Mock(key="agent", role="tester", verbose=False, fingerprint=None)
|
||||
agent.tools_results = []
|
||||
executor.agent = agent
|
||||
task = Mock(description="t", id="scope-legacy")
|
||||
task.name = "t" # `name=` is a reserved Mock ctor kwarg, so assign explicitly
|
||||
executor.task = task
|
||||
_, available_functions, _ = convert_tools_to_openai_schema(tools)
|
||||
|
||||
def run(func_name, args):
|
||||
return executor._execute_single_native_tool_call(
|
||||
call_id="c",
|
||||
func_name=func_name,
|
||||
func_args=args,
|
||||
available_functions=available_functions,
|
||||
)
|
||||
|
||||
return run
|
||||
|
||||
|
||||
def _experimental_executor_runner(tools):
|
||||
"""Return a `(func_name, args) -> result_dict` driver for the default executor."""
|
||||
import json
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import Mock
|
||||
|
||||
from crewai.experimental.agent_executor import AgentExecutor
|
||||
|
||||
executor = AgentExecutor.model_construct()
|
||||
for key, value in {
|
||||
"original_tools": tools,
|
||||
"tools": [],
|
||||
"tools_handler": None,
|
||||
"crew": None,
|
||||
}.items():
|
||||
object.__setattr__(executor, key, value)
|
||||
agent = Mock(key="agent", role="tester", verbose=False, fingerprint=None)
|
||||
agent.tools_results = []
|
||||
object.__setattr__(executor, "agent", agent)
|
||||
task = Mock(id="scope-exp", description="t")
|
||||
task.name = "t" # `name=` is a reserved Mock ctor kwarg, so assign explicitly
|
||||
object.__setattr__(executor, "task", task)
|
||||
executor._setup_native_tools()
|
||||
|
||||
def run(func_name, args):
|
||||
tool_call = SimpleNamespace(
|
||||
id="c",
|
||||
function=SimpleNamespace(
|
||||
name=func_name, arguments=args if isinstance(args, str) else json.dumps(args)
|
||||
),
|
||||
)
|
||||
return executor._execute_single_native_tool_call(tool_call)
|
||||
|
||||
return run
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"make_runner",
|
||||
[_experimental_executor_runner, _legacy_executor_runner],
|
||||
ids=["experimental", "legacy"],
|
||||
)
|
||||
class TestNativeExecutorWiring:
|
||||
"""Guard producer/consumer wiring on both the default and legacy executors."""
|
||||
|
||||
def test_artifact_output_is_replaced_by_handle_and_resolves_downstream(
|
||||
self, make_runner
|
||||
) -> None:
|
||||
from crewai.tools import BaseTool, FileArtifact
|
||||
|
||||
payload = bytes(range(256)) * 200 # ~51 KB, far past the LLM round-trip limit
|
||||
|
||||
class Generate(BaseTool):
|
||||
name: str = "generate_file"
|
||||
description: str = "Generate a binary file"
|
||||
|
||||
def _run(self) -> FileArtifact:
|
||||
return FileArtifact(
|
||||
data=payload, filename="deck.pptx", mime_type="application/pptx"
|
||||
)
|
||||
|
||||
captured: dict[str, str] = {}
|
||||
|
||||
class Upload(BaseTool):
|
||||
name: str = "upload_file"
|
||||
description: str = "Upload base64 content"
|
||||
|
||||
def _run(self, content: str) -> str:
|
||||
captured["content"] = content
|
||||
return "uploaded"
|
||||
|
||||
run = make_runner([Generate(), Upload()])
|
||||
|
||||
# Producer: the 51 KB payload must NOT appear in the model-facing result.
|
||||
gen_result = run("generate_file", "{}")["result"]
|
||||
assert "deck.pptx" in gen_result
|
||||
assert base64.b64encode(payload).decode() not in gen_result
|
||||
handle = _handle_in(gen_result)
|
||||
|
||||
# Consumer: the handle the model echoes is expanded to exact bytes.
|
||||
up_result = run("upload_file", {"content": handle})["result"]
|
||||
assert up_result == "uploaded"
|
||||
assert base64.b64decode(captured["content"]) == payload
|
||||
|
||||
|
||||
class TestAfterHookArtifact:
|
||||
"""An after_tool_call hook that returns a FileArtifact must still be stored."""
|
||||
|
||||
def test_hook_returned_artifact_is_replaced_by_handle(self) -> None:
|
||||
from crewai.hooks.tool_hooks import (
|
||||
register_after_tool_call_hook,
|
||||
unregister_after_tool_call_hook,
|
||||
)
|
||||
from crewai.tools import BaseTool, FileArtifact
|
||||
|
||||
payload = bytes(range(256)) * 50
|
||||
|
||||
class Echo(BaseTool):
|
||||
name: str = "echo"
|
||||
description: str = "Echo"
|
||||
|
||||
def _run(self) -> str:
|
||||
return "plain text"
|
||||
|
||||
def hook(_context):
|
||||
return FileArtifact(data=payload, filename="hook.bin")
|
||||
|
||||
register_after_tool_call_hook(hook)
|
||||
try:
|
||||
run = _experimental_executor_runner([Echo()])
|
||||
result = run("echo", "{}")["result"]
|
||||
finally:
|
||||
unregister_after_tool_call_hook(hook)
|
||||
|
||||
assert base64.b64encode(payload).decode() not in result
|
||||
assert _HANDLE.search(result) is not None
|
||||
|
||||
|
||||
class TestTtlPrune:
|
||||
@staticmethod
|
||||
def _expire(handle: str) -> None:
|
||||
"""Force a stored handle's per-entry TTL into the past."""
|
||||
entry = _store._entries[handle.rsplit("/", 1)[-1]]
|
||||
entry.expires_at = time.monotonic() - 1
|
||||
|
||||
def test_expired_handle_does_not_resolve(self) -> None:
|
||||
handle = _handle_in(store_artifact(FileArtifact(data=b"old"), ttl=3600))
|
||||
self._expire(handle)
|
||||
# An expired handle is enforced on lookup, not just on the next write.
|
||||
assert resolve_artifact_handles(handle) == handle
|
||||
|
||||
def test_short_ttl_store_does_not_evict_long_ttl_entries(self) -> None:
|
||||
keep = _handle_in(store_artifact(FileArtifact(data=b"keep"), ttl=3600))
|
||||
# A later short-TTL store must prune only by each entry's own expiry,
|
||||
# never by the current call's ttl.
|
||||
store_artifact(FileArtifact(data=b"tiny"), ttl=1)
|
||||
assert base64.b64decode(resolve_artifact_handles(keep)) == b"keep"
|
||||
|
||||
def test_expired_entries_are_pruned_on_next_store(self) -> None:
|
||||
stale = _handle_in(store_artifact(FileArtifact(data=b"old"), ttl=3600))
|
||||
self._expire(stale)
|
||||
store_artifact(FileArtifact(data=b"new"), ttl=3600)
|
||||
assert stale.rsplit("/", 1)[-1] not in _store._entries
|
||||
|
||||
def test_ttl_zero_never_expires(self) -> None:
|
||||
handle = _handle_in(store_artifact(FileArtifact(data=b"keep"), ttl=0))
|
||||
assert _store._entries[handle.rsplit("/", 1)[-1]].expires_at is None
|
||||
store_artifact(FileArtifact(data=b"another"), ttl=0)
|
||||
assert base64.b64decode(resolve_artifact_handles(handle)) == b"keep"
|
||||
Reference in New Issue
Block a user