From da930fa1df336fd6de3081d9eecb298289952280 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Thu, 22 Jan 2026 09:52:23 -0500 Subject: [PATCH] refactor: extract helper functions to reduce code duplication --- lib/crewai/src/crewai/files/content_types.py | 14 +- lib/crewai/src/crewai/files/file.py | 4 +- .../src/crewai/files/processing/exceptions.py | 42 ++ .../crewai/files/processing/transformers.py | 4 +- .../src/crewai/files/processing/validators.py | 418 ++++++++++-------- .../src/crewai/files/uploaders/anthropic.py | 76 +--- .../src/crewai/files/uploaders/bedrock.py | 99 ++--- .../src/crewai/files/uploaders/gemini.py | 164 +++---- .../src/crewai/files/uploaders/openai.py | 134 ++---- 9 files changed, 405 insertions(+), 550 deletions(-) diff --git a/lib/crewai/src/crewai/files/content_types.py b/lib/crewai/src/crewai/files/content_types.py index 3aecc6e47..711c9e134 100644 --- a/lib/crewai/src/crewai/files/content_types.py +++ b/lib/crewai/src/crewai/files/content_types.py @@ -5,7 +5,7 @@ from __future__ import annotations from abc import ABC from io import IOBase from pathlib import Path -from typing import Annotated, Any, Literal, Self +from typing import Annotated, Any, BinaryIO, Literal, Self from pydantic import BaseModel, Field, GetCoreSchemaHandler from pydantic_core import CoreSchema, core_schema @@ -37,15 +37,15 @@ class _FileSourceCoercer: return FilePath(path=Path(v)) if isinstance(v, bytes): return FileBytes(data=v) - if isinstance(v, IOBase): + if isinstance(v, (IOBase, BinaryIO)): return FileStream(stream=v) raise ValueError(f"Cannot convert {type(v).__name__} to file source") @classmethod def __get_pydantic_core_schema__( cls, - source_type: Any, - handler: GetCoreSchemaHandler, + _source_type: Any, + _handler: GetCoreSchemaHandler, ) -> CoreSchema: """Generate Pydantic core schema for FileSource coercion.""" return core_schema.no_info_plain_validator_function( @@ -261,7 +261,7 @@ class File(BaseFile): The content type is automatically detected from the file contents. Example: - >>> file = File(source="./document.pdf") - >>> file = File(source="./image.png") - >>> file = File(source=some_bytes) + >>> pdf_file = File(source="./document.pdf") + >>> image_file = File(source="./image.png") + >>> bytes_file = File(source=b"file content") """ diff --git a/lib/crewai/src/crewai/files/file.py b/lib/crewai/src/crewai/files/file.py index 19ab5a982..1bd370671 100644 --- a/lib/crewai/src/crewai/files/file.py +++ b/lib/crewai/src/crewai/files/file.py @@ -23,7 +23,9 @@ from pydantic_core import CoreSchema, core_schema class AsyncReadable(Protocol): """Protocol for async readable streams.""" - async def read(self, size: int = -1) -> bytes: ... + async def read(self, size: int = -1) -> bytes: + """Read up to size bytes from the stream.""" + ... class _AsyncReadableValidator: diff --git a/lib/crewai/src/crewai/files/processing/exceptions.py b/lib/crewai/src/crewai/files/processing/exceptions.py index 0a9442462..6d49dbde0 100644 --- a/lib/crewai/src/crewai/files/processing/exceptions.py +++ b/lib/crewai/src/crewai/files/processing/exceptions.py @@ -101,3 +101,45 @@ class TransientUploadError(UploadError, TransientFileError): class PermanentUploadError(UploadError, PermanentFileError): """Upload failed permanently (auth failure, invalid file, unsupported type).""" + + +def classify_upload_error(e: Exception, filename: str | None = None) -> Exception: + """Classify an exception as transient or permanent upload error. + + Analyzes the exception type name and status code to determine if + the error is likely transient (retryable) or permanent. + + Args: + e: The exception to classify. + filename: Optional filename for error context. + + Returns: + A TransientUploadError or PermanentUploadError wrapping the original. + """ + error_type = type(e).__name__ + + if "RateLimit" in error_type or "APIConnection" in error_type: + return TransientUploadError(f"Transient upload error: {e}", file_name=filename) + if "Authentication" in error_type or "Permission" in error_type: + return PermanentUploadError( + f"Authentication/permission error: {e}", file_name=filename + ) + if "BadRequest" in error_type or "InvalidRequest" in error_type: + return PermanentUploadError(f"Invalid request: {e}", file_name=filename) + + status_code = getattr(e, "status_code", None) + if status_code is not None: + if status_code >= 500 or status_code == 429: + return TransientUploadError( + f"Server error ({status_code}): {e}", file_name=filename + ) + if status_code in (401, 403): + return PermanentUploadError( + f"Auth error ({status_code}): {e}", file_name=filename + ) + if status_code == 400: + return PermanentUploadError( + f"Bad request ({status_code}): {e}", file_name=filename + ) + + return TransientUploadError(f"Upload failed: {e}", file_name=filename) diff --git a/lib/crewai/src/crewai/files/processing/transformers.py b/lib/crewai/src/crewai/files/processing/transformers.py index d7ce5f952..cae2384fb 100644 --- a/lib/crewai/src/crewai/files/processing/transformers.py +++ b/lib/crewai/src/crewai/files/processing/transformers.py @@ -245,7 +245,7 @@ def chunk_text( TextFile objects, one per chunk. """ content = file.read() - text = content.decode("utf-8", errors="replace") + text = content.decode(errors="replace") total_chars = len(text) if total_chars <= max_chars: @@ -268,7 +268,7 @@ def chunk_text( end_pos = last_newline + 1 chunk_content = text[start_pos:end_pos] - chunk_bytes = chunk_content.encode("utf-8") + chunk_bytes = chunk_content.encode() chunk_filename = f"{base_filename}_chunk_{chunk_num}.{extension}" diff --git a/lib/crewai/src/crewai/files/processing/validators.py b/lib/crewai/src/crewai/files/processing/validators.py index 74af56dd5..d764e05d0 100644 --- a/lib/crewai/src/crewai/files/processing/validators.py +++ b/lib/crewai/src/crewai/files/processing/validators.py @@ -1,6 +1,7 @@ """File validation functions for checking against provider constraints.""" from collections.abc import Sequence +import io import logging from crewai.files.content_types import ( @@ -30,6 +31,51 @@ logger = logging.getLogger(__name__) FileInput = AudioFile | File | ImageFile | PDFFile | TextFile | VideoFile +def _get_image_dimensions(content: bytes) -> tuple[int, int] | None: + """Get image dimensions using Pillow if available. + + Args: + content: Raw image bytes. + + Returns: + Tuple of (width, height) or None if Pillow unavailable. + """ + try: + from PIL import Image + + with Image.open(io.BytesIO(content)) as img: + width, height = img.size + return (int(width), int(height)) + except ImportError: + logger.warning( + "Pillow not installed - cannot validate image dimensions. " + "Install with: pip install Pillow" + ) + return None + + +def _get_pdf_page_count(content: bytes) -> int | None: + """Get PDF page count using pypdf if available. + + Args: + content: Raw PDF bytes. + + Returns: + Page count or None if pypdf unavailable. + """ + try: + from pypdf import PdfReader + + reader = PdfReader(io.BytesIO(content)) + return len(reader.pages) + except ImportError: + logger.warning( + "pypdf not installed - cannot validate PDF page count. " + "Install with: pip install pypdf" + ) + return None + + def _format_size(size_bytes: int) -> str: """Format byte size to human-readable string.""" if size_bytes >= 1024 * 1024 * 1024: @@ -41,6 +87,69 @@ def _format_size(size_bytes: int) -> str: return f"{size_bytes}B" +def _validate_size( + file_type: str, + filename: str | None, + file_size: int, + max_size: int, + errors: list[str], + raise_on_error: bool, +) -> None: + """Validate file size against maximum. + + Args: + file_type: Type label for error messages (e.g., "Image", "PDF"). + filename: Name of the file being validated. + file_size: Actual file size in bytes. + max_size: Maximum allowed size in bytes. + errors: List to append error messages to. + raise_on_error: If True, raise FileTooLargeError on failure. + """ + if file_size > max_size: + msg = ( + f"{file_type} '{filename}' size ({_format_size(file_size)}) exceeds " + f"maximum ({_format_size(max_size)})" + ) + errors.append(msg) + if raise_on_error: + raise FileTooLargeError( + msg, + file_name=filename, + actual_size=file_size, + max_size=max_size, + ) + + +def _validate_format( + file_type: str, + filename: str | None, + content_type: str, + supported_formats: tuple[str, ...], + errors: list[str], + raise_on_error: bool, +) -> None: + """Validate content type against supported formats. + + Args: + file_type: Type label for error messages (e.g., "Image", "Audio"). + filename: Name of the file being validated. + content_type: MIME type of the file. + supported_formats: Tuple of supported MIME types. + errors: List to append error messages to. + raise_on_error: If True, raise UnsupportedFileTypeError on failure. + """ + if content_type not in supported_formats: + msg = ( + f"{file_type} format '{content_type}' is not supported. " + f"Supported: {', '.join(supported_formats)}" + ) + errors.append(msg) + if raise_on_error: + raise UnsupportedFileTypeError( + msg, file_name=filename, content_type=content_type + ) + + def validate_image( file: ImageFile, constraints: ImageConstraints, @@ -67,64 +176,40 @@ def validate_image( file_size = len(content) filename = file.filename - if file_size > constraints.max_size_bytes: - msg = ( - f"Image '{filename}' size ({_format_size(file_size)}) exceeds " - f"maximum ({_format_size(constraints.max_size_bytes)})" - ) - errors.append(msg) - if raise_on_error: - raise FileTooLargeError( - msg, - file_name=filename, - actual_size=file_size, - max_size=constraints.max_size_bytes, - ) - - content_type = file.content_type - if content_type not in constraints.supported_formats: - msg = ( - f"Image format '{content_type}' is not supported. " - f"Supported: {', '.join(constraints.supported_formats)}" - ) - errors.append(msg) - if raise_on_error: - raise UnsupportedFileTypeError( - msg, file_name=filename, content_type=content_type - ) + _validate_size( + "Image", filename, file_size, constraints.max_size_bytes, errors, raise_on_error + ) + _validate_format( + "Image", + filename, + file.content_type, + constraints.supported_formats, + errors, + raise_on_error, + ) if constraints.max_width is not None or constraints.max_height is not None: - try: - import io + dimensions = _get_image_dimensions(content) + if dimensions is not None: + width, height = dimensions - from PIL import Image + if constraints.max_width and width > constraints.max_width: + msg = ( + f"Image '{filename}' width ({width}px) exceeds " + f"maximum ({constraints.max_width}px)" + ) + errors.append(msg) + if raise_on_error: + raise FileValidationError(msg, file_name=filename) - with Image.open(io.BytesIO(content)) as img: - width, height = img.size - - if constraints.max_width and width > constraints.max_width: - msg = ( - f"Image '{filename}' width ({width}px) exceeds " - f"maximum ({constraints.max_width}px)" - ) - errors.append(msg) - if raise_on_error: - raise FileValidationError(msg, file_name=filename) - - if constraints.max_height and height > constraints.max_height: - msg = ( - f"Image '{filename}' height ({height}px) exceeds " - f"maximum ({constraints.max_height}px)" - ) - errors.append(msg) - if raise_on_error: - raise FileValidationError(msg, file_name=filename) - - except ImportError: - logger.warning( - "Pillow not installed - cannot validate image dimensions. " - "Install with: pip install Pillow" - ) + if constraints.max_height and height > constraints.max_height: + msg = ( + f"Image '{filename}' height ({height}px) exceeds " + f"maximum ({constraints.max_height}px)" + ) + errors.append(msg) + if raise_on_error: + raise FileValidationError(msg, file_name=filename) return errors @@ -154,43 +239,20 @@ def validate_pdf( file_size = len(content) filename = file.filename - if file_size > constraints.max_size_bytes: - msg = ( - f"PDF '{filename}' size ({_format_size(file_size)}) exceeds " - f"maximum ({_format_size(constraints.max_size_bytes)})" - ) - errors.append(msg) - if raise_on_error: - raise FileTooLargeError( - msg, - file_name=filename, - actual_size=file_size, - max_size=constraints.max_size_bytes, - ) + _validate_size( + "PDF", filename, file_size, constraints.max_size_bytes, errors, raise_on_error + ) if constraints.max_pages is not None: - try: - import io - - from pypdf import PdfReader - - reader = PdfReader(io.BytesIO(content)) - page_count = len(reader.pages) - - if page_count > constraints.max_pages: - msg = ( - f"PDF '{filename}' page count ({page_count}) exceeds " - f"maximum ({constraints.max_pages})" - ) - errors.append(msg) - if raise_on_error: - raise FileValidationError(msg, file_name=filename) - - except ImportError: - logger.warning( - "pypdf not installed - cannot validate PDF page count. " - "Install with: pip install pypdf" + page_count = _get_pdf_page_count(content) + if page_count is not None and page_count > constraints.max_pages: + msg = ( + f"PDF '{filename}' page count ({page_count}) exceeds " + f"maximum ({constraints.max_pages})" ) + errors.append(msg) + if raise_on_error: + raise FileValidationError(msg, file_name=filename) return errors @@ -216,35 +278,24 @@ def validate_audio( UnsupportedFileTypeError: If the format is not supported. """ errors: list[str] = [] - content = file.read() - file_size = len(content) - filename = file.filename + file_size = len(file.read()) - if file_size > constraints.max_size_bytes: - msg = ( - f"Audio '{filename}' size ({_format_size(file_size)}) exceeds " - f"maximum ({_format_size(constraints.max_size_bytes)})" - ) - errors.append(msg) - if raise_on_error: - raise FileTooLargeError( - msg, - file_name=filename, - actual_size=file_size, - max_size=constraints.max_size_bytes, - ) - - content_type = file.content_type - if content_type not in constraints.supported_formats: - msg = ( - f"Audio format '{content_type}' is not supported. " - f"Supported: {', '.join(constraints.supported_formats)}" - ) - errors.append(msg) - if raise_on_error: - raise UnsupportedFileTypeError( - msg, file_name=filename, content_type=content_type - ) + _validate_size( + "Audio", + file.filename, + file_size, + constraints.max_size_bytes, + errors, + raise_on_error, + ) + _validate_format( + "Audio", + file.filename, + file.content_type, + constraints.supported_formats, + errors, + raise_on_error, + ) return errors @@ -270,35 +321,24 @@ def validate_video( UnsupportedFileTypeError: If the format is not supported. """ errors: list[str] = [] - content = file.read() - file_size = len(content) - filename = file.filename + file_size = len(file.read()) - if file_size > constraints.max_size_bytes: - msg = ( - f"Video '{filename}' size ({_format_size(file_size)}) exceeds " - f"maximum ({_format_size(constraints.max_size_bytes)})" - ) - errors.append(msg) - if raise_on_error: - raise FileTooLargeError( - msg, - file_name=filename, - actual_size=file_size, - max_size=constraints.max_size_bytes, - ) - - content_type = file.content_type - if content_type not in constraints.supported_formats: - msg = ( - f"Video format '{content_type}' is not supported. " - f"Supported: {', '.join(constraints.supported_formats)}" - ) - errors.append(msg) - if raise_on_error: - raise UnsupportedFileTypeError( - msg, file_name=filename, content_type=content_type - ) + _validate_size( + "Video", + file.filename, + file_size, + constraints.max_size_bytes, + errors, + raise_on_error, + ) + _validate_format( + "Video", + file.filename, + file.content_type, + constraints.supported_formats, + errors, + raise_on_error, + ) return errors @@ -327,27 +367,47 @@ def validate_text( if constraints.general_max_size_bytes is None: return errors - content = file.read() - file_size = len(content) - filename = file.filename - - if file_size > constraints.general_max_size_bytes: - msg = ( - f"Text file '{filename}' size ({_format_size(file_size)}) exceeds " - f"maximum ({_format_size(constraints.general_max_size_bytes)})" - ) - errors.append(msg) - if raise_on_error: - raise FileTooLargeError( - msg, - file_name=filename, - actual_size=file_size, - max_size=constraints.general_max_size_bytes, - ) + file_size = len(file.read()) + _validate_size( + "Text file", + file.filename, + file_size, + constraints.general_max_size_bytes, + errors, + raise_on_error, + ) return errors +def _check_unsupported_type( + file: FileInput, + provider_name: str, + type_name: str, + raise_on_error: bool, +) -> Sequence[str]: + """Check if file type is unsupported and handle error. + + Args: + file: The file being validated. + provider_name: Name of the provider. + type_name: Name of the file type (e.g., "images", "PDFs"). + raise_on_error: If True, raise exception instead of returning errors. + + Returns: + List with error message (only returns when raise_on_error is False). + + Raises: + UnsupportedFileTypeError: If raise_on_error is True. + """ + msg = f"Provider '{provider_name}' does not support {type_name}" + if raise_on_error: + raise UnsupportedFileTypeError( + msg, file_name=file.filename, content_type=file.content_type + ) + return [msg] + + def validate_file( file: FileInput, constraints: ProviderConstraints, @@ -373,42 +433,30 @@ def validate_file( """ if isinstance(file, ImageFile): if constraints.image is None: - msg = f"Provider '{constraints.name}' does not support images" - if raise_on_error: - raise UnsupportedFileTypeError( - msg, file_name=file.filename, content_type=file.content_type - ) - return [msg] + return _check_unsupported_type( + file, constraints.name, "images", raise_on_error + ) return validate_image(file, constraints.image, raise_on_error=raise_on_error) if isinstance(file, PDFFile): if constraints.pdf is None: - msg = f"Provider '{constraints.name}' does not support PDFs" - if raise_on_error: - raise UnsupportedFileTypeError( - msg, file_name=file.filename, content_type=file.content_type - ) - return [msg] + return _check_unsupported_type( + file, constraints.name, "PDFs", raise_on_error + ) return validate_pdf(file, constraints.pdf, raise_on_error=raise_on_error) if isinstance(file, AudioFile): if constraints.audio is None: - msg = f"Provider '{constraints.name}' does not support audio" - if raise_on_error: - raise UnsupportedFileTypeError( - msg, file_name=file.filename, content_type=file.content_type - ) - return [msg] + return _check_unsupported_type( + file, constraints.name, "audio", raise_on_error + ) return validate_audio(file, constraints.audio, raise_on_error=raise_on_error) if isinstance(file, VideoFile): if constraints.video is None: - msg = f"Provider '{constraints.name}' does not support video" - if raise_on_error: - raise UnsupportedFileTypeError( - msg, file_name=file.filename, content_type=file.content_type - ) - return [msg] + return _check_unsupported_type( + file, constraints.name, "video", raise_on_error + ) return validate_video(file, constraints.video, raise_on_error=raise_on_error) if isinstance(file, TextFile): diff --git a/lib/crewai/src/crewai/files/uploaders/anthropic.py b/lib/crewai/src/crewai/files/uploaders/anthropic.py index 752ee790b..df61daf3f 100644 --- a/lib/crewai/src/crewai/files/uploaders/anthropic.py +++ b/lib/crewai/src/crewai/files/uploaders/anthropic.py @@ -15,6 +15,7 @@ from crewai.files.content_types import ( TextFile, VideoFile, ) +from crewai.files.processing.exceptions import classify_upload_error from crewai.files.uploaders.base import FileUploader, UploadResult @@ -28,9 +29,6 @@ class AnthropicFileUploader(FileUploader): Uses the anthropic SDK to upload files. Files are stored persistently until explicitly deleted. - - Attributes: - api_key: Optional API key (uses ANTHROPIC_API_KEY env var if not provided). """ def __init__(self, api_key: str | None = None) -> None: @@ -91,11 +89,6 @@ class AnthropicFileUploader(FileUploader): TransientUploadError: For retryable errors (network, rate limits). PermanentUploadError: For non-retryable errors (auth, validation). """ - from crewai.files.processing.exceptions import ( - PermanentUploadError, - TransientUploadError, - ) - try: client = self._get_client() @@ -125,36 +118,7 @@ class AnthropicFileUploader(FileUploader): except ImportError: raise except Exception as e: - error_type = type(e).__name__ - if "RateLimit" in error_type or "APIConnection" in error_type: - raise TransientUploadError( - f"Transient upload error: {e}", file_name=file.filename - ) from e - if "Authentication" in error_type or "Permission" in error_type: - raise PermanentUploadError( - f"Authentication/permission error: {e}", file_name=file.filename - ) from e - if "BadRequest" in error_type or "InvalidRequest" in error_type: - raise PermanentUploadError( - f"Invalid request: {e}", file_name=file.filename - ) from e - status_code = getattr(e, "status_code", None) - if status_code is not None: - if status_code >= 500 or status_code == 429: - raise TransientUploadError( - f"Server error ({status_code}): {e}", file_name=file.filename - ) from e - if status_code in (401, 403): - raise PermanentUploadError( - f"Auth error ({status_code}): {e}", file_name=file.filename - ) from e - if status_code == 400: - raise PermanentUploadError( - f"Bad request ({status_code}): {e}", file_name=file.filename - ) from e - raise TransientUploadError( - f"Upload failed: {e}", file_name=file.filename - ) from e + raise classify_upload_error(e, file.filename) from e def delete(self, file_id: str) -> bool: """Delete an uploaded file from Anthropic. @@ -236,11 +200,6 @@ class AnthropicFileUploader(FileUploader): TransientUploadError: For retryable errors (network, rate limits). PermanentUploadError: For non-retryable errors (auth, validation). """ - from crewai.files.processing.exceptions import ( - PermanentUploadError, - TransientUploadError, - ) - try: client = self._get_async_client() @@ -270,36 +229,7 @@ class AnthropicFileUploader(FileUploader): except ImportError: raise except Exception as e: - error_type = type(e).__name__ - if "RateLimit" in error_type or "APIConnection" in error_type: - raise TransientUploadError( - f"Transient upload error: {e}", file_name=file.filename - ) from e - if "Authentication" in error_type or "Permission" in error_type: - raise PermanentUploadError( - f"Authentication/permission error: {e}", file_name=file.filename - ) from e - if "BadRequest" in error_type or "InvalidRequest" in error_type: - raise PermanentUploadError( - f"Invalid request: {e}", file_name=file.filename - ) from e - status_code = getattr(e, "status_code", None) - if status_code is not None: - if status_code >= 500 or status_code == 429: - raise TransientUploadError( - f"Server error ({status_code}): {e}", file_name=file.filename - ) from e - if status_code in (401, 403): - raise PermanentUploadError( - f"Auth error ({status_code}): {e}", file_name=file.filename - ) from e - if status_code == 400: - raise PermanentUploadError( - f"Bad request ({status_code}): {e}", file_name=file.filename - ) from e - raise TransientUploadError( - f"Upload failed: {e}", file_name=file.filename - ) from e + raise classify_upload_error(e, file.filename) from e async def adelete(self, file_id: str) -> bool: """Async delete an uploaded file from Anthropic. diff --git a/lib/crewai/src/crewai/files/uploaders/bedrock.py b/lib/crewai/src/crewai/files/uploaders/bedrock.py index fb1440d95..20ae7313a 100644 --- a/lib/crewai/src/crewai/files/uploaders/bedrock.py +++ b/lib/crewai/src/crewai/files/uploaders/bedrock.py @@ -17,6 +17,10 @@ from crewai.files.content_types import ( VideoFile, ) from crewai.files.file import FileBytes, FilePath +from crewai.files.processing.exceptions import ( + PermanentUploadError, + TransientUploadError, +) from crewai.files.uploaders.base import FileUploader, UploadResult @@ -29,6 +33,30 @@ MULTIPART_CHUNKSIZE = 8 * 1024 * 1024 MAX_CONCURRENCY = 10 +def _classify_s3_error(e: Exception, filename: str | None) -> Exception: + """Classify an S3 exception as transient or permanent upload error. + + Args: + e: The exception to classify. + filename: The filename for error context. + + Returns: + A TransientUploadError or PermanentUploadError wrapping the original. + """ + error_type = type(e).__name__ + error_code = getattr(e, "response", {}).get("Error", {}).get("Code", "") + + if error_code in ("SlowDown", "ServiceUnavailable", "InternalError"): + return TransientUploadError(f"Transient S3 error: {e}", file_name=filename) + if error_code in ("AccessDenied", "InvalidAccessKeyId", "SignatureDoesNotMatch"): + return PermanentUploadError(f"S3 authentication error: {e}", file_name=filename) + if error_code in ("NoSuchBucket", "InvalidBucketName"): + return PermanentUploadError(f"S3 bucket error: {e}", file_name=filename) + if "Throttl" in error_type or "Throttl" in str(e): + return TransientUploadError(f"S3 throttling: {e}", file_name=filename) + return TransientUploadError(f"S3 upload failed: {e}", file_name=filename) + + def _get_file_path(file: FileInput) -> Path | None: """Get the filesystem path if file source is FilePath. @@ -82,12 +110,6 @@ class BedrockFileUploader(FileUploader): Uploads files to S3 and returns S3 URIs that can be used with Bedrock's Converse API s3Location source format. - - Attributes: - bucket_name: S3 bucket name for file uploads. - bucket_owner: Optional bucket owner account ID for cross-account access. - prefix: Optional S3 key prefix for uploaded files. - region: AWS region for the S3 bucket. """ def __init__( @@ -213,7 +235,6 @@ class BedrockFileUploader(FileUploader): multipart_threshold=MULTIPART_THRESHOLD, multipart_chunksize=MULTIPART_CHUNKSIZE, max_concurrency=MAX_CONCURRENCY, - use_threads=True, ) def upload(self, file: FileInput, purpose: str | None = None) -> UploadResult: @@ -235,11 +256,6 @@ class BedrockFileUploader(FileUploader): """ import io - from crewai.files.processing.exceptions import ( - PermanentUploadError, - TransientUploadError, - ) - try: client = self._get_client() transfer_config = self._get_transfer_config() @@ -292,32 +308,7 @@ class BedrockFileUploader(FileUploader): except ImportError: raise except Exception as e: - error_type = type(e).__name__ - error_code = getattr(e, "response", {}).get("Error", {}).get("Code", "") - - if error_code in ("SlowDown", "ServiceUnavailable", "InternalError"): - raise TransientUploadError( - f"Transient S3 error: {e}", file_name=file.filename - ) from e - if error_code in ( - "AccessDenied", - "InvalidAccessKeyId", - "SignatureDoesNotMatch", - ): - raise PermanentUploadError( - f"S3 authentication error: {e}", file_name=file.filename - ) from e - if error_code in ("NoSuchBucket", "InvalidBucketName"): - raise PermanentUploadError( - f"S3 bucket error: {e}", file_name=file.filename - ) from e - if "Throttl" in error_type or "Throttl" in str(e): - raise TransientUploadError( - f"S3 throttling: {e}", file_name=file.filename - ) from e - raise TransientUploadError( - f"S3 upload failed: {e}", file_name=file.filename - ) from e + raise _classify_s3_error(e, file.filename) from e def delete(self, file_id: str) -> bool: """Delete an uploaded file from S3. @@ -412,11 +403,6 @@ class BedrockFileUploader(FileUploader): import aiofiles - from crewai.files.processing.exceptions import ( - PermanentUploadError, - TransientUploadError, - ) - try: session = self._get_async_client() transfer_config = self._get_transfer_config() @@ -471,32 +457,7 @@ class BedrockFileUploader(FileUploader): except ImportError: raise except Exception as e: - error_type = type(e).__name__ - error_code = getattr(e, "response", {}).get("Error", {}).get("Code", "") - - if error_code in ("SlowDown", "ServiceUnavailable", "InternalError"): - raise TransientUploadError( - f"Transient S3 error: {e}", file_name=file.filename - ) from e - if error_code in ( - "AccessDenied", - "InvalidAccessKeyId", - "SignatureDoesNotMatch", - ): - raise PermanentUploadError( - f"S3 authentication error: {e}", file_name=file.filename - ) from e - if error_code in ("NoSuchBucket", "InvalidBucketName"): - raise PermanentUploadError( - f"S3 bucket error: {e}", file_name=file.filename - ) from e - if "Throttl" in error_type or "Throttl" in str(e): - raise TransientUploadError( - f"S3 throttling: {e}", file_name=file.filename - ) from e - raise TransientUploadError( - f"S3 upload failed: {e}", file_name=file.filename - ) from e + raise _classify_s3_error(e, file.filename) from e async def adelete(self, file_id: str) -> bool: """Async delete an uploaded file from S3. diff --git a/lib/crewai/src/crewai/files/uploaders/gemini.py b/lib/crewai/src/crewai/files/uploaders/gemini.py index e9fb3fa9c..73b0087a0 100644 --- a/lib/crewai/src/crewai/files/uploaders/gemini.py +++ b/lib/crewai/src/crewai/files/uploaders/gemini.py @@ -21,6 +21,11 @@ from crewai.files.content_types import ( VideoFile, ) from crewai.files.file import FilePath +from crewai.files.processing.exceptions import ( + PermanentUploadError, + TransientUploadError, + classify_upload_error, +) from crewai.files.uploaders.base import FileUploader, UploadResult @@ -30,6 +35,51 @@ FileInput = AudioFile | File | ImageFile | PDFFile | TextFile | VideoFile GEMINI_FILE_TTL = timedelta(hours=48) +BACKOFF_BASE_DELAY = 1.0 +BACKOFF_MAX_DELAY = 30.0 +BACKOFF_JITTER_FACTOR = 0.1 + + +def _compute_backoff_delay(attempt: int) -> float: + """Compute exponential backoff delay with jitter. + + Args: + attempt: The current attempt number (0-indexed). + + Returns: + Delay in seconds with jitter applied. + """ + delay: float = min(BACKOFF_BASE_DELAY * (2**attempt), BACKOFF_MAX_DELAY) + jitter: float = random.uniform(0, delay * BACKOFF_JITTER_FACTOR) # noqa: S311 + return float(delay + jitter) + + +def _classify_gemini_error(e: Exception, filename: str | None) -> Exception: + """Classify a Gemini exception as transient or permanent upload error. + + Checks Gemini-specific error message patterns first, then falls back + to generic status code classification. + + Args: + e: The exception to classify. + filename: The filename for error context. + + Returns: + A TransientUploadError or PermanentUploadError wrapping the original. + """ + error_msg = str(e).lower() + + if "quota" in error_msg or "rate" in error_msg or "limit" in error_msg: + return TransientUploadError(f"Rate limit error: {e}", file_name=filename) + if "auth" in error_msg or "permission" in error_msg or "denied" in error_msg: + return PermanentUploadError( + f"Authentication/permission error: {e}", file_name=filename + ) + if "invalid" in error_msg or "unsupported" in error_msg: + return PermanentUploadError(f"Invalid request: {e}", file_name=filename) + + return classify_upload_error(e, filename) + def _get_file_path(file: FileInput) -> Path | None: """Get the filesystem path if file source is FilePath. @@ -46,28 +96,10 @@ def _get_file_path(file: FileInput) -> Path | None: return None -def _get_file_size(file: FileInput) -> int | None: - """Get file size without reading content if possible. - - Args: - file: The file input. - - Returns: - Size in bytes if determinable without reading, None otherwise. - """ - source = file._file_source - if isinstance(source, FilePath): - return source.path.stat().st_size - return None - - class GeminiFileUploader(FileUploader): """Uploader for Google Gemini File API. Uses the google-genai SDK to upload files. Files are stored for 48 hours. - - Attributes: - api_key: Optional API key (uses GOOGLE_API_KEY env var if not provided). """ def __init__(self, api_key: str | None = None) -> None: @@ -116,11 +148,6 @@ class GeminiFileUploader(FileUploader): TransientUploadError: For retryable errors (network, rate limits). PermanentUploadError: For non-retryable errors (auth, validation). """ - from crewai.files.processing.exceptions import ( - PermanentUploadError, - TransientUploadError, - ) - try: client = self._get_client() display_name = purpose or file.filename @@ -181,42 +208,7 @@ class GeminiFileUploader(FileUploader): except (TransientUploadError, PermanentUploadError): raise except Exception as e: - error_msg = str(e).lower() - if "quota" in error_msg or "rate" in error_msg or "limit" in error_msg: - raise TransientUploadError( - f"Rate limit error: {e}", file_name=file.filename - ) from e - if ( - "auth" in error_msg - or "permission" in error_msg - or "denied" in error_msg - ): - raise PermanentUploadError( - f"Authentication/permission error: {e}", file_name=file.filename - ) from e - if "invalid" in error_msg or "unsupported" in error_msg: - raise PermanentUploadError( - f"Invalid request: {e}", file_name=file.filename - ) from e - status_code = getattr(e, "code", None) or getattr(e, "status_code", None) - if status_code is not None: - if isinstance(status_code, int): - if status_code >= 500 or status_code == 429: - raise TransientUploadError( - f"Server error ({status_code}): {e}", - file_name=file.filename, - ) from e - if status_code in (401, 403): - raise PermanentUploadError( - f"Auth error ({status_code}): {e}", file_name=file.filename - ) from e - if status_code == 400: - raise PermanentUploadError( - f"Bad request ({status_code}): {e}", file_name=file.filename - ) from e - raise TransientUploadError( - f"Upload failed: {e}", file_name=file.filename - ) from e + raise _classify_gemini_error(e, file.filename) from e async def aupload( self, file: FileInput, purpose: str | None = None @@ -237,11 +229,6 @@ class GeminiFileUploader(FileUploader): TransientUploadError: For retryable errors (network, rate limits). PermanentUploadError: For non-retryable errors (auth, validation). """ - from crewai.files.processing.exceptions import ( - PermanentUploadError, - TransientUploadError, - ) - try: client = self._get_client() display_name = purpose or file.filename @@ -302,40 +289,7 @@ class GeminiFileUploader(FileUploader): except (TransientUploadError, PermanentUploadError): raise except Exception as e: - error_msg = str(e).lower() - if "quota" in error_msg or "rate" in error_msg or "limit" in error_msg: - raise TransientUploadError( - f"Rate limit error: {e}", file_name=file.filename - ) from e - if ( - "auth" in error_msg - or "permission" in error_msg - or "denied" in error_msg - ): - raise PermanentUploadError( - f"Authentication/permission error: {e}", file_name=file.filename - ) from e - if "invalid" in error_msg or "unsupported" in error_msg: - raise PermanentUploadError( - f"Invalid request: {e}", file_name=file.filename - ) from e - status_code = getattr(e, "code", None) or getattr(e, "status_code", None) - if status_code is not None and isinstance(status_code, int): - if status_code >= 500 or status_code == 429: - raise TransientUploadError( - f"Server error ({status_code}): {e}", file_name=file.filename - ) from e - if status_code in (401, 403): - raise PermanentUploadError( - f"Auth error ({status_code}): {e}", file_name=file.filename - ) from e - if status_code == 400: - raise PermanentUploadError( - f"Bad request ({status_code}): {e}", file_name=file.filename - ) from e - raise TransientUploadError( - f"Upload failed: {e}", file_name=file.filename - ) from e + raise _classify_gemini_error(e, file.filename) from e def delete(self, file_id: str) -> bool: """Delete an uploaded file from Gemini. @@ -442,8 +396,6 @@ class GeminiFileUploader(FileUploader): client = self._get_client() start_time = time.time() - base_delay = 1.0 - max_delay = 30.0 attempt = 0 while time.time() - start_time < timeout_seconds: @@ -451,14 +403,11 @@ class GeminiFileUploader(FileUploader): if file_info.state == FileState.ACTIVE: return True - if file_info.state == FileState.FAILED: logger.error(f"Gemini file processing failed: {file_id}") return False - delay = min(base_delay * (2**attempt), max_delay) - jitter = random.uniform(0, delay * 0.1) # noqa: S311 - time.sleep(delay + jitter) + time.sleep(_compute_backoff_delay(attempt)) attempt += 1 logger.warning(f"Timed out waiting for Gemini file processing: {file_id}") @@ -485,8 +434,6 @@ class GeminiFileUploader(FileUploader): client = self._get_client() start_time = time.time() - base_delay = 1.0 - max_delay = 30.0 attempt = 0 while time.time() - start_time < timeout_seconds: @@ -494,14 +441,11 @@ class GeminiFileUploader(FileUploader): if file_info.state == FileState.ACTIVE: return True - if file_info.state == FileState.FAILED: logger.error(f"Gemini file processing failed: {file_id}") return False - delay = min(base_delay * (2**attempt), max_delay) - jitter = random.uniform(0, delay * 0.1) # noqa: S311 - await asyncio.sleep(delay + jitter) + await asyncio.sleep(_compute_backoff_delay(attempt)) attempt += 1 logger.warning(f"Timed out waiting for Gemini file processing: {file_id}") diff --git a/lib/crewai/src/crewai/files/uploaders/openai.py b/lib/crewai/src/crewai/files/uploaders/openai.py index 3327b708b..fd63a47ee 100644 --- a/lib/crewai/src/crewai/files/uploaders/openai.py +++ b/lib/crewai/src/crewai/files/uploaders/openai.py @@ -17,6 +17,11 @@ from crewai.files.content_types import ( VideoFile, ) from crewai.files.file import FileBytes, FilePath, FileStream +from crewai.files.processing.exceptions import ( + PermanentUploadError, + TransientUploadError, + classify_upload_error, +) from crewai.files.uploaders.base import FileUploader, UploadResult @@ -100,10 +105,6 @@ class OpenAIFileUploader(FileUploader): Uses the Files API for files up to 512MB (single request). Uses the Uploads API for files larger than 512MB (multipart chunked). - - Attributes: - api_key: Optional API key (uses OPENAI_API_KEY env var if not provided). - chunk_size: Chunk size for multipart uploads (default 64MB). """ def __init__( @@ -128,6 +129,24 @@ class OpenAIFileUploader(FileUploader): """Return the provider name.""" return "openai" + def _build_upload_result(self, file_id: str, content_type: str) -> UploadResult: + """Build an UploadResult for a completed upload. + + Args: + file_id: The uploaded file ID. + content_type: The file's content type. + + Returns: + UploadResult with the file metadata. + """ + return UploadResult( + file_id=file_id, + file_uri=None, + content_type=content_type, + expires_at=None, + provider=self.provider_name, + ) + def _get_client(self) -> Any: """Get or create the OpenAI client.""" if self._client is None: @@ -173,11 +192,6 @@ class OpenAIFileUploader(FileUploader): TransientUploadError: For retryable errors (network, rate limits). PermanentUploadError: For non-retryable errors (auth, validation). """ - from crewai.files.processing.exceptions import ( - PermanentUploadError, - TransientUploadError, - ) - try: file_size = _get_file_size(file) @@ -193,7 +207,7 @@ class OpenAIFileUploader(FileUploader): except (TransientUploadError, PermanentUploadError): raise except Exception as e: - raise self._classify_error(e, file.filename) from e + raise classify_upload_error(e, file.filename) from e def _upload_simple( self, @@ -228,13 +242,7 @@ class OpenAIFileUploader(FileUploader): logger.info(f"Uploaded to OpenAI: {uploaded_file.id}") - return UploadResult( - file_id=uploaded_file.id, - file_uri=None, - content_type=file.content_type, - expires_at=None, - provider=self.provider_name, - ) + return self._build_upload_result(uploaded_file.id, file.content_type) def _upload_multipart( self, @@ -299,13 +307,7 @@ class OpenAIFileUploader(FileUploader): file_id = completed.file.id if completed.file else upload.id logger.info(f"Completed multipart upload to OpenAI: {file_id}") - return UploadResult( - file_id=file_id, - file_uri=None, - content_type=file.content_type, - expires_at=None, - provider=self.provider_name, - ) + return self._build_upload_result(file_id, file.content_type) except Exception: logger.warning(f"Multipart upload failed, cancelling upload {upload.id}") try: @@ -373,13 +375,7 @@ class OpenAIFileUploader(FileUploader): file_id = completed.file.id if completed.file else upload.id logger.info(f"Completed streaming multipart upload to OpenAI: {file_id}") - return UploadResult( - file_id=file_id, - file_uri=None, - content_type=file.content_type, - expires_at=None, - provider=self.provider_name, - ) + return self._build_upload_result(file_id, file.content_type) except Exception: logger.warning(f"Multipart upload failed, cancelling upload {upload.id}") try: @@ -388,51 +384,6 @@ class OpenAIFileUploader(FileUploader): logger.debug(f"Failed to cancel upload: {cancel_err}") raise - @staticmethod - def _classify_error(e: Exception, filename: str | None) -> Exception: - """Classify an exception as transient or permanent. - - Args: - e: The exception to classify. - filename: The filename for error context. - - Returns: - TransientUploadError or PermanentUploadError. - """ - from crewai.files.processing.exceptions import ( - PermanentUploadError, - TransientUploadError, - ) - - error_type = type(e).__name__ - if "RateLimit" in error_type or "APIConnection" in error_type: - return TransientUploadError( - f"Transient upload error: {e}", file_name=filename - ) - if "Authentication" in error_type or "Permission" in error_type: - return PermanentUploadError( - f"Authentication/permission error: {e}", file_name=filename - ) - if "BadRequest" in error_type or "InvalidRequest" in error_type: - return PermanentUploadError(f"Invalid request: {e}", file_name=filename) - - status_code = getattr(e, "status_code", None) - if status_code is not None: - if status_code >= 500 or status_code == 429: - return TransientUploadError( - f"Server error ({status_code}): {e}", file_name=filename - ) - if status_code in (401, 403): - return PermanentUploadError( - f"Auth error ({status_code}): {e}", file_name=filename - ) - if status_code == 400: - return PermanentUploadError( - f"Bad request ({status_code}): {e}", file_name=filename - ) - - return TransientUploadError(f"Upload failed: {e}", file_name=filename) - def delete(self, file_id: str) -> bool: """Delete an uploaded file from OpenAI. @@ -518,11 +469,6 @@ class OpenAIFileUploader(FileUploader): TransientUploadError: For retryable errors (network, rate limits). PermanentUploadError: For non-retryable errors (auth, validation). """ - from crewai.files.processing.exceptions import ( - PermanentUploadError, - TransientUploadError, - ) - try: file_size = _get_file_size(file) @@ -538,7 +484,7 @@ class OpenAIFileUploader(FileUploader): except (TransientUploadError, PermanentUploadError): raise except Exception as e: - raise self._classify_error(e, file.filename) from e + raise classify_upload_error(e, file.filename) from e async def _aupload_simple( self, @@ -573,13 +519,7 @@ class OpenAIFileUploader(FileUploader): logger.info(f"Uploaded to OpenAI: {uploaded_file.id}") - return UploadResult( - file_id=uploaded_file.id, - file_uri=None, - content_type=file.content_type, - expires_at=None, - provider=self.provider_name, - ) + return self._build_upload_result(uploaded_file.id, file.content_type) async def _aupload_multipart( self, @@ -644,13 +584,7 @@ class OpenAIFileUploader(FileUploader): file_id = completed.file.id if completed.file else upload.id logger.info(f"Completed multipart upload to OpenAI: {file_id}") - return UploadResult( - file_id=file_id, - file_uri=None, - content_type=file.content_type, - expires_at=None, - provider=self.provider_name, - ) + return self._build_upload_result(file_id, file.content_type) except Exception: logger.warning(f"Multipart upload failed, cancelling upload {upload.id}") try: @@ -718,13 +652,7 @@ class OpenAIFileUploader(FileUploader): file_id = completed.file.id if completed.file else upload.id logger.info(f"Completed streaming multipart upload to OpenAI: {file_id}") - return UploadResult( - file_id=file_id, - file_uri=None, - content_type=file.content_type, - expires_at=None, - provider=self.provider_name, - ) + return self._build_upload_result(file_id, file.content_type) except Exception: logger.warning(f"Multipart upload failed, cancelling upload {upload.id}") try: