Compare commits

...

1 Commit

Author SHA1 Message Date
Lucas Gomide
0dda6641b2 feat: add binary streaming support for large file downloads
This change enables CrewAI platform tools to handle binary file responses
from the OAuth service, allowing agents to work with large files (Excel,
Word, PDF, etc.) without memory issues.
2026-01-30 01:00:38 -03:00
6 changed files with 256 additions and 1 deletions

View File

@@ -13,10 +13,18 @@ from crewai_tools.tools.crewai_platform_tools.crewai_platform_tool_builder impor
from crewai_tools.tools.crewai_platform_tools.crewai_platform_tools import (
CrewaiPlatformTools,
)
from crewai_tools.tools.crewai_platform_tools.file_hook import (
process_file_markers,
register_file_processing_hook,
unregister_file_processing_hook,
)
__all__ = [
"CrewAIPlatformActionTool",
"CrewaiPlatformToolBuilder",
"CrewaiPlatformTools",
"process_file_markers",
"register_file_processing_hook",
"unregister_file_processing_hook",
]

View File

@@ -2,6 +2,8 @@
import json
import os
import re
import tempfile
from typing import Any
from crewai.tools import BaseTool
@@ -14,6 +16,26 @@ from crewai_tools.tools.crewai_platform_tools.misc import (
get_platform_integration_token,
)
# Sentinel prefix marking a tool result that refers to a downloaded file.
# Full marker format: __CREWAI_FILE__:filename:content_type:file_path
_FILE_MARKER_PREFIX = "__CREWAI_FILE__"
# Maps base MIME types (Content-Type with any "; charset=..." parameters
# stripped) to a file suffix used when naming the downloaded temp file.
_MIME_TO_EXTENSION = {
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
    "application/vnd.ms-excel": ".xls",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
    "application/msword": ".doc",
    "application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx",
    "application/vnd.ms-powerpoint": ".ppt",
    "application/pdf": ".pdf",
    "image/png": ".png",
    "image/jpeg": ".jpg",
    "image/gif": ".gif",
    "image/webp": ".webp",
    "text/plain": ".txt",
    "text/csv": ".csv",
    "application/json": ".json",
    "application/zip": ".zip",
}
class CrewAIPlatformActionTool(BaseTool):
action_name: str = Field(default="", description="The name of the action")
@@ -71,10 +93,18 @@ class CrewAIPlatformActionTool(BaseTool):
url=api_url,
headers=headers,
json=payload,
timeout=60,
timeout=300,
stream=True,
verify=os.environ.get("CREWAI_FACTORY", "false").lower() != "true",
)
content_type = response.headers.get("Content-Type", "")
# Check if response is binary (non-JSON)
if "application/json" not in content_type:
return self._handle_binary_response(response)
# Normal JSON response
data = response.json()
if not response.ok:
if isinstance(data, dict):
@@ -91,3 +121,49 @@ class CrewAIPlatformActionTool(BaseTool):
except Exception as e:
return f"Error executing action {self.action_name}: {e!s}"
def _handle_binary_response(self, response: requests.Response) -> str:
    """Handle a binary (non-JSON) streaming response from the API.

    Streams the body to a temporary file in fixed-size chunks so large
    downloads never have to fit in memory, then returns a marker string
    that the file hook expands into an LLM-attached file.

    Args:
        response: The streaming HTTP response with binary content.

    Returns:
        A file marker string in the format:
            __CREWAI_FILE__:filename:content_type:file_path
    """
    content_type = response.headers.get("Content-Type", "application/octet-stream")
    filename = self._extract_filename_from_headers(response.headers)
    extension = self._get_file_extension(content_type, filename)

    # delete=False: the file must outlive this call so the hook (and the
    # LLM) can read it later. NOTE(review): nothing visible here deletes
    # it afterwards -- confirm the hook/caller owns cleanup.
    with tempfile.NamedTemporaryFile(
        delete=False, suffix=extension, prefix="crewai_"
    ) as tmp_file:
        for chunk in response.iter_content(chunk_size=8192):
            tmp_file.write(chunk)
        tmp_path = tmp_file.name

    # Bug fix: the marker previously embedded a literal "(unknown)" instead
    # of the extracted filename, so the documented marker format was never
    # actually produced and the real name was lost downstream.
    return f"{_FILE_MARKER_PREFIX}:{filename}:{content_type}:{tmp_path}"
def _extract_filename_from_headers(
self, headers: requests.structures.CaseInsensitiveDict
) -> str:
content_disposition = headers.get("Content-Disposition", "")
if content_disposition:
match = re.search(r'filename="?([^";\s]+)"?', content_disposition)
if match:
return match.group(1)
return "downloaded_file"
def _get_file_extension(self, content_type: str, filename: str) -> str:
if "." in filename:
return "." + filename.rsplit(".", 1)[-1]
base_content_type = content_type.split(";")[0].strip()
return _MIME_TO_EXTENSION.get(base_content_type, "")

View File

@@ -0,0 +1,158 @@
"""File processing hook for CrewAI Platform Tools.
This module provides a hook that processes file markers returned by platform tools
and injects the files into the LLM context for native file handling.
"""
from __future__ import annotations
import logging
import os
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from crewai.hooks.tool_hooks import ToolCallHookContext
logger = logging.getLogger(__name__)
# Sentinel prefix identifying a tool result that refers to a downloaded file.
# Marker format: __CREWAI_FILE__:filename:content_type:file_path
_FILE_MARKER_PREFIX = "__CREWAI_FILE__"
# Module-level guard so the hook is registered with crewai at most once.
_hook_registered = False
def process_file_markers(context: ToolCallHookContext) -> str | None:
    """Process file markers in tool results and inject files into context.

    Detects file markers returned by platform tools (e.g. download_file)
    and converts them into File objects attached to the hook context; the
    agent executor then injects those files into the tool message for
    native LLM file handling.

    The marker format is:
        __CREWAI_FILE__:filename:content_type:file_path

    Args:
        context: The tool call hook context containing the tool result.

    Returns:
        A human-readable message if a file was processed, None otherwise.
    """
    result = context.tool_result
    if not result or not result.startswith(_FILE_MARKER_PREFIX):
        return None
    try:
        # maxsplit=3 keeps any ":" inside file_path (e.g. Windows drive
        # letters) intact; filename and content_type must not contain ":".
        parts = result.split(":", 3)
        if len(parts) < 4:
            logger.warning(f"Invalid file marker format: {result[:100]}")
            return None
        _, filename, content_type, file_path = parts
        if not os.path.isfile(file_path):
            logger.error(f"File not found: {file_path}")
            return f"Error: Downloaded file not found at {file_path}"
        try:
            from crewai_files import File
        except ImportError:
            logger.warning(
                "crewai_files not installed. File will not be attached to LLM context."
            )
            # Bug fix: report the parsed filename instead of the literal
            # placeholder "(unknown)" that previously appeared here.
            return (
                f"Downloaded file: {filename} ({content_type}). "
                f"File saved at: {file_path}. "
                "Note: Install crewai_files for native LLM file handling."
            )
        file = File(source=file_path, content_type=content_type, filename=filename)
        context.files = {filename: file}
        file_size = os.path.getsize(file_path)
        size_str = _format_file_size(file_size)
        # Bug fix: include the real filename (was a literal "(unknown)").
        return f"Downloaded file: {filename} ({content_type}, {size_str}). File is attached for LLM analysis."
    except Exception as e:
        logger.exception(f"Error processing file marker: {e}")
        return f"Error processing downloaded file: {e}"
def _format_file_size(size_bytes: int) -> str:
"""Format file size in human-readable format.
Args:
size_bytes: Size in bytes.
Returns:
Human-readable size string.
"""
if size_bytes < 1024:
return f"{size_bytes} bytes"
elif size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
elif size_bytes < 1024 * 1024 * 1024:
return f"{size_bytes / (1024 * 1024):.1f} MB"
else:
return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB"
def register_file_processing_hook() -> bool:
    """Globally install the file-marker processing hook.

    Intended to be called once during application startup so platform
    tools get automatic file injection.

    Returns:
        True when the hook was newly registered; False when it was already
        in place or registration failed.
    """
    global _hook_registered

    # Idempotence guard: a second call is a no-op.
    if _hook_registered:
        logger.debug("File processing hook already registered")
        return False

    try:
        from crewai.hooks import register_after_tool_call_hook

        register_after_tool_call_hook(process_file_markers)
        _hook_registered = True
        logger.info("File processing hook registered successfully")
        return True
    except ImportError:
        # Older crewai without hook support: degrade gracefully.
        logger.warning(
            "crewai.hooks not available. File processing hook not registered."
        )
        return False
    except Exception as e:
        logger.exception(f"Failed to register file processing hook: {e}")
        return False
def unregister_file_processing_hook() -> bool:
    """Remove the previously installed file-marker processing hook.

    Returns:
        True when the hook was removed; False when it was never registered
        or removal failed.
    """
    global _hook_registered

    # Nothing to do if register_file_processing_hook never succeeded.
    if not _hook_registered:
        return False

    try:
        from crewai.hooks import unregister_after_tool_call_hook

        unregister_after_tool_call_hook(process_file_markers)
        _hook_registered = False
        logger.info("File processing hook unregistered")
        return True
    except ImportError:
        # Hooks module vanished; treat as not-unregistered, silently.
        return False
    except Exception as e:
        logger.exception(f"Failed to unregister file processing hook: {e}")
        return False

View File

@@ -930,6 +930,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
"name": func_name,
"content": result,
}
if after_hook_context.files:
tool_message["files"] = after_hook_context.files
self.messages.append(tool_message)
# Log the tool execution

View File

@@ -795,6 +795,10 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
"name": func_name,
"content": result,
}
if after_hook_context.files:
tool_message["files"] = after_hook_context.files
self.state.messages.append(tool_message)
# Log the tool execution

View File

@@ -5,6 +5,7 @@ from typing import TYPE_CHECKING, Any
from crewai.events.event_listener import event_listener
from crewai.hooks.types import AfterToolCallHookType, BeforeToolCallHookType
from crewai.utilities.printer import Printer
from crewai.utilities.types import FileInput
if TYPE_CHECKING:
@@ -34,6 +35,9 @@ class ToolCallHookContext:
crew: Crew instance (may be None)
tool_result: Tool execution result (only set for after_tool_call hooks).
Can be modified by returning a new string from after_tool_call hook.
files: Optional dictionary of files to attach to the tool message.
Can be set by after_tool_call hooks to inject files into the LLM context.
These files will be formatted according to the LLM provider's requirements.
"""
def __init__(
@@ -64,6 +68,7 @@ class ToolCallHookContext:
self.task = task
self.crew = crew
self.tool_result = tool_result
self.files: dict[str, FileInput] | None = None
def request_human_input(
self,