From 3ad0af49347145d978feaff849677d6ebf58d935 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 21 Jan 2026 18:41:34 -0500 Subject: [PATCH] feat: add file resolver for inline vs upload decisions --- .../src/crewai/utilities/files/cleanup.py | 186 +++++++++++ .../src/crewai/utilities/files/resolver.py | 297 ++++++++++++++++++ 2 files changed, 483 insertions(+) create mode 100644 lib/crewai/src/crewai/utilities/files/cleanup.py create mode 100644 lib/crewai/src/crewai/utilities/files/resolver.py diff --git a/lib/crewai/src/crewai/utilities/files/cleanup.py b/lib/crewai/src/crewai/utilities/files/cleanup.py new file mode 100644 index 000000000..b86737456 --- /dev/null +++ b/lib/crewai/src/crewai/utilities/files/cleanup.py @@ -0,0 +1,186 @@ +"""Cleanup utilities for uploaded files.""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +from crewai.utilities.files.upload_cache import CachedUpload, UploadCache +from crewai.utilities.files.uploaders import get_uploader + + +if TYPE_CHECKING: + from crewai.utilities.files.uploaders.base import FileUploader + +logger = logging.getLogger(__name__) + + +def _safe_delete( + uploader: FileUploader, + file_id: str, + provider: str, +) -> bool: + """Safely delete a file, logging any errors. + + Args: + uploader: The file uploader to use. + file_id: The file ID to delete. + provider: Provider name for logging. + + Returns: + True if deleted successfully, False otherwise. + """ + try: + if uploader.delete(file_id): + logger.debug(f"Deleted {file_id} from {provider}") + return True + logger.warning(f"Failed to delete {file_id} from {provider}") + return False + except Exception as e: + logger.warning(f"Error deleting {file_id} from {provider}: {e}") + return False + + +def cleanup_uploaded_files( + cache: UploadCache, + *, + delete_from_provider: bool = True, + providers: list[str] | None = None, +) -> int: + """Clean up uploaded files from the cache and optionally from providers. + + Args: + cache: The upload cache to clean up. + delete_from_provider: If True, delete files from the provider as well. + providers: Optional list of providers to clean up. If None, cleans all. + + Returns: + Number of files cleaned up. + """ + cleaned = 0 + + provider_uploads: dict[str, list[CachedUpload]] = {} + + for provider in _get_providers_from_cache(cache): + if providers is not None and provider not in providers: + continue + provider_uploads[provider] = cache.get_all_for_provider(provider) + + if delete_from_provider: + for provider, uploads in provider_uploads.items(): + uploader = get_uploader(provider) + if uploader is None: + logger.warning( + f"No uploader available for {provider}, skipping cleanup" + ) + continue + + for upload in uploads: + if _safe_delete(uploader, upload.file_id, provider): + cleaned += 1 + + cache.clear() + + logger.info(f"Cleaned up {cleaned} uploaded files") + return cleaned + + +def cleanup_expired_files( + cache: UploadCache, + *, + delete_from_provider: bool = False, +) -> int: + """Clean up expired files from the cache. + + Args: + cache: The upload cache to clean up. + delete_from_provider: If True, attempt to delete from provider as well. + Note: Expired files may already be deleted by the provider. + + Returns: + Number of expired entries removed from cache. + """ + expired_entries: list[CachedUpload] = [] + + if delete_from_provider: + for provider in _get_providers_from_cache(cache): + expired_entries.extend( + upload + for upload in cache.get_all_for_provider(provider) + if upload.is_expired() + ) + + removed = cache.clear_expired() + + if delete_from_provider: + for upload in expired_entries: + uploader = get_uploader(upload.provider) + if uploader is not None: + try: + uploader.delete(upload.file_id) + except Exception as e: + logger.debug(f"Could not delete expired file {upload.file_id}: {e}") + + return removed + + +def cleanup_provider_files( + provider: str, + *, + cache: UploadCache | None = None, + delete_all_from_provider: bool = False, +) -> int: + """Clean up all files for a specific provider. + + Args: + provider: Provider name to clean up. + cache: Optional upload cache to clear entries from. + delete_all_from_provider: If True, delete all files from the provider, + not just cached ones. + + Returns: + Number of files deleted. + """ + deleted = 0 + uploader = get_uploader(provider) + + if uploader is None: + logger.warning(f"No uploader available for {provider}") + return 0 + + if delete_all_from_provider: + try: + files = uploader.list_files() + for file_info in files: + file_id = file_info.get("id") or file_info.get("name") + if file_id and uploader.delete(file_id): + deleted += 1 + except Exception as e: + logger.warning(f"Error listing/deleting files from {provider}: {e}") + elif cache is not None: + uploads = cache.get_all_for_provider(provider) + for upload in uploads: + if _safe_delete(uploader, upload.file_id, provider): + deleted += 1 + cache.remove_by_file_id(upload.file_id, provider) + + logger.info(f"Deleted {deleted} files from {provider}") + return deleted + + +def _get_providers_from_cache(cache: UploadCache) -> set[str]: + """Get unique provider names from cache entries. + + Args: + cache: The upload cache. + + Returns: + Set of provider names. + """ + providers: set[str] = set() + + with cache._lock: + for _, provider in cache._cache.keys(): + providers.add(provider) + + return providers diff --git a/lib/crewai/src/crewai/utilities/files/resolver.py b/lib/crewai/src/crewai/utilities/files/resolver.py new file mode 100644 index 000000000..d195b53bd --- /dev/null +++ b/lib/crewai/src/crewai/utilities/files/resolver.py @@ -0,0 +1,297 @@ +"""FileResolver for deciding file delivery method and managing uploads.""" + +import base64 +from dataclasses import dataclass, field +import logging + +from crewai.utilities.files.content_types import ( + AudioFile, + ImageFile, + PDFFile, + TextFile, + VideoFile, +) +from crewai.utilities.files.processing.constraints import ( + ProviderConstraints, + get_constraints_for_provider, +) +from crewai.utilities.files.resolved import ( + FileReference, + InlineBase64, + InlineBytes, + ResolvedFile, +) +from crewai.utilities.files.upload_cache import CachedUpload, UploadCache +from crewai.utilities.files.uploaders import get_uploader +from crewai.utilities.files.uploaders.base import FileUploader + + +logger = logging.getLogger(__name__) + +FileInput = AudioFile | ImageFile | PDFFile | TextFile | VideoFile + + +@dataclass +class FileResolverConfig: + """Configuration for FileResolver. + + Attributes: + prefer_upload: If True, prefer uploading over inline for supported providers. + upload_threshold_bytes: Size threshold above which to use upload. + If None, uses provider-specific threshold. + use_bytes_for_bedrock: If True, use raw bytes instead of base64 for Bedrock. + """ + + prefer_upload: bool = False + upload_threshold_bytes: int | None = None + use_bytes_for_bedrock: bool = True + + +@dataclass +class FileResolver: + """Resolves files to their delivery format based on provider capabilities. + + Decides whether to use inline base64, raw bytes, or file upload based on: + - Provider constraints and capabilities + - File size + - Configuration preferences + + Caches uploaded files to avoid redundant uploads. + + Attributes: + config: Resolver configuration. + upload_cache: Cache for tracking uploaded files. + """ + + config: FileResolverConfig = field(default_factory=FileResolverConfig) + upload_cache: UploadCache | None = None + _uploaders: dict[str, FileUploader] = field(default_factory=dict) + + def resolve(self, file: FileInput, provider: str) -> ResolvedFile: + """Resolve a file to its delivery format for a provider. + + Args: + file: The file to resolve. + provider: Provider name (e.g., "gemini", "anthropic", "openai"). + + Returns: + ResolvedFile representing the appropriate delivery format. + """ + provider_lower = provider.lower() + constraints = get_constraints_for_provider(provider) + file_size = len(file.source.read()) + + # Determine if we should use file upload + should_upload = self._should_upload( + file, provider_lower, constraints, file_size + ) + + if should_upload: + resolved = self._resolve_via_upload(file, provider_lower) + if resolved is not None: + return resolved + # Fall back to inline if upload fails + + # Use inline format + return self._resolve_inline(file, provider_lower) + + def resolve_files( + self, + files: dict[str, FileInput], + provider: str, + ) -> dict[str, ResolvedFile]: + """Resolve multiple files for a provider. + + Args: + files: Dictionary mapping names to file inputs. + provider: Provider name. + + Returns: + Dictionary mapping names to resolved files. + """ + return {name: self.resolve(file, provider) for name, file in files.items()} + + def _should_upload( + self, + file: FileInput, + provider: str, + constraints: ProviderConstraints | None, + file_size: int, + ) -> bool: + """Determine if a file should be uploaded rather than inlined. + + Args: + file: The file to check. + provider: Provider name. + constraints: Provider constraints. + file_size: Size of the file in bytes. + + Returns: + True if the file should be uploaded, False otherwise. + """ + # Check if provider supports file upload + if constraints is None or not constraints.supports_file_upload: + return False + + # If prefer_upload is set, always prefer upload + if self.config.prefer_upload: + return True + + # Check against size threshold + threshold = self.config.upload_threshold_bytes + if threshold is None and constraints is not None: + threshold = constraints.file_upload_threshold_bytes + + if threshold is not None and file_size > threshold: + return True + + return False + + def _resolve_via_upload( + self, + file: FileInput, + provider: str, + ) -> ResolvedFile | None: + """Resolve a file by uploading it. + + Args: + file: The file to upload. + provider: Provider name. + + Returns: + FileReference if upload succeeds, None otherwise. + """ + # Check cache first + if self.upload_cache is not None: + cached = self.upload_cache.get(file, provider) + if cached is not None: + logger.debug( + f"Using cached upload for {file.filename}: {cached.file_id}" + ) + return FileReference( + content_type=cached.content_type, + file_id=cached.file_id, + provider=cached.provider, + expires_at=cached.expires_at, + file_uri=cached.file_uri, + ) + + # Get or create uploader + uploader = self._get_uploader(provider) + if uploader is None: + logger.debug(f"No uploader available for {provider}") + return None + + try: + result = uploader.upload(file) + + # Cache the result + if self.upload_cache is not None: + self.upload_cache.set( + file=file, + provider=provider, + file_id=result.file_id, + file_uri=result.file_uri, + expires_at=result.expires_at, + ) + + return FileReference( + content_type=result.content_type, + file_id=result.file_id, + provider=result.provider, + expires_at=result.expires_at, + file_uri=result.file_uri, + ) + + except Exception as e: + logger.warning(f"Failed to upload {file.filename} to {provider}: {e}") + return None + + def _resolve_inline(self, file: FileInput, provider: str) -> ResolvedFile: + """Resolve a file as inline content. + + Args: + file: The file to resolve. + provider: Provider name. + + Returns: + InlineBase64 or InlineBytes depending on provider. + """ + content = file.source.read() + + # Use raw bytes for Bedrock if configured + if self.config.use_bytes_for_bedrock and "bedrock" in provider: + return InlineBytes( + content_type=file.content_type, + data=content, + ) + + # Default to base64 + encoded = base64.b64encode(content).decode("ascii") + return InlineBase64( + content_type=file.content_type, + data=encoded, + ) + + def _get_uploader(self, provider: str) -> FileUploader | None: + """Get or create an uploader for a provider. + + Args: + provider: Provider name. + + Returns: + FileUploader instance or None if not available. + """ + if provider not in self._uploaders: + uploader = get_uploader(provider) + if uploader is not None: + self._uploaders[provider] = uploader + else: + return None + + return self._uploaders.get(provider) + + def get_cached_uploads(self, provider: str) -> list[CachedUpload]: + """Get all cached uploads for a provider. + + Args: + provider: Provider name. + + Returns: + List of cached uploads. + """ + if self.upload_cache is None: + return [] + return self.upload_cache.get_all_for_provider(provider) + + def clear_cache(self) -> None: + """Clear the upload cache.""" + if self.upload_cache is not None: + self.upload_cache.clear() + + +def create_resolver( + provider: str | None = None, + prefer_upload: bool = False, + upload_threshold_bytes: int | None = None, + enable_cache: bool = True, +) -> FileResolver: + """Create a configured FileResolver. + + Args: + provider: Optional provider name for provider-specific configuration. + prefer_upload: Whether to prefer upload over inline. + upload_threshold_bytes: Size threshold for using upload. + enable_cache: Whether to enable upload caching. + + Returns: + Configured FileResolver instance. + """ + config = FileResolverConfig( + prefer_upload=prefer_upload, + upload_threshold_bytes=upload_threshold_bytes, + ) + + cache = UploadCache() if enable_cache else None + + return FileResolver(config=config, upload_cache=cache)