refactor: extract helper functions to reduce code duplication

This commit is contained in:
Greyson LaLonde
2026-01-22 09:52:23 -05:00
parent 0a250a45ce
commit da930fa1df
9 changed files with 405 additions and 550 deletions

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
from abc import ABC
from io import IOBase
from pathlib import Path
from typing import Annotated, Any, Literal, Self
from typing import Annotated, Any, BinaryIO, Literal, Self
from pydantic import BaseModel, Field, GetCoreSchemaHandler
from pydantic_core import CoreSchema, core_schema
@@ -37,15 +37,15 @@ class _FileSourceCoercer:
return FilePath(path=Path(v))
if isinstance(v, bytes):
return FileBytes(data=v)
if isinstance(v, IOBase):
if isinstance(v, (IOBase, BinaryIO)):
return FileStream(stream=v)
raise ValueError(f"Cannot convert {type(v).__name__} to file source")
@classmethod
def __get_pydantic_core_schema__(
cls,
source_type: Any,
handler: GetCoreSchemaHandler,
_source_type: Any,
_handler: GetCoreSchemaHandler,
) -> CoreSchema:
"""Generate Pydantic core schema for FileSource coercion."""
return core_schema.no_info_plain_validator_function(
@@ -261,7 +261,7 @@ class File(BaseFile):
The content type is automatically detected from the file contents.
Example:
>>> file = File(source="./document.pdf")
>>> file = File(source="./image.png")
>>> file = File(source=some_bytes)
>>> pdf_file = File(source="./document.pdf")
>>> image_file = File(source="./image.png")
>>> bytes_file = File(source=b"file content")
"""

View File

@@ -23,7 +23,9 @@ from pydantic_core import CoreSchema, core_schema
class AsyncReadable(Protocol):
"""Protocol for async readable streams."""
async def read(self, size: int = -1) -> bytes: ...
async def read(self, size: int = -1) -> bytes:
"""Read up to size bytes from the stream."""
...
class _AsyncReadableValidator:

View File

@@ -101,3 +101,45 @@ class TransientUploadError(UploadError, TransientFileError):
class PermanentUploadError(UploadError, PermanentFileError):
"""Upload failed permanently (auth failure, invalid file, unsupported type)."""
def classify_upload_error(e: Exception, filename: str | None = None) -> Exception:
"""Classify an exception as transient or permanent upload error.
Analyzes the exception type name and status code to determine if
the error is likely transient (retryable) or permanent.
Args:
e: The exception to classify.
filename: Optional filename for error context.
Returns:
A TransientUploadError or PermanentUploadError wrapping the original.
"""
error_type = type(e).__name__
if "RateLimit" in error_type or "APIConnection" in error_type:
return TransientUploadError(f"Transient upload error: {e}", file_name=filename)
if "Authentication" in error_type or "Permission" in error_type:
return PermanentUploadError(
f"Authentication/permission error: {e}", file_name=filename
)
if "BadRequest" in error_type or "InvalidRequest" in error_type:
return PermanentUploadError(f"Invalid request: {e}", file_name=filename)
status_code = getattr(e, "status_code", None)
if status_code is not None:
if status_code >= 500 or status_code == 429:
return TransientUploadError(
f"Server error ({status_code}): {e}", file_name=filename
)
if status_code in (401, 403):
return PermanentUploadError(
f"Auth error ({status_code}): {e}", file_name=filename
)
if status_code == 400:
return PermanentUploadError(
f"Bad request ({status_code}): {e}", file_name=filename
)
return TransientUploadError(f"Upload failed: {e}", file_name=filename)

View File

@@ -245,7 +245,7 @@ def chunk_text(
TextFile objects, one per chunk.
"""
content = file.read()
text = content.decode("utf-8", errors="replace")
text = content.decode(errors="replace")
total_chars = len(text)
if total_chars <= max_chars:
@@ -268,7 +268,7 @@ def chunk_text(
end_pos = last_newline + 1
chunk_content = text[start_pos:end_pos]
chunk_bytes = chunk_content.encode("utf-8")
chunk_bytes = chunk_content.encode()
chunk_filename = f"{base_filename}_chunk_{chunk_num}.{extension}"

View File

@@ -1,6 +1,7 @@
"""File validation functions for checking against provider constraints."""
from collections.abc import Sequence
import io
import logging
from crewai.files.content_types import (
@@ -30,6 +31,51 @@ logger = logging.getLogger(__name__)
FileInput = AudioFile | File | ImageFile | PDFFile | TextFile | VideoFile
def _get_image_dimensions(content: bytes) -> tuple[int, int] | None:
"""Get image dimensions using Pillow if available.
Args:
content: Raw image bytes.
Returns:
Tuple of (width, height) or None if Pillow unavailable.
"""
try:
from PIL import Image
with Image.open(io.BytesIO(content)) as img:
width, height = img.size
return (int(width), int(height))
except ImportError:
logger.warning(
"Pillow not installed - cannot validate image dimensions. "
"Install with: pip install Pillow"
)
return None
def _get_pdf_page_count(content: bytes) -> int | None:
"""Get PDF page count using pypdf if available.
Args:
content: Raw PDF bytes.
Returns:
Page count or None if pypdf unavailable.
"""
try:
from pypdf import PdfReader
reader = PdfReader(io.BytesIO(content))
return len(reader.pages)
except ImportError:
logger.warning(
"pypdf not installed - cannot validate PDF page count. "
"Install with: pip install pypdf"
)
return None
def _format_size(size_bytes: int) -> str:
"""Format byte size to human-readable string."""
if size_bytes >= 1024 * 1024 * 1024:
@@ -41,6 +87,69 @@ def _format_size(size_bytes: int) -> str:
return f"{size_bytes}B"
def _validate_size(
file_type: str,
filename: str | None,
file_size: int,
max_size: int,
errors: list[str],
raise_on_error: bool,
) -> None:
"""Validate file size against maximum.
Args:
file_type: Type label for error messages (e.g., "Image", "PDF").
filename: Name of the file being validated.
file_size: Actual file size in bytes.
max_size: Maximum allowed size in bytes.
errors: List to append error messages to.
raise_on_error: If True, raise FileTooLargeError on failure.
"""
if file_size > max_size:
msg = (
f"{file_type} '{filename}' size ({_format_size(file_size)}) exceeds "
f"maximum ({_format_size(max_size)})"
)
errors.append(msg)
if raise_on_error:
raise FileTooLargeError(
msg,
file_name=filename,
actual_size=file_size,
max_size=max_size,
)
def _validate_format(
file_type: str,
filename: str | None,
content_type: str,
supported_formats: tuple[str, ...],
errors: list[str],
raise_on_error: bool,
) -> None:
"""Validate content type against supported formats.
Args:
file_type: Type label for error messages (e.g., "Image", "Audio").
filename: Name of the file being validated.
content_type: MIME type of the file.
supported_formats: Tuple of supported MIME types.
errors: List to append error messages to.
raise_on_error: If True, raise UnsupportedFileTypeError on failure.
"""
if content_type not in supported_formats:
msg = (
f"{file_type} format '{content_type}' is not supported. "
f"Supported: {', '.join(supported_formats)}"
)
errors.append(msg)
if raise_on_error:
raise UnsupportedFileTypeError(
msg, file_name=filename, content_type=content_type
)
def validate_image(
file: ImageFile,
constraints: ImageConstraints,
@@ -67,64 +176,40 @@ def validate_image(
file_size = len(content)
filename = file.filename
if file_size > constraints.max_size_bytes:
msg = (
f"Image '{filename}' size ({_format_size(file_size)}) exceeds "
f"maximum ({_format_size(constraints.max_size_bytes)})"
)
errors.append(msg)
if raise_on_error:
raise FileTooLargeError(
msg,
file_name=filename,
actual_size=file_size,
max_size=constraints.max_size_bytes,
)
content_type = file.content_type
if content_type not in constraints.supported_formats:
msg = (
f"Image format '{content_type}' is not supported. "
f"Supported: {', '.join(constraints.supported_formats)}"
)
errors.append(msg)
if raise_on_error:
raise UnsupportedFileTypeError(
msg, file_name=filename, content_type=content_type
)
_validate_size(
"Image", filename, file_size, constraints.max_size_bytes, errors, raise_on_error
)
_validate_format(
"Image",
filename,
file.content_type,
constraints.supported_formats,
errors,
raise_on_error,
)
if constraints.max_width is not None or constraints.max_height is not None:
try:
import io
dimensions = _get_image_dimensions(content)
if dimensions is not None:
width, height = dimensions
from PIL import Image
if constraints.max_width and width > constraints.max_width:
msg = (
f"Image '{filename}' width ({width}px) exceeds "
f"maximum ({constraints.max_width}px)"
)
errors.append(msg)
if raise_on_error:
raise FileValidationError(msg, file_name=filename)
with Image.open(io.BytesIO(content)) as img:
width, height = img.size
if constraints.max_width and width > constraints.max_width:
msg = (
f"Image '{filename}' width ({width}px) exceeds "
f"maximum ({constraints.max_width}px)"
)
errors.append(msg)
if raise_on_error:
raise FileValidationError(msg, file_name=filename)
if constraints.max_height and height > constraints.max_height:
msg = (
f"Image '{filename}' height ({height}px) exceeds "
f"maximum ({constraints.max_height}px)"
)
errors.append(msg)
if raise_on_error:
raise FileValidationError(msg, file_name=filename)
except ImportError:
logger.warning(
"Pillow not installed - cannot validate image dimensions. "
"Install with: pip install Pillow"
)
if constraints.max_height and height > constraints.max_height:
msg = (
f"Image '{filename}' height ({height}px) exceeds "
f"maximum ({constraints.max_height}px)"
)
errors.append(msg)
if raise_on_error:
raise FileValidationError(msg, file_name=filename)
return errors
@@ -154,43 +239,20 @@ def validate_pdf(
file_size = len(content)
filename = file.filename
if file_size > constraints.max_size_bytes:
msg = (
f"PDF '{filename}' size ({_format_size(file_size)}) exceeds "
f"maximum ({_format_size(constraints.max_size_bytes)})"
)
errors.append(msg)
if raise_on_error:
raise FileTooLargeError(
msg,
file_name=filename,
actual_size=file_size,
max_size=constraints.max_size_bytes,
)
_validate_size(
"PDF", filename, file_size, constraints.max_size_bytes, errors, raise_on_error
)
if constraints.max_pages is not None:
try:
import io
from pypdf import PdfReader
reader = PdfReader(io.BytesIO(content))
page_count = len(reader.pages)
if page_count > constraints.max_pages:
msg = (
f"PDF '{filename}' page count ({page_count}) exceeds "
f"maximum ({constraints.max_pages})"
)
errors.append(msg)
if raise_on_error:
raise FileValidationError(msg, file_name=filename)
except ImportError:
logger.warning(
"pypdf not installed - cannot validate PDF page count. "
"Install with: pip install pypdf"
page_count = _get_pdf_page_count(content)
if page_count is not None and page_count > constraints.max_pages:
msg = (
f"PDF '{filename}' page count ({page_count}) exceeds "
f"maximum ({constraints.max_pages})"
)
errors.append(msg)
if raise_on_error:
raise FileValidationError(msg, file_name=filename)
return errors
@@ -216,35 +278,24 @@ def validate_audio(
UnsupportedFileTypeError: If the format is not supported.
"""
errors: list[str] = []
content = file.read()
file_size = len(content)
filename = file.filename
file_size = len(file.read())
if file_size > constraints.max_size_bytes:
msg = (
f"Audio '{filename}' size ({_format_size(file_size)}) exceeds "
f"maximum ({_format_size(constraints.max_size_bytes)})"
)
errors.append(msg)
if raise_on_error:
raise FileTooLargeError(
msg,
file_name=filename,
actual_size=file_size,
max_size=constraints.max_size_bytes,
)
content_type = file.content_type
if content_type not in constraints.supported_formats:
msg = (
f"Audio format '{content_type}' is not supported. "
f"Supported: {', '.join(constraints.supported_formats)}"
)
errors.append(msg)
if raise_on_error:
raise UnsupportedFileTypeError(
msg, file_name=filename, content_type=content_type
)
_validate_size(
"Audio",
file.filename,
file_size,
constraints.max_size_bytes,
errors,
raise_on_error,
)
_validate_format(
"Audio",
file.filename,
file.content_type,
constraints.supported_formats,
errors,
raise_on_error,
)
return errors
@@ -270,35 +321,24 @@ def validate_video(
UnsupportedFileTypeError: If the format is not supported.
"""
errors: list[str] = []
content = file.read()
file_size = len(content)
filename = file.filename
file_size = len(file.read())
if file_size > constraints.max_size_bytes:
msg = (
f"Video '{filename}' size ({_format_size(file_size)}) exceeds "
f"maximum ({_format_size(constraints.max_size_bytes)})"
)
errors.append(msg)
if raise_on_error:
raise FileTooLargeError(
msg,
file_name=filename,
actual_size=file_size,
max_size=constraints.max_size_bytes,
)
content_type = file.content_type
if content_type not in constraints.supported_formats:
msg = (
f"Video format '{content_type}' is not supported. "
f"Supported: {', '.join(constraints.supported_formats)}"
)
errors.append(msg)
if raise_on_error:
raise UnsupportedFileTypeError(
msg, file_name=filename, content_type=content_type
)
_validate_size(
"Video",
file.filename,
file_size,
constraints.max_size_bytes,
errors,
raise_on_error,
)
_validate_format(
"Video",
file.filename,
file.content_type,
constraints.supported_formats,
errors,
raise_on_error,
)
return errors
@@ -327,27 +367,47 @@ def validate_text(
if constraints.general_max_size_bytes is None:
return errors
content = file.read()
file_size = len(content)
filename = file.filename
if file_size > constraints.general_max_size_bytes:
msg = (
f"Text file '{filename}' size ({_format_size(file_size)}) exceeds "
f"maximum ({_format_size(constraints.general_max_size_bytes)})"
)
errors.append(msg)
if raise_on_error:
raise FileTooLargeError(
msg,
file_name=filename,
actual_size=file_size,
max_size=constraints.general_max_size_bytes,
)
file_size = len(file.read())
_validate_size(
"Text file",
file.filename,
file_size,
constraints.general_max_size_bytes,
errors,
raise_on_error,
)
return errors
def _check_unsupported_type(
file: FileInput,
provider_name: str,
type_name: str,
raise_on_error: bool,
) -> Sequence[str]:
"""Check if file type is unsupported and handle error.
Args:
file: The file being validated.
provider_name: Name of the provider.
type_name: Name of the file type (e.g., "images", "PDFs").
raise_on_error: If True, raise exception instead of returning errors.
Returns:
List with error message (only returns when raise_on_error is False).
Raises:
UnsupportedFileTypeError: If raise_on_error is True.
"""
msg = f"Provider '{provider_name}' does not support {type_name}"
if raise_on_error:
raise UnsupportedFileTypeError(
msg, file_name=file.filename, content_type=file.content_type
)
return [msg]
def validate_file(
file: FileInput,
constraints: ProviderConstraints,
@@ -373,42 +433,30 @@ def validate_file(
"""
if isinstance(file, ImageFile):
if constraints.image is None:
msg = f"Provider '{constraints.name}' does not support images"
if raise_on_error:
raise UnsupportedFileTypeError(
msg, file_name=file.filename, content_type=file.content_type
)
return [msg]
return _check_unsupported_type(
file, constraints.name, "images", raise_on_error
)
return validate_image(file, constraints.image, raise_on_error=raise_on_error)
if isinstance(file, PDFFile):
if constraints.pdf is None:
msg = f"Provider '{constraints.name}' does not support PDFs"
if raise_on_error:
raise UnsupportedFileTypeError(
msg, file_name=file.filename, content_type=file.content_type
)
return [msg]
return _check_unsupported_type(
file, constraints.name, "PDFs", raise_on_error
)
return validate_pdf(file, constraints.pdf, raise_on_error=raise_on_error)
if isinstance(file, AudioFile):
if constraints.audio is None:
msg = f"Provider '{constraints.name}' does not support audio"
if raise_on_error:
raise UnsupportedFileTypeError(
msg, file_name=file.filename, content_type=file.content_type
)
return [msg]
return _check_unsupported_type(
file, constraints.name, "audio", raise_on_error
)
return validate_audio(file, constraints.audio, raise_on_error=raise_on_error)
if isinstance(file, VideoFile):
if constraints.video is None:
msg = f"Provider '{constraints.name}' does not support video"
if raise_on_error:
raise UnsupportedFileTypeError(
msg, file_name=file.filename, content_type=file.content_type
)
return [msg]
return _check_unsupported_type(
file, constraints.name, "video", raise_on_error
)
return validate_video(file, constraints.video, raise_on_error=raise_on_error)
if isinstance(file, TextFile):

View File

@@ -15,6 +15,7 @@ from crewai.files.content_types import (
TextFile,
VideoFile,
)
from crewai.files.processing.exceptions import classify_upload_error
from crewai.files.uploaders.base import FileUploader, UploadResult
@@ -28,9 +29,6 @@ class AnthropicFileUploader(FileUploader):
Uses the anthropic SDK to upload files. Files are stored persistently
until explicitly deleted.
Attributes:
api_key: Optional API key (uses ANTHROPIC_API_KEY env var if not provided).
"""
def __init__(self, api_key: str | None = None) -> None:
@@ -91,11 +89,6 @@ class AnthropicFileUploader(FileUploader):
TransientUploadError: For retryable errors (network, rate limits).
PermanentUploadError: For non-retryable errors (auth, validation).
"""
from crewai.files.processing.exceptions import (
PermanentUploadError,
TransientUploadError,
)
try:
client = self._get_client()
@@ -125,36 +118,7 @@ class AnthropicFileUploader(FileUploader):
except ImportError:
raise
except Exception as e:
error_type = type(e).__name__
if "RateLimit" in error_type or "APIConnection" in error_type:
raise TransientUploadError(
f"Transient upload error: {e}", file_name=file.filename
) from e
if "Authentication" in error_type or "Permission" in error_type:
raise PermanentUploadError(
f"Authentication/permission error: {e}", file_name=file.filename
) from e
if "BadRequest" in error_type or "InvalidRequest" in error_type:
raise PermanentUploadError(
f"Invalid request: {e}", file_name=file.filename
) from e
status_code = getattr(e, "status_code", None)
if status_code is not None:
if status_code >= 500 or status_code == 429:
raise TransientUploadError(
f"Server error ({status_code}): {e}", file_name=file.filename
) from e
if status_code in (401, 403):
raise PermanentUploadError(
f"Auth error ({status_code}): {e}", file_name=file.filename
) from e
if status_code == 400:
raise PermanentUploadError(
f"Bad request ({status_code}): {e}", file_name=file.filename
) from e
raise TransientUploadError(
f"Upload failed: {e}", file_name=file.filename
) from e
raise classify_upload_error(e, file.filename) from e
def delete(self, file_id: str) -> bool:
"""Delete an uploaded file from Anthropic.
@@ -236,11 +200,6 @@ class AnthropicFileUploader(FileUploader):
TransientUploadError: For retryable errors (network, rate limits).
PermanentUploadError: For non-retryable errors (auth, validation).
"""
from crewai.files.processing.exceptions import (
PermanentUploadError,
TransientUploadError,
)
try:
client = self._get_async_client()
@@ -270,36 +229,7 @@ class AnthropicFileUploader(FileUploader):
except ImportError:
raise
except Exception as e:
error_type = type(e).__name__
if "RateLimit" in error_type or "APIConnection" in error_type:
raise TransientUploadError(
f"Transient upload error: {e}", file_name=file.filename
) from e
if "Authentication" in error_type or "Permission" in error_type:
raise PermanentUploadError(
f"Authentication/permission error: {e}", file_name=file.filename
) from e
if "BadRequest" in error_type or "InvalidRequest" in error_type:
raise PermanentUploadError(
f"Invalid request: {e}", file_name=file.filename
) from e
status_code = getattr(e, "status_code", None)
if status_code is not None:
if status_code >= 500 or status_code == 429:
raise TransientUploadError(
f"Server error ({status_code}): {e}", file_name=file.filename
) from e
if status_code in (401, 403):
raise PermanentUploadError(
f"Auth error ({status_code}): {e}", file_name=file.filename
) from e
if status_code == 400:
raise PermanentUploadError(
f"Bad request ({status_code}): {e}", file_name=file.filename
) from e
raise TransientUploadError(
f"Upload failed: {e}", file_name=file.filename
) from e
raise classify_upload_error(e, file.filename) from e
async def adelete(self, file_id: str) -> bool:
"""Async delete an uploaded file from Anthropic.

View File

@@ -17,6 +17,10 @@ from crewai.files.content_types import (
VideoFile,
)
from crewai.files.file import FileBytes, FilePath
from crewai.files.processing.exceptions import (
PermanentUploadError,
TransientUploadError,
)
from crewai.files.uploaders.base import FileUploader, UploadResult
@@ -29,6 +33,30 @@ MULTIPART_CHUNKSIZE = 8 * 1024 * 1024
MAX_CONCURRENCY = 10
def _classify_s3_error(e: Exception, filename: str | None) -> Exception:
"""Classify an S3 exception as transient or permanent upload error.
Args:
e: The exception to classify.
filename: The filename for error context.
Returns:
A TransientUploadError or PermanentUploadError wrapping the original.
"""
error_type = type(e).__name__
error_code = getattr(e, "response", {}).get("Error", {}).get("Code", "")
if error_code in ("SlowDown", "ServiceUnavailable", "InternalError"):
return TransientUploadError(f"Transient S3 error: {e}", file_name=filename)
if error_code in ("AccessDenied", "InvalidAccessKeyId", "SignatureDoesNotMatch"):
return PermanentUploadError(f"S3 authentication error: {e}", file_name=filename)
if error_code in ("NoSuchBucket", "InvalidBucketName"):
return PermanentUploadError(f"S3 bucket error: {e}", file_name=filename)
if "Throttl" in error_type or "Throttl" in str(e):
return TransientUploadError(f"S3 throttling: {e}", file_name=filename)
return TransientUploadError(f"S3 upload failed: {e}", file_name=filename)
def _get_file_path(file: FileInput) -> Path | None:
"""Get the filesystem path if file source is FilePath.
@@ -82,12 +110,6 @@ class BedrockFileUploader(FileUploader):
Uploads files to S3 and returns S3 URIs that can be used with Bedrock's
Converse API s3Location source format.
Attributes:
bucket_name: S3 bucket name for file uploads.
bucket_owner: Optional bucket owner account ID for cross-account access.
prefix: Optional S3 key prefix for uploaded files.
region: AWS region for the S3 bucket.
"""
def __init__(
@@ -213,7 +235,6 @@ class BedrockFileUploader(FileUploader):
multipart_threshold=MULTIPART_THRESHOLD,
multipart_chunksize=MULTIPART_CHUNKSIZE,
max_concurrency=MAX_CONCURRENCY,
use_threads=True,
)
def upload(self, file: FileInput, purpose: str | None = None) -> UploadResult:
@@ -235,11 +256,6 @@ class BedrockFileUploader(FileUploader):
"""
import io
from crewai.files.processing.exceptions import (
PermanentUploadError,
TransientUploadError,
)
try:
client = self._get_client()
transfer_config = self._get_transfer_config()
@@ -292,32 +308,7 @@ class BedrockFileUploader(FileUploader):
except ImportError:
raise
except Exception as e:
error_type = type(e).__name__
error_code = getattr(e, "response", {}).get("Error", {}).get("Code", "")
if error_code in ("SlowDown", "ServiceUnavailable", "InternalError"):
raise TransientUploadError(
f"Transient S3 error: {e}", file_name=file.filename
) from e
if error_code in (
"AccessDenied",
"InvalidAccessKeyId",
"SignatureDoesNotMatch",
):
raise PermanentUploadError(
f"S3 authentication error: {e}", file_name=file.filename
) from e
if error_code in ("NoSuchBucket", "InvalidBucketName"):
raise PermanentUploadError(
f"S3 bucket error: {e}", file_name=file.filename
) from e
if "Throttl" in error_type or "Throttl" in str(e):
raise TransientUploadError(
f"S3 throttling: {e}", file_name=file.filename
) from e
raise TransientUploadError(
f"S3 upload failed: {e}", file_name=file.filename
) from e
raise _classify_s3_error(e, file.filename) from e
def delete(self, file_id: str) -> bool:
"""Delete an uploaded file from S3.
@@ -412,11 +403,6 @@ class BedrockFileUploader(FileUploader):
import aiofiles
from crewai.files.processing.exceptions import (
PermanentUploadError,
TransientUploadError,
)
try:
session = self._get_async_client()
transfer_config = self._get_transfer_config()
@@ -471,32 +457,7 @@ class BedrockFileUploader(FileUploader):
except ImportError:
raise
except Exception as e:
error_type = type(e).__name__
error_code = getattr(e, "response", {}).get("Error", {}).get("Code", "")
if error_code in ("SlowDown", "ServiceUnavailable", "InternalError"):
raise TransientUploadError(
f"Transient S3 error: {e}", file_name=file.filename
) from e
if error_code in (
"AccessDenied",
"InvalidAccessKeyId",
"SignatureDoesNotMatch",
):
raise PermanentUploadError(
f"S3 authentication error: {e}", file_name=file.filename
) from e
if error_code in ("NoSuchBucket", "InvalidBucketName"):
raise PermanentUploadError(
f"S3 bucket error: {e}", file_name=file.filename
) from e
if "Throttl" in error_type or "Throttl" in str(e):
raise TransientUploadError(
f"S3 throttling: {e}", file_name=file.filename
) from e
raise TransientUploadError(
f"S3 upload failed: {e}", file_name=file.filename
) from e
raise _classify_s3_error(e, file.filename) from e
async def adelete(self, file_id: str) -> bool:
"""Async delete an uploaded file from S3.

View File

@@ -21,6 +21,11 @@ from crewai.files.content_types import (
VideoFile,
)
from crewai.files.file import FilePath
from crewai.files.processing.exceptions import (
PermanentUploadError,
TransientUploadError,
classify_upload_error,
)
from crewai.files.uploaders.base import FileUploader, UploadResult
@@ -30,6 +35,51 @@ FileInput = AudioFile | File | ImageFile | PDFFile | TextFile | VideoFile
GEMINI_FILE_TTL = timedelta(hours=48)
BACKOFF_BASE_DELAY = 1.0
BACKOFF_MAX_DELAY = 30.0
BACKOFF_JITTER_FACTOR = 0.1
def _compute_backoff_delay(attempt: int) -> float:
"""Compute exponential backoff delay with jitter.
Args:
attempt: The current attempt number (0-indexed).
Returns:
Delay in seconds with jitter applied.
"""
delay: float = min(BACKOFF_BASE_DELAY * (2**attempt), BACKOFF_MAX_DELAY)
jitter: float = random.uniform(0, delay * BACKOFF_JITTER_FACTOR) # noqa: S311
return float(delay + jitter)
def _classify_gemini_error(e: Exception, filename: str | None) -> Exception:
"""Classify a Gemini exception as transient or permanent upload error.
Checks Gemini-specific error message patterns first, then falls back
to generic status code classification.
Args:
e: The exception to classify.
filename: The filename for error context.
Returns:
A TransientUploadError or PermanentUploadError wrapping the original.
"""
error_msg = str(e).lower()
if "quota" in error_msg or "rate" in error_msg or "limit" in error_msg:
return TransientUploadError(f"Rate limit error: {e}", file_name=filename)
if "auth" in error_msg or "permission" in error_msg or "denied" in error_msg:
return PermanentUploadError(
f"Authentication/permission error: {e}", file_name=filename
)
if "invalid" in error_msg or "unsupported" in error_msg:
return PermanentUploadError(f"Invalid request: {e}", file_name=filename)
return classify_upload_error(e, filename)
def _get_file_path(file: FileInput) -> Path | None:
"""Get the filesystem path if file source is FilePath.
@@ -46,28 +96,10 @@ def _get_file_path(file: FileInput) -> Path | None:
return None
def _get_file_size(file: FileInput) -> int | None:
"""Get file size without reading content if possible.
Args:
file: The file input.
Returns:
Size in bytes if determinable without reading, None otherwise.
"""
source = file._file_source
if isinstance(source, FilePath):
return source.path.stat().st_size
return None
class GeminiFileUploader(FileUploader):
"""Uploader for Google Gemini File API.
Uses the google-genai SDK to upload files. Files are stored for 48 hours.
Attributes:
api_key: Optional API key (uses GOOGLE_API_KEY env var if not provided).
"""
def __init__(self, api_key: str | None = None) -> None:
@@ -116,11 +148,6 @@ class GeminiFileUploader(FileUploader):
TransientUploadError: For retryable errors (network, rate limits).
PermanentUploadError: For non-retryable errors (auth, validation).
"""
from crewai.files.processing.exceptions import (
PermanentUploadError,
TransientUploadError,
)
try:
client = self._get_client()
display_name = purpose or file.filename
@@ -181,42 +208,7 @@ class GeminiFileUploader(FileUploader):
except (TransientUploadError, PermanentUploadError):
raise
except Exception as e:
error_msg = str(e).lower()
if "quota" in error_msg or "rate" in error_msg or "limit" in error_msg:
raise TransientUploadError(
f"Rate limit error: {e}", file_name=file.filename
) from e
if (
"auth" in error_msg
or "permission" in error_msg
or "denied" in error_msg
):
raise PermanentUploadError(
f"Authentication/permission error: {e}", file_name=file.filename
) from e
if "invalid" in error_msg or "unsupported" in error_msg:
raise PermanentUploadError(
f"Invalid request: {e}", file_name=file.filename
) from e
status_code = getattr(e, "code", None) or getattr(e, "status_code", None)
if status_code is not None:
if isinstance(status_code, int):
if status_code >= 500 or status_code == 429:
raise TransientUploadError(
f"Server error ({status_code}): {e}",
file_name=file.filename,
) from e
if status_code in (401, 403):
raise PermanentUploadError(
f"Auth error ({status_code}): {e}", file_name=file.filename
) from e
if status_code == 400:
raise PermanentUploadError(
f"Bad request ({status_code}): {e}", file_name=file.filename
) from e
raise TransientUploadError(
f"Upload failed: {e}", file_name=file.filename
) from e
raise _classify_gemini_error(e, file.filename) from e
async def aupload(
self, file: FileInput, purpose: str | None = None
@@ -237,11 +229,6 @@ class GeminiFileUploader(FileUploader):
TransientUploadError: For retryable errors (network, rate limits).
PermanentUploadError: For non-retryable errors (auth, validation).
"""
from crewai.files.processing.exceptions import (
PermanentUploadError,
TransientUploadError,
)
try:
client = self._get_client()
display_name = purpose or file.filename
@@ -302,40 +289,7 @@ class GeminiFileUploader(FileUploader):
except (TransientUploadError, PermanentUploadError):
raise
except Exception as e:
error_msg = str(e).lower()
if "quota" in error_msg or "rate" in error_msg or "limit" in error_msg:
raise TransientUploadError(
f"Rate limit error: {e}", file_name=file.filename
) from e
if (
"auth" in error_msg
or "permission" in error_msg
or "denied" in error_msg
):
raise PermanentUploadError(
f"Authentication/permission error: {e}", file_name=file.filename
) from e
if "invalid" in error_msg or "unsupported" in error_msg:
raise PermanentUploadError(
f"Invalid request: {e}", file_name=file.filename
) from e
status_code = getattr(e, "code", None) or getattr(e, "status_code", None)
if status_code is not None and isinstance(status_code, int):
if status_code >= 500 or status_code == 429:
raise TransientUploadError(
f"Server error ({status_code}): {e}", file_name=file.filename
) from e
if status_code in (401, 403):
raise PermanentUploadError(
f"Auth error ({status_code}): {e}", file_name=file.filename
) from e
if status_code == 400:
raise PermanentUploadError(
f"Bad request ({status_code}): {e}", file_name=file.filename
) from e
raise TransientUploadError(
f"Upload failed: {e}", file_name=file.filename
) from e
raise _classify_gemini_error(e, file.filename) from e
def delete(self, file_id: str) -> bool:
"""Delete an uploaded file from Gemini.
@@ -442,8 +396,6 @@ class GeminiFileUploader(FileUploader):
client = self._get_client()
start_time = time.time()
base_delay = 1.0
max_delay = 30.0
attempt = 0
while time.time() - start_time < timeout_seconds:
@@ -451,14 +403,11 @@ class GeminiFileUploader(FileUploader):
if file_info.state == FileState.ACTIVE:
return True
if file_info.state == FileState.FAILED:
logger.error(f"Gemini file processing failed: {file_id}")
return False
delay = min(base_delay * (2**attempt), max_delay)
jitter = random.uniform(0, delay * 0.1) # noqa: S311
time.sleep(delay + jitter)
time.sleep(_compute_backoff_delay(attempt))
attempt += 1
logger.warning(f"Timed out waiting for Gemini file processing: {file_id}")
@@ -485,8 +434,6 @@ class GeminiFileUploader(FileUploader):
client = self._get_client()
start_time = time.time()
base_delay = 1.0
max_delay = 30.0
attempt = 0
while time.time() - start_time < timeout_seconds:
@@ -494,14 +441,11 @@ class GeminiFileUploader(FileUploader):
if file_info.state == FileState.ACTIVE:
return True
if file_info.state == FileState.FAILED:
logger.error(f"Gemini file processing failed: {file_id}")
return False
delay = min(base_delay * (2**attempt), max_delay)
jitter = random.uniform(0, delay * 0.1) # noqa: S311
await asyncio.sleep(delay + jitter)
await asyncio.sleep(_compute_backoff_delay(attempt))
attempt += 1
logger.warning(f"Timed out waiting for Gemini file processing: {file_id}")

View File

@@ -17,6 +17,11 @@ from crewai.files.content_types import (
VideoFile,
)
from crewai.files.file import FileBytes, FilePath, FileStream
from crewai.files.processing.exceptions import (
PermanentUploadError,
TransientUploadError,
classify_upload_error,
)
from crewai.files.uploaders.base import FileUploader, UploadResult
@@ -100,10 +105,6 @@ class OpenAIFileUploader(FileUploader):
Uses the Files API for files up to 512MB (single request).
Uses the Uploads API for files larger than 512MB (multipart chunked).
Attributes:
api_key: Optional API key (uses OPENAI_API_KEY env var if not provided).
chunk_size: Chunk size for multipart uploads (default 64MB).
"""
def __init__(
@@ -128,6 +129,24 @@ class OpenAIFileUploader(FileUploader):
"""Return the provider name."""
return "openai"
def _build_upload_result(self, file_id: str, content_type: str) -> UploadResult:
"""Build an UploadResult for a completed upload.
Args:
file_id: The uploaded file ID.
content_type: The file's content type.
Returns:
UploadResult with the file metadata.
"""
return UploadResult(
file_id=file_id,
file_uri=None,
content_type=content_type,
expires_at=None,
provider=self.provider_name,
)
def _get_client(self) -> Any:
"""Get or create the OpenAI client."""
if self._client is None:
@@ -173,11 +192,6 @@ class OpenAIFileUploader(FileUploader):
TransientUploadError: For retryable errors (network, rate limits).
PermanentUploadError: For non-retryable errors (auth, validation).
"""
from crewai.files.processing.exceptions import (
PermanentUploadError,
TransientUploadError,
)
try:
file_size = _get_file_size(file)
@@ -193,7 +207,7 @@ class OpenAIFileUploader(FileUploader):
except (TransientUploadError, PermanentUploadError):
raise
except Exception as e:
raise self._classify_error(e, file.filename) from e
raise classify_upload_error(e, file.filename) from e
def _upload_simple(
self,
@@ -228,13 +242,7 @@ class OpenAIFileUploader(FileUploader):
logger.info(f"Uploaded to OpenAI: {uploaded_file.id}")
return UploadResult(
file_id=uploaded_file.id,
file_uri=None,
content_type=file.content_type,
expires_at=None,
provider=self.provider_name,
)
return self._build_upload_result(uploaded_file.id, file.content_type)
def _upload_multipart(
self,
@@ -299,13 +307,7 @@ class OpenAIFileUploader(FileUploader):
file_id = completed.file.id if completed.file else upload.id
logger.info(f"Completed multipart upload to OpenAI: {file_id}")
return UploadResult(
file_id=file_id,
file_uri=None,
content_type=file.content_type,
expires_at=None,
provider=self.provider_name,
)
return self._build_upload_result(file_id, file.content_type)
except Exception:
logger.warning(f"Multipart upload failed, cancelling upload {upload.id}")
try:
@@ -373,13 +375,7 @@ class OpenAIFileUploader(FileUploader):
file_id = completed.file.id if completed.file else upload.id
logger.info(f"Completed streaming multipart upload to OpenAI: {file_id}")
return UploadResult(
file_id=file_id,
file_uri=None,
content_type=file.content_type,
expires_at=None,
provider=self.provider_name,
)
return self._build_upload_result(file_id, file.content_type)
except Exception:
logger.warning(f"Multipart upload failed, cancelling upload {upload.id}")
try:
@@ -388,51 +384,6 @@ class OpenAIFileUploader(FileUploader):
logger.debug(f"Failed to cancel upload: {cancel_err}")
raise
@staticmethod
def _classify_error(e: Exception, filename: str | None) -> Exception:
"""Classify an exception as transient or permanent.
Args:
e: The exception to classify.
filename: The filename for error context.
Returns:
TransientUploadError or PermanentUploadError.
"""
from crewai.files.processing.exceptions import (
PermanentUploadError,
TransientUploadError,
)
error_type = type(e).__name__
if "RateLimit" in error_type or "APIConnection" in error_type:
return TransientUploadError(
f"Transient upload error: {e}", file_name=filename
)
if "Authentication" in error_type or "Permission" in error_type:
return PermanentUploadError(
f"Authentication/permission error: {e}", file_name=filename
)
if "BadRequest" in error_type or "InvalidRequest" in error_type:
return PermanentUploadError(f"Invalid request: {e}", file_name=filename)
status_code = getattr(e, "status_code", None)
if status_code is not None:
if status_code >= 500 or status_code == 429:
return TransientUploadError(
f"Server error ({status_code}): {e}", file_name=filename
)
if status_code in (401, 403):
return PermanentUploadError(
f"Auth error ({status_code}): {e}", file_name=filename
)
if status_code == 400:
return PermanentUploadError(
f"Bad request ({status_code}): {e}", file_name=filename
)
return TransientUploadError(f"Upload failed: {e}", file_name=filename)
def delete(self, file_id: str) -> bool:
"""Delete an uploaded file from OpenAI.
@@ -518,11 +469,6 @@ class OpenAIFileUploader(FileUploader):
TransientUploadError: For retryable errors (network, rate limits).
PermanentUploadError: For non-retryable errors (auth, validation).
"""
from crewai.files.processing.exceptions import (
PermanentUploadError,
TransientUploadError,
)
try:
file_size = _get_file_size(file)
@@ -538,7 +484,7 @@ class OpenAIFileUploader(FileUploader):
except (TransientUploadError, PermanentUploadError):
raise
except Exception as e:
raise self._classify_error(e, file.filename) from e
raise classify_upload_error(e, file.filename) from e
async def _aupload_simple(
self,
@@ -573,13 +519,7 @@ class OpenAIFileUploader(FileUploader):
logger.info(f"Uploaded to OpenAI: {uploaded_file.id}")
return UploadResult(
file_id=uploaded_file.id,
file_uri=None,
content_type=file.content_type,
expires_at=None,
provider=self.provider_name,
)
return self._build_upload_result(uploaded_file.id, file.content_type)
async def _aupload_multipart(
self,
@@ -644,13 +584,7 @@ class OpenAIFileUploader(FileUploader):
file_id = completed.file.id if completed.file else upload.id
logger.info(f"Completed multipart upload to OpenAI: {file_id}")
return UploadResult(
file_id=file_id,
file_uri=None,
content_type=file.content_type,
expires_at=None,
provider=self.provider_name,
)
return self._build_upload_result(file_id, file.content_type)
except Exception:
logger.warning(f"Multipart upload failed, cancelling upload {upload.id}")
try:
@@ -718,13 +652,7 @@ class OpenAIFileUploader(FileUploader):
file_id = completed.file.id if completed.file else upload.id
logger.info(f"Completed streaming multipart upload to OpenAI: {file_id}")
return UploadResult(
file_id=file_id,
file_uri=None,
content_type=file.content_type,
expires_at=None,
provider=self.provider_name,
)
return self._build_upload_result(file_id, file.content_type)
except Exception:
logger.warning(f"Multipart upload failed, cancelling upload {upload.id}")
try: