From 216332424e1dc13788f44eaa5e40d79e735e81c5 Mon Sep 17 00:00:00 2001 From: Lucas Gomide Date: Fri, 30 Jan 2026 00:56:39 -0300 Subject: [PATCH] feat: add binary streaming support for large file downloads This change enables CrewAI platform tools to handle binary file responses from the OAuth service, allowing agents to work with large files (Excel, Word, PDF, etc.) without memory issues --- .../tools/crewai_platform_tools/__init__.py | 6 + .../crewai_platform_action_tool.py | 78 ++++++++++- .../crewai_platform_tools.py | 5 + .../tools/crewai_platform_tools/file_hook.py | 132 ++++++++++++++++++ .../test_crewai_platform_tools.py | 62 ++++++++ .../src/crewai/agents/crew_agent_executor.py | 4 + .../src/crewai/experimental/agent_executor.py | 4 + lib/crewai/src/crewai/hooks/tool_hooks.py | 5 + 8 files changed, 295 insertions(+), 1 deletion(-) create mode 100644 lib/crewai-tools/src/crewai_tools/tools/crewai_platform_tools/file_hook.py diff --git a/lib/crewai-tools/src/crewai_tools/tools/crewai_platform_tools/__init__.py b/lib/crewai-tools/src/crewai_tools/tools/crewai_platform_tools/__init__.py index 588414e19..863da0cb5 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/crewai_platform_tools/__init__.py +++ b/lib/crewai-tools/src/crewai_tools/tools/crewai_platform_tools/__init__.py @@ -13,10 +13,16 @@ from crewai_tools.tools.crewai_platform_tools.crewai_platform_tool_builder impor from crewai_tools.tools.crewai_platform_tools.crewai_platform_tools import ( CrewaiPlatformTools, ) +from crewai_tools.tools.crewai_platform_tools.file_hook import ( + process_file_markers, + register_file_processing_hook, +) __all__ = [ "CrewAIPlatformActionTool", "CrewaiPlatformToolBuilder", "CrewaiPlatformTools", + "process_file_markers", + "register_file_processing_hook", ] diff --git a/lib/crewai-tools/src/crewai_tools/tools/crewai_platform_tools/crewai_platform_action_tool.py b/lib/crewai-tools/src/crewai_tools/tools/crewai_platform_tools/crewai_platform_action_tool.py index 3a3ae3be9..bab9dceb2 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/crewai_platform_tools/crewai_platform_action_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/crewai_platform_tools/crewai_platform_action_tool.py @@ -2,6 +2,8 @@ import json import os +import re +import tempfile from typing import Any from crewai.tools import BaseTool @@ -14,6 +16,26 @@ from crewai_tools.tools.crewai_platform_tools.misc import ( get_platform_integration_token, ) +_FILE_MARKER_PREFIX = "__CREWAI_FILE__" + +_MIME_TO_EXTENSION = { + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx", + "application/vnd.ms-excel": ".xls", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx", + "application/msword": ".doc", + "application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx", + "application/vnd.ms-powerpoint": ".ppt", + "application/pdf": ".pdf", + "image/png": ".png", + "image/jpeg": ".jpg", + "image/gif": ".gif", + "image/webp": ".webp", + "text/plain": ".txt", + "text/csv": ".csv", + "application/json": ".json", + "application/zip": ".zip", +} + class CrewAIPlatformActionTool(BaseTool): action_name: str = Field(default="", description="The name of the action") @@ -71,10 +93,18 @@ class CrewAIPlatformActionTool(BaseTool): url=api_url, headers=headers, json=payload, - timeout=60, + timeout=300, + stream=True, verify=os.environ.get("CREWAI_FACTORY", "false").lower() != "true", ) + content_type = response.headers.get("Content-Type", "") + + # Check if response is binary (non-JSON) + if "application/json" not in content_type: + return self._handle_binary_response(response) + + # Normal JSON response data = response.json() if not response.ok: if isinstance(data, dict): @@ -91,3 +121,49 @@ class CrewAIPlatformActionTool(BaseTool): except Exception as e: return f"Error executing action {self.action_name}: {e!s}" + + def _handle_binary_response(self, response: requests.Response) -> str: + """Handle binary streaming response from the API. + + Streams the binary content to a temporary file and returns a marker + that can be processed by the file hook to inject the file into the + LLM context. + + Args: + response: The streaming HTTP response with binary content. + + Returns: + A file marker string in the format: + __CREWAI_FILE__:filename:content_type:file_path + """ + content_type = response.headers.get("Content-Type", "application/octet-stream") + + filename = self._extract_filename_from_headers(response.headers) + + extension = self._get_file_extension(content_type, filename) + + with tempfile.NamedTemporaryFile( + delete=False, suffix=extension, prefix="crewai_" + ) as tmp_file: + for chunk in response.iter_content(chunk_size=8192): + tmp_file.write(chunk) + tmp_path = tmp_file.name + + return f"{_FILE_MARKER_PREFIX}:{filename}:{content_type}:{tmp_path}" + + def _extract_filename_from_headers( + self, headers: requests.structures.CaseInsensitiveDict + ) -> str: + content_disposition = headers.get("Content-Disposition", "") + if content_disposition: + match = re.search(r'filename="?([^";\s]+)"?', content_disposition) + if match: + return match.group(1) + return "downloaded_file" + + def _get_file_extension(self, content_type: str, filename: str) -> str: + if "." in filename: + return "." + filename.rsplit(".", 1)[-1] + + base_content_type = content_type.split(";")[0].strip() + return _MIME_TO_EXTENSION.get(base_content_type, "") diff --git a/lib/crewai-tools/src/crewai_tools/tools/crewai_platform_tools/crewai_platform_tools.py b/lib/crewai-tools/src/crewai_tools/tools/crewai_platform_tools/crewai_platform_tools.py index 83016ddb8..1273c0db5 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/crewai_platform_tools/crewai_platform_tools.py +++ b/lib/crewai-tools/src/crewai_tools/tools/crewai_platform_tools/crewai_platform_tools.py @@ -6,6 +6,9 @@ from crewai_tools.adapters.tool_collection import ToolCollection from crewai_tools.tools.crewai_platform_tools.crewai_platform_tool_builder import ( CrewaiPlatformToolBuilder, ) +from crewai_tools.tools.crewai_platform_tools.file_hook import ( + register_file_processing_hook, +) logger = logging.getLogger(__name__) @@ -22,6 +25,8 @@ def CrewaiPlatformTools( # noqa: N802 Returns: A list of BaseTool instances for platform actions """ + register_file_processing_hook() + builder = CrewaiPlatformToolBuilder(apps=apps) return builder.tools() # type: ignore diff --git a/lib/crewai-tools/src/crewai_tools/tools/crewai_platform_tools/file_hook.py b/lib/crewai-tools/src/crewai_tools/tools/crewai_platform_tools/file_hook.py new file mode 100644 index 000000000..3e3d19d88 --- /dev/null +++ b/lib/crewai-tools/src/crewai_tools/tools/crewai_platform_tools/file_hook.py @@ -0,0 +1,132 @@ +"""File processing hook for CrewAI Platform Tools. + +This module provides a hook that processes file markers returned by platform tools +and injects the files into the LLM context for native file handling. +""" + +from __future__ import annotations + +import logging +import os +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from crewai.hooks.tool_hooks import ToolCallHookContext + +logger = logging.getLogger(__name__) + +_FILE_MARKER_PREFIX = "__CREWAI_FILE__" + +_hook_registered = False + + +def process_file_markers(context: ToolCallHookContext) -> str | None: + """Process file markers in tool results and inject files into context. + + This hook detects file markers returned by platform tools (e.g., download_file) + and converts them into FileInput objects that are attached to the hook context. + The agent executor will then inject these files into the tool message for + native LLM file handling. + + The marker format is: + __CREWAI_FILE__:filename:content_type:file_path + + Args: + context: The tool call hook context containing the tool result. + + Returns: + A human-readable message if a file was processed, None otherwise. + """ + result = context.tool_result + + if not result or not result.startswith(_FILE_MARKER_PREFIX): + return None + + try: + parts = result.split(":", 3) + if len(parts) < 4: + logger.warning(f"Invalid file marker format: {result[:100]}") + return None + + _, filename, content_type, file_path = parts + + if not os.path.isfile(file_path): + logger.error(f"File not found: {file_path}") + return f"Error: Downloaded file not found at {file_path}" + + try: + from crewai_files import File + except ImportError: + logger.warning( + "crewai_files not installed. File will not be attached to LLM context." + ) + return ( + f"Downloaded file: {filename} ({content_type}). " + f"File saved at: {file_path}. " + "Note: Install crewai_files for native LLM file handling." + ) + + file = File(source=file_path, content_type=content_type, filename=filename) + + context.files = {filename: file} + + file_size = os.path.getsize(file_path) + size_str = _format_file_size(file_size) + + return f"Downloaded file: {filename} ({content_type}, {size_str}). File is attached for LLM analysis." + + except Exception as e: + logger.exception(f"Error processing file marker: {e}") + return f"Error processing downloaded file: {e}" + + +def _format_file_size(size_bytes: int) -> str: + """Format file size in human-readable format. + + Args: + size_bytes: Size in bytes. + + Returns: + Human-readable size string. + """ + if size_bytes < 1024: + return f"{size_bytes} bytes" + elif size_bytes < 1024 * 1024: + return f"{size_bytes / 1024:.1f} KB" + elif size_bytes < 1024 * 1024 * 1024: + return f"{size_bytes / (1024 * 1024):.1f} MB" + else: + return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB" + + +def register_file_processing_hook() -> bool: + """Register the file processing hook globally. + + This function should be called once during application initialization + to enable automatic file injection for platform tools. + + Returns: + True if the hook was registered, False if it was already registered + or if registration failed. + """ + global _hook_registered + + if _hook_registered: + logger.debug("File processing hook already registered") + return False + + try: + from crewai.hooks import register_after_tool_call_hook + + register_after_tool_call_hook(process_file_markers) + _hook_registered = True + logger.info("File processing hook registered successfully") + return True + except ImportError: + logger.warning( + "crewai.hooks not available. File processing hook not registered." + ) + return False + except Exception as e: + logger.exception(f"Failed to register file processing hook: {e}") + return False diff --git a/lib/crewai-tools/tests/tools/crewai_platform_tools/test_crewai_platform_tools.py b/lib/crewai-tools/tests/tools/crewai_platform_tools/test_crewai_platform_tools.py index b69b073ed..edbb3cddf 100644 --- a/lib/crewai-tools/tests/tools/crewai_platform_tools/test_crewai_platform_tools.py +++ b/lib/crewai-tools/tests/tools/crewai_platform_tools/test_crewai_platform_tools.py @@ -2,6 +2,7 @@ import unittest from unittest.mock import Mock, patch from crewai_tools.tools.crewai_platform_tools import CrewaiPlatformTools +from crewai_tools.tools.crewai_platform_tools import file_hook class TestCrewaiPlatformTools(unittest.TestCase): @@ -113,3 +114,64 @@ class TestCrewaiPlatformTools(unittest.TestCase): with self.assertRaises(ValueError) as context: CrewaiPlatformTools(apps=["github"]) assert "No platform integration token found" in str(context.exception) + + @patch.dict("os.environ", {"CREWAI_PLATFORM_INTEGRATION_TOKEN": "test_token"}) + @patch( + "crewai_tools.tools.crewai_platform_tools.crewai_platform_tool_builder.requests.get" + ) + @patch( + "crewai_tools.tools.crewai_platform_tools.crewai_platform_tools.register_file_processing_hook" + ) + def test_crewai_platform_tools_registers_file_hook( + self, mock_register_hook, mock_get + ): + mock_response = Mock() + mock_response.raise_for_status.return_value = None + mock_response.json.return_value = {"actions": {"github": []}} + mock_get.return_value = mock_response + + CrewaiPlatformTools(apps=["github"]) + mock_register_hook.assert_called_once() + + +class TestFileHook(unittest.TestCase): + def setUp(self): + file_hook._hook_registered = False + + def tearDown(self): + file_hook._hook_registered = False + + @patch("crewai.hooks.register_after_tool_call_hook") + def test_register_hook_is_idempotent(self, mock_register): + """Test hook registration succeeds once and is idempotent.""" + assert file_hook.register_file_processing_hook() is True + assert file_hook._hook_registered is True + mock_register.assert_called_once_with(file_hook.process_file_markers) + + # Second call should return False and not register again + assert file_hook.register_file_processing_hook() is False + mock_register.assert_called_once() + + def test_process_file_markers_ignores_non_file_results(self): + """Test that non-file-marker results return None.""" + test_cases = [ + None, # Empty result + "Regular tool output", # Non-marker + "__CREWAI_FILE__:incomplete", # Invalid format (missing parts) + ] + for tool_result in test_cases: + mock_context = Mock() + mock_context.tool_result = tool_result + assert file_hook.process_file_markers(mock_context) is None + + def test_format_file_size(self): + """Test file size formatting across units.""" + cases = [ + (500, "500 bytes"), + (1024, "1.0 KB"), + (1536, "1.5 KB"), + (1024 * 1024, "1.0 MB"), + (1024 * 1024 * 1024, "1.0 GB"), + ] + for size_bytes, expected in cases: + assert file_hook._format_file_size(size_bytes) == expected diff --git a/lib/crewai/src/crewai/agents/crew_agent_executor.py b/lib/crewai/src/crewai/agents/crew_agent_executor.py index 1218ceae8..a5867ae57 100644 --- a/lib/crewai/src/crewai/agents/crew_agent_executor.py +++ b/lib/crewai/src/crewai/agents/crew_agent_executor.py @@ -930,6 +930,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): "name": func_name, "content": result, } + + if after_hook_context.files: + tool_message["files"] = after_hook_context.files + self.messages.append(tool_message) # Log the tool execution diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index b9d8adccc..48769a208 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -814,6 +814,10 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): "name": func_name, "content": result, } + + if after_hook_context.files: + tool_message["files"] = after_hook_context.files + self.state.messages.append(tool_message) # Log the tool execution diff --git a/lib/crewai/src/crewai/hooks/tool_hooks.py b/lib/crewai/src/crewai/hooks/tool_hooks.py index 6ee0ab033..74b57ceaa 100644 --- a/lib/crewai/src/crewai/hooks/tool_hooks.py +++ b/lib/crewai/src/crewai/hooks/tool_hooks.py @@ -5,6 +5,7 @@ from typing import TYPE_CHECKING, Any from crewai.events.event_listener import event_listener from crewai.hooks.types import AfterToolCallHookType, BeforeToolCallHookType from crewai.utilities.printer import Printer +from crewai.utilities.types import FileInput if TYPE_CHECKING: @@ -34,6 +35,9 @@ class ToolCallHookContext: crew: Crew instance (may be None) tool_result: Tool execution result (only set for after_tool_call hooks). Can be modified by returning a new string from after_tool_call hook. + files: Optional dictionary of files to attach to the tool message. + Can be set by after_tool_call hooks to inject files into the LLM context. + These files will be formatted according to the LLM provider's requirements. """ def __init__( @@ -64,6 +68,7 @@ class ToolCallHookContext: self.task = task self.crew = crew self.tool_result = tool_result + self.files: dict[str, FileInput] | None = None def request_human_input( self,