Compare commits

...

4 Commits

Author SHA1 Message Date
Lucas Gomide
216332424e feat: add binary streaming support for large file downloads
This change enables CrewAI platform tools to handle binary file responses
from the OAuth service, allowing agents to work with large files (Excel,
Word, PDF, etc.) without memory issues
2026-02-03 11:20:04 -03:00
Greyson LaLonde
dad26680fc Merge branch 'main' into gl/fix/apps-strict-tool-calls 2026-02-02 14:57:27 -05:00
Greyson LaLonde
61d26924af fix: ensure nested items have required properties 2026-02-02 14:50:08 -05:00
Greyson LaLonde
712ac0589a fix: enforce additionalProperties=false in schemas 2026-02-02 14:18:59 -05:00
11 changed files with 342 additions and 14 deletions

View File

@@ -13,10 +13,16 @@ from crewai_tools.tools.crewai_platform_tools.crewai_platform_tool_builder impor
from crewai_tools.tools.crewai_platform_tools.crewai_platform_tools import (
CrewaiPlatformTools,
)
from crewai_tools.tools.crewai_platform_tools.file_hook import (
process_file_markers,
register_file_processing_hook,
)
__all__ = [
"CrewAIPlatformActionTool",
"CrewaiPlatformToolBuilder",
"CrewaiPlatformTools",
"process_file_markers",
"register_file_processing_hook",
]

View File

@@ -2,6 +2,8 @@
import json
import os
import re
import tempfile
from typing import Any
from crewai.tools import BaseTool
@@ -14,6 +16,26 @@ from crewai_tools.tools.crewai_platform_tools.misc import (
get_platform_integration_token,
)
_FILE_MARKER_PREFIX = "__CREWAI_FILE__"
_MIME_TO_EXTENSION = {
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
"application/vnd.ms-excel": ".xls",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
"application/msword": ".doc",
"application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx",
"application/vnd.ms-powerpoint": ".ppt",
"application/pdf": ".pdf",
"image/png": ".png",
"image/jpeg": ".jpg",
"image/gif": ".gif",
"image/webp": ".webp",
"text/plain": ".txt",
"text/csv": ".csv",
"application/json": ".json",
"application/zip": ".zip",
}
class CrewAIPlatformActionTool(BaseTool):
action_name: str = Field(default="", description="The name of the action")
@@ -71,10 +93,18 @@ class CrewAIPlatformActionTool(BaseTool):
url=api_url,
headers=headers,
json=payload,
timeout=60,
timeout=300,
stream=True,
verify=os.environ.get("CREWAI_FACTORY", "false").lower() != "true",
)
content_type = response.headers.get("Content-Type", "")
# Check if response is binary (non-JSON)
if "application/json" not in content_type:
return self._handle_binary_response(response)
# Normal JSON response
data = response.json()
if not response.ok:
if isinstance(data, dict):
@@ -91,3 +121,49 @@ class CrewAIPlatformActionTool(BaseTool):
except Exception as e:
return f"Error executing action {self.action_name}: {e!s}"
def _handle_binary_response(self, response: requests.Response) -> str:
"""Handle binary streaming response from the API.
Streams the binary content to a temporary file and returns a marker
that can be processed by the file hook to inject the file into the
LLM context.
Args:
response: The streaming HTTP response with binary content.
Returns:
A file marker string in the format:
__CREWAI_FILE__:filename:content_type:file_path
"""
content_type = response.headers.get("Content-Type", "application/octet-stream")
filename = self._extract_filename_from_headers(response.headers)
extension = self._get_file_extension(content_type, filename)
with tempfile.NamedTemporaryFile(
delete=False, suffix=extension, prefix="crewai_"
) as tmp_file:
for chunk in response.iter_content(chunk_size=8192):
tmp_file.write(chunk)
tmp_path = tmp_file.name
return f"{_FILE_MARKER_PREFIX}:{filename}:{content_type}:{tmp_path}"
def _extract_filename_from_headers(
self, headers: requests.structures.CaseInsensitiveDict
) -> str:
content_disposition = headers.get("Content-Disposition", "")
if content_disposition:
match = re.search(r'filename="?([^";\s]+)"?', content_disposition)
if match:
return match.group(1)
return "downloaded_file"
def _get_file_extension(self, content_type: str, filename: str) -> str:
if "." in filename:
return "." + filename.rsplit(".", 1)[-1]
base_content_type = content_type.split(";")[0].strip()
return _MIME_TO_EXTENSION.get(base_content_type, "")

View File

@@ -6,6 +6,9 @@ from crewai_tools.adapters.tool_collection import ToolCollection
from crewai_tools.tools.crewai_platform_tools.crewai_platform_tool_builder import (
CrewaiPlatformToolBuilder,
)
from crewai_tools.tools.crewai_platform_tools.file_hook import (
register_file_processing_hook,
)
logger = logging.getLogger(__name__)
@@ -22,6 +25,8 @@ def CrewaiPlatformTools( # noqa: N802
Returns:
A list of BaseTool instances for platform actions
"""
register_file_processing_hook()
builder = CrewaiPlatformToolBuilder(apps=apps)
return builder.tools() # type: ignore

View File

@@ -0,0 +1,132 @@
"""File processing hook for CrewAI Platform Tools.
This module provides a hook that processes file markers returned by platform tools
and injects the files into the LLM context for native file handling.
"""
from __future__ import annotations
import logging
import os
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from crewai.hooks.tool_hooks import ToolCallHookContext
logger = logging.getLogger(__name__)
_FILE_MARKER_PREFIX = "__CREWAI_FILE__"
_hook_registered = False
def process_file_markers(context: ToolCallHookContext) -> str | None:
"""Process file markers in tool results and inject files into context.
This hook detects file markers returned by platform tools (e.g., download_file)
and converts them into FileInput objects that are attached to the hook context.
The agent executor will then inject these files into the tool message for
native LLM file handling.
The marker format is:
__CREWAI_FILE__:filename:content_type:file_path
Args:
context: The tool call hook context containing the tool result.
Returns:
A human-readable message if a file was processed, None otherwise.
"""
result = context.tool_result
if not result or not result.startswith(_FILE_MARKER_PREFIX):
return None
try:
parts = result.split(":", 3)
if len(parts) < 4:
logger.warning(f"Invalid file marker format: {result[:100]}")
return None
_, filename, content_type, file_path = parts
if not os.path.isfile(file_path):
logger.error(f"File not found: {file_path}")
return f"Error: Downloaded file not found at {file_path}"
try:
from crewai_files import File
except ImportError:
logger.warning(
"crewai_files not installed. File will not be attached to LLM context."
)
return (
f"Downloaded file: {filename} ({content_type}). "
f"File saved at: {file_path}. "
"Note: Install crewai_files for native LLM file handling."
)
file = File(source=file_path, content_type=content_type, filename=filename)
context.files = {filename: file}
file_size = os.path.getsize(file_path)
size_str = _format_file_size(file_size)
return f"Downloaded file: {filename} ({content_type}, {size_str}). File is attached for LLM analysis."
except Exception as e:
logger.exception(f"Error processing file marker: {e}")
return f"Error processing downloaded file: {e}"
def _format_file_size(size_bytes: int) -> str:
"""Format file size in human-readable format.
Args:
size_bytes: Size in bytes.
Returns:
Human-readable size string.
"""
if size_bytes < 1024:
return f"{size_bytes} bytes"
elif size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
elif size_bytes < 1024 * 1024 * 1024:
return f"{size_bytes / (1024 * 1024):.1f} MB"
else:
return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB"
def register_file_processing_hook() -> bool:
"""Register the file processing hook globally.
This function should be called once during application initialization
to enable automatic file injection for platform tools.
Returns:
True if the hook was registered, False if it was already registered
or if registration failed.
"""
global _hook_registered
if _hook_registered:
logger.debug("File processing hook already registered")
return False
try:
from crewai.hooks import register_after_tool_call_hook
register_after_tool_call_hook(process_file_markers)
_hook_registered = True
logger.info("File processing hook registered successfully")
return True
except ImportError:
logger.warning(
"crewai.hooks not available. File processing hook not registered."
)
return False
except Exception as e:
logger.exception(f"Failed to register file processing hook: {e}")
return False

View File

@@ -2,6 +2,7 @@ import unittest
from unittest.mock import Mock, patch
from crewai_tools.tools.crewai_platform_tools import CrewaiPlatformTools
from crewai_tools.tools.crewai_platform_tools import file_hook
class TestCrewaiPlatformTools(unittest.TestCase):
@@ -113,3 +114,64 @@ class TestCrewaiPlatformTools(unittest.TestCase):
with self.assertRaises(ValueError) as context:
CrewaiPlatformTools(apps=["github"])
assert "No platform integration token found" in str(context.exception)
@patch.dict("os.environ", {"CREWAI_PLATFORM_INTEGRATION_TOKEN": "test_token"})
@patch(
"crewai_tools.tools.crewai_platform_tools.crewai_platform_tool_builder.requests.get"
)
@patch(
"crewai_tools.tools.crewai_platform_tools.crewai_platform_tools.register_file_processing_hook"
)
def test_crewai_platform_tools_registers_file_hook(
self, mock_register_hook, mock_get
):
mock_response = Mock()
mock_response.raise_for_status.return_value = None
mock_response.json.return_value = {"actions": {"github": []}}
mock_get.return_value = mock_response
CrewaiPlatformTools(apps=["github"])
mock_register_hook.assert_called_once()
class TestFileHook(unittest.TestCase):
def setUp(self):
file_hook._hook_registered = False
def tearDown(self):
file_hook._hook_registered = False
@patch("crewai.hooks.register_after_tool_call_hook")
def test_register_hook_is_idempotent(self, mock_register):
"""Test hook registration succeeds once and is idempotent."""
assert file_hook.register_file_processing_hook() is True
assert file_hook._hook_registered is True
mock_register.assert_called_once_with(file_hook.process_file_markers)
# Second call should return False and not register again
assert file_hook.register_file_processing_hook() is False
mock_register.assert_called_once()
def test_process_file_markers_ignores_non_file_results(self):
"""Test that non-file-marker results return None."""
test_cases = [
None, # Empty result
"Regular tool output", # Non-marker
"__CREWAI_FILE__:incomplete", # Invalid format (missing parts)
]
for tool_result in test_cases:
mock_context = Mock()
mock_context.tool_result = tool_result
assert file_hook.process_file_markers(mock_context) is None
def test_format_file_size(self):
"""Test file size formatting across units."""
cases = [
(500, "500 bytes"),
(1024, "1.0 KB"),
(1536, "1.5 KB"),
(1024 * 1024, "1.0 MB"),
(1024 * 1024 * 1024, "1.0 GB"),
]
for size_bytes, expected in cases:
assert file_hook._format_file_size(size_bytes) == expected

View File

@@ -16,6 +16,7 @@ from crewai.agents.agent_adapters.openai_agents.protocols import (
)
from crewai.tools import BaseTool
from crewai.utilities.import_utils import require
from crewai.utilities.pydantic_schema_utils import force_additional_properties_false
from crewai.utilities.string_utils import sanitize_tool_name
@@ -135,7 +136,9 @@ class OpenAIAgentToolAdapter(BaseToolAdapter):
for tool in tools:
schema: dict[str, Any] = tool.args_schema.model_json_schema()
schema.update({"additionalProperties": False, "type": "object"})
schema = force_additional_properties_false(schema)
schema.update({"type": "object"})
openai_tool: OpenAIFunctionTool = cast(
OpenAIFunctionTool,

View File

@@ -930,6 +930,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
"name": func_name,
"content": result,
}
if after_hook_context.files:
tool_message["files"] = after_hook_context.files
self.messages.append(tool_message)
# Log the tool execution

View File

@@ -814,6 +814,10 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
"name": func_name,
"content": result,
}
if after_hook_context.files:
tool_message["files"] = after_hook_context.files
self.state.messages.append(tool_message)
# Log the tool execution

View File

@@ -5,6 +5,7 @@ from typing import TYPE_CHECKING, Any
from crewai.events.event_listener import event_listener
from crewai.hooks.types import AfterToolCallHookType, BeforeToolCallHookType
from crewai.utilities.printer import Printer
from crewai.utilities.types import FileInput
if TYPE_CHECKING:
@@ -34,6 +35,9 @@ class ToolCallHookContext:
crew: Crew instance (may be None)
tool_result: Tool execution result (only set for after_tool_call hooks).
Can be modified by returning a new string from after_tool_call hook.
files: Optional dictionary of files to attach to the tool message.
Can be set by after_tool_call hooks to inject files into the LLM context.
These files will be formatted according to the LLM provider's requirements.
"""
def __init__(
@@ -64,6 +68,7 @@ class ToolCallHookContext:
self.task = task
self.crew = crew
self.tool_result = tool_result
self.files: dict[str, FileInput] | None = None
def request_human_input(
self,

View File

@@ -1521,13 +1521,16 @@ class OpenAICompletion(BaseLLM):
) -> list[dict[str, Any]]:
"""Convert CrewAI tool format to OpenAI function calling format."""
from crewai.llms.providers.utils.common import safe_tool_conversion
from crewai.utilities.pydantic_schema_utils import (
force_additional_properties_false,
)
openai_tools = []
for tool in tools:
name, description, parameters = safe_tool_conversion(tool, "OpenAI")
openai_tool = {
openai_tool: dict[str, Any] = {
"type": "function",
"function": {
"name": name,
@@ -1537,10 +1540,11 @@ class OpenAICompletion(BaseLLM):
}
if parameters:
if isinstance(parameters, dict):
openai_tool["function"]["parameters"] = parameters # type: ignore
else:
openai_tool["function"]["parameters"] = dict(parameters)
params_dict = (
parameters if isinstance(parameters, dict) else dict(parameters)
)
params_dict = force_additional_properties_false(params_dict)
openai_tool["function"]["parameters"] = params_dict
openai_tools.append(openai_tool)
return openai_tools

View File

@@ -127,6 +127,36 @@ def add_key_in_dict_recursively(
return d
def force_additional_properties_false(d: Any) -> Any:
"""Force additionalProperties=false on all object-type dicts recursively.
OpenAI strict mode requires all objects to have additionalProperties=false.
This function overwrites any existing value to ensure compliance.
Also ensures objects have properties and required arrays, even if empty,
as OpenAI strict mode requires these for all object types.
Args:
d: The dictionary/list to modify.
Returns:
The modified dictionary/list.
"""
if isinstance(d, dict):
if d.get("type") == "object":
d["additionalProperties"] = False
if "properties" not in d:
d["properties"] = {}
if "required" not in d:
d["required"] = []
for v in d.values():
force_additional_properties_false(v)
elif isinstance(d, list):
for i in d:
force_additional_properties_false(i)
return d
def fix_discriminator_mappings(schema: dict[str, Any]) -> dict[str, Any]:
"""Replace '#/$defs/...' references in discriminator.mapping with just the model name.
@@ -278,13 +308,7 @@ def generate_model_description(model: type[BaseModel]) -> dict[str, Any]:
"""
json_schema = model.model_json_schema(ref_template="#/$defs/{model}")
json_schema = add_key_in_dict_recursively(
json_schema,
key="additionalProperties",
value=False,
criteria=lambda d: d.get("type") == "object"
and "additionalProperties" not in d,
)
json_schema = force_additional_properties_false(json_schema)
json_schema = resolve_refs(json_schema)
@@ -378,6 +402,9 @@ def create_model_from_schema( # type: ignore[no-any-unimported]
"""
effective_root = root_schema or json_schema
json_schema = force_additional_properties_false(json_schema)
effective_root = force_additional_properties_false(effective_root)
if "allOf" in json_schema:
json_schema = _merge_all_of_schemas(json_schema["allOf"], effective_root)
if "title" not in json_schema and "title" in (root_schema or {}):