diff --git a/lib/crewai/src/crewai/files/uploaders/bedrock.py b/lib/crewai/src/crewai/files/uploaders/bedrock.py index e0fbbef4c..aa9ad55c4 100644 --- a/lib/crewai/src/crewai/files/uploaders/bedrock.py +++ b/lib/crewai/src/crewai/files/uploaders/bedrock.py @@ -5,6 +5,7 @@ from __future__ import annotations import hashlib import logging import os +from pathlib import Path from typing import Any from crewai.files.content_types import ( @@ -15,6 +16,7 @@ from crewai.files.content_types import ( TextFile, VideoFile, ) +from crewai.files.file import FileBytes, FilePath from crewai.files.uploaders.base import FileUploader, UploadResult @@ -22,6 +24,58 @@ logger = logging.getLogger(__name__) FileInput = AudioFile | File | ImageFile | PDFFile | TextFile | VideoFile +MULTIPART_THRESHOLD = 8 * 1024 * 1024 +MULTIPART_CHUNKSIZE = 8 * 1024 * 1024 +MAX_CONCURRENCY = 10 + + +def _get_file_path(file: FileInput) -> Path | None: + """Get the filesystem path if file source is FilePath. + + Args: + file: The file input to check. + + Returns: + Path if source is FilePath, None otherwise. + """ + source = file._file_source + if isinstance(source, FilePath): + return source.path + return None + + +def _get_file_size(file: FileInput) -> int | None: + """Get file size without reading content if possible. + + Args: + file: The file input. + + Returns: + Size in bytes if determinable without reading, None otherwise. + """ + source = file._file_source + if isinstance(source, FilePath): + return source.path.stat().st_size + if isinstance(source, FileBytes): + return len(source.data) + return None + + +def _compute_hash_streaming(file_path: Path) -> str: + """Compute SHA-256 hash by streaming file content. + + Args: + file_path: Path to the file. + + Returns: + First 16 characters of hex digest. + """ + hasher = hashlib.sha256() + with open(file_path, "rb") as f: + while chunk := f.read(1024 * 1024): + hasher.update(chunk) + return hasher.hexdigest()[:16] + class BedrockFileUploader(FileUploader): """Uploader for AWS Bedrock via S3. @@ -112,19 +166,28 @@ class BedrockFileUploader(FileUploader): ) from e return self._session - def _generate_s3_key(self, file: FileInput, content: bytes) -> str: + def _generate_s3_key(self, file: FileInput, content: bytes | None = None) -> str: """Generate a unique S3 key for the file. + For FilePath sources with no content provided, computes hash via streaming. + Args: file: The file being uploaded. - content: The file content bytes. + content: The file content bytes (optional for FilePath sources). Returns: S3 key string. 
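# Illustrative sketch (not part of the patch; standard library only): a check that
# chunked SHA-256 hashing matches hashing the whole payload at once, which is the
# property _compute_hash_streaming relies on so that FilePath sources produce the
# same 16-character key prefix without loading the file into memory.
import hashlib
import io


def hash_stream(stream: io.BufferedIOBase, chunk_size: int = 1024 * 1024) -> str:
    hasher = hashlib.sha256()
    while chunk := stream.read(chunk_size):
        hasher.update(chunk)
    return hasher.hexdigest()[:16]


data = b"x" * (3 * 1024 * 1024 + 17)  # deliberately not chunk-aligned
assert hash_stream(io.BytesIO(data)) == hashlib.sha256(data).hexdigest()[:16]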
""" - content_hash = hashlib.sha256(content).hexdigest()[:16] - filename = file.filename or "file" + if content is not None: + content_hash = hashlib.sha256(content).hexdigest()[:16] + else: + file_path = _get_file_path(file) + if file_path is not None: + content_hash = _compute_hash_streaming(file_path) + else: + content_hash = hashlib.sha256(file.read()).hexdigest()[:16] + filename = file.filename or "file" safe_filename = "".join( c if c.isalnum() or c in ".-_" else "_" for c in filename ) @@ -141,9 +204,23 @@ class BedrockFileUploader(FileUploader): """ return f"s3://{self.bucket_name}/{key}" + def _get_transfer_config(self) -> Any: + """Get boto3 TransferConfig for multipart uploads.""" + from boto3.s3.transfer import TransferConfig + + return TransferConfig( + multipart_threshold=MULTIPART_THRESHOLD, + multipart_chunksize=MULTIPART_CHUNKSIZE, + max_concurrency=MAX_CONCURRENCY, + use_threads=True, + ) + def upload(self, file: FileInput, purpose: str | None = None) -> UploadResult: """Upload a file to S3 for use with Bedrock. + Uses streaming upload with automatic multipart for large files. + For FilePath sources, streams directly from disk without loading into memory. + Args: file: The file to upload. purpose: Optional purpose (unused, kept for interface consistency). @@ -155,6 +232,8 @@ class BedrockFileUploader(FileUploader): TransientUploadError: For retryable errors (network, throttling). PermanentUploadError: For non-retryable errors (auth, validation). """ + import io + from crewai.files.processing.exceptions import ( PermanentUploadError, TransientUploadError, @@ -162,20 +241,42 @@ class BedrockFileUploader(FileUploader): try: client = self._get_client() - content = file.read() - s3_key = self._generate_s3_key(file, content) + transfer_config = self._get_transfer_config() + file_path = _get_file_path(file) - logger.info( - f"Uploading file '{file.filename}' to S3 bucket " - f"'{self.bucket_name}' ({len(content)} bytes)" - ) + if file_path is not None: + file_size = file_path.stat().st_size + s3_key = self._generate_s3_key(file) - client.put_object( - Bucket=self.bucket_name, - Key=s3_key, - Body=content, - ContentType=file.content_type, - ) + logger.info( + f"Uploading file '{file.filename}' to S3 bucket " + f"'{self.bucket_name}' ({file_size} bytes, streaming)" + ) + + with open(file_path, "rb") as f: + client.upload_fileobj( + f, + self.bucket_name, + s3_key, + ExtraArgs={"ContentType": file.content_type}, + Config=transfer_config, + ) + else: + content = file.read() + s3_key = self._generate_s3_key(file, content) + + logger.info( + f"Uploading file '{file.filename}' to S3 bucket " + f"'{self.bucket_name}' ({len(content)} bytes)" + ) + + client.upload_fileobj( + io.BytesIO(content), + self.bucket_name, + s3_key, + ExtraArgs={"ContentType": file.content_type}, + Config=transfer_config, + ) s3_uri = self._build_s3_uri(s3_key) logger.info(f"Uploaded to S3: {s3_uri}") @@ -292,6 +393,9 @@ class BedrockFileUploader(FileUploader): ) -> UploadResult: """Async upload a file to S3 for use with Bedrock. + Uses streaming upload with automatic multipart for large files. + For FilePath sources, streams directly from disk without loading into memory. + Args: file: The file to upload. purpose: Optional purpose (unused, kept for interface consistency). @@ -303,6 +407,10 @@ class BedrockFileUploader(FileUploader): TransientUploadError: For retryable errors (network, throttling). PermanentUploadError: For non-retryable errors (auth, validation). 
""" + import io + + import aiofiles + from crewai.files.processing.exceptions import ( PermanentUploadError, TransientUploadError, @@ -310,22 +418,45 @@ class BedrockFileUploader(FileUploader): try: session = self._get_async_client() - content = await file.aread() - s3_key = self._generate_s3_key(file, content) + transfer_config = self._get_transfer_config() + file_path = _get_file_path(file) - logger.info( - f"Uploading file '{file.filename}' to S3 bucket " - f"'{self.bucket_name}' ({len(content)} bytes)" - ) + if file_path is not None: + file_size = file_path.stat().st_size + s3_key = self._generate_s3_key(file) - async with session.client("s3", region_name=self._region) as client: - await client.put_object( - Bucket=self.bucket_name, - Key=s3_key, - Body=content, - ContentType=file.content_type, + logger.info( + f"Uploading file '{file.filename}' to S3 bucket " + f"'{self.bucket_name}' ({file_size} bytes, streaming)" ) + async with session.client("s3", region_name=self._region) as client: + async with aiofiles.open(file_path, "rb") as f: + await client.upload_fileobj( + f, + self.bucket_name, + s3_key, + ExtraArgs={"ContentType": file.content_type}, + Config=transfer_config, + ) + else: + content = await file.aread() + s3_key = self._generate_s3_key(file, content) + + logger.info( + f"Uploading file '{file.filename}' to S3 bucket " + f"'{self.bucket_name}' ({len(content)} bytes)" + ) + + async with session.client("s3", region_name=self._region) as client: + await client.upload_fileobj( + io.BytesIO(content), + self.bucket_name, + s3_key, + ExtraArgs={"ContentType": file.content_type}, + Config=transfer_config, + ) + s3_uri = self._build_s3_uri(s3_key) logger.info(f"Uploaded to S3: {s3_uri}") diff --git a/lib/crewai/src/crewai/files/uploaders/gemini.py b/lib/crewai/src/crewai/files/uploaders/gemini.py index eb6d82215..e9fb3fa9c 100644 --- a/lib/crewai/src/crewai/files/uploaders/gemini.py +++ b/lib/crewai/src/crewai/files/uploaders/gemini.py @@ -7,6 +7,7 @@ from datetime import datetime, timedelta, timezone import io import logging import os +from pathlib import Path import random import time from typing import Any @@ -19,6 +20,7 @@ from crewai.files.content_types import ( TextFile, VideoFile, ) +from crewai.files.file import FilePath from crewai.files.uploaders.base import FileUploader, UploadResult @@ -29,6 +31,36 @@ FileInput = AudioFile | File | ImageFile | PDFFile | TextFile | VideoFile GEMINI_FILE_TTL = timedelta(hours=48) +def _get_file_path(file: FileInput) -> Path | None: + """Get the filesystem path if file source is FilePath. + + Args: + file: The file input to check. + + Returns: + Path if source is FilePath, None otherwise. + """ + source = file._file_source + if isinstance(source, FilePath): + return source.path + return None + + +def _get_file_size(file: FileInput) -> int | None: + """Get file size without reading content if possible. + + Args: + file: The file input. + + Returns: + Size in bytes if determinable without reading, None otherwise. + """ + source = file._file_source + if isinstance(source, FilePath): + return source.path.stat().st_size + return None + + class GeminiFileUploader(FileUploader): """Uploader for Google Gemini File API. @@ -70,6 +102,9 @@ class GeminiFileUploader(FileUploader): def upload(self, file: FileInput, purpose: str | None = None) -> UploadResult: """Upload a file to Gemini. + For FilePath sources, passes the path directly to the SDK which handles + streaming internally via resumable uploads, avoiding memory overhead. 
+ Args: file: The file to upload. purpose: Optional purpose/description (used as display name). @@ -88,24 +123,38 @@ class GeminiFileUploader(FileUploader): try: client = self._get_client() - - content = file.read() display_name = purpose or file.filename - file_data = io.BytesIO(content) - file_data.name = file.filename + file_path = _get_file_path(file) + if file_path is not None: + file_size = file_path.stat().st_size + logger.info( + f"Uploading file '{file.filename}' to Gemini via path " + f"({file_size} bytes, streaming)" + ) + uploaded_file = client.files.upload( + file=file_path, + config={ + "display_name": display_name, + "mime_type": file.content_type, + }, + ) + else: + content = file.read() + file_data = io.BytesIO(content) + file_data.name = file.filename - logger.info( - f"Uploading file '{file.filename}' to Gemini ({len(content)} bytes)" - ) + logger.info( + f"Uploading file '{file.filename}' to Gemini ({len(content)} bytes)" + ) - uploaded_file = client.files.upload( - file=file_data, - config={ - "display_name": display_name, - "mime_type": file.content_type, - }, - ) + uploaded_file = client.files.upload( + file=file_data, + config={ + "display_name": display_name, + "mime_type": file.content_type, + }, + ) if file.content_type.startswith("video/"): if not self.wait_for_processing(uploaded_file.name): @@ -174,7 +223,8 @@ class GeminiFileUploader(FileUploader): ) -> UploadResult: """Async upload a file to Gemini using native async client. - Uses async wait_for_processing for video files. + For FilePath sources, passes the path directly to the SDK which handles + streaming internally via resumable uploads, avoiding memory overhead. Args: file: The file to upload. @@ -194,24 +244,38 @@ class GeminiFileUploader(FileUploader): try: client = self._get_client() - - content = await file.aread() display_name = purpose or file.filename - file_data = io.BytesIO(content) - file_data.name = file.filename + file_path = _get_file_path(file) + if file_path is not None: + file_size = file_path.stat().st_size + logger.info( + f"Uploading file '{file.filename}' to Gemini via path " + f"({file_size} bytes, streaming)" + ) + uploaded_file = await client.aio.files.upload( + file=file_path, + config={ + "display_name": display_name, + "mime_type": file.content_type, + }, + ) + else: + content = await file.aread() + file_data = io.BytesIO(content) + file_data.name = file.filename - logger.info( - f"Uploading file '{file.filename}' to Gemini ({len(content)} bytes)" - ) + logger.info( + f"Uploading file '{file.filename}' to Gemini ({len(content)} bytes)" + ) - uploaded_file = await client.aio.files.upload( - file=file_data, - config={ - "display_name": display_name, - "mime_type": file.content_type, - }, - ) + uploaded_file = await client.aio.files.upload( + file=file_data, + config={ + "display_name": display_name, + "mime_type": file.content_type, + }, + ) if file.content_type.startswith("video/"): if not await self.await_for_processing(uploaded_file.name): diff --git a/lib/crewai/src/crewai/files/uploaders/openai.py b/lib/crewai/src/crewai/files/uploaders/openai.py index 1bc47cc6f..f3fcd5fe0 100644 --- a/lib/crewai/src/crewai/files/uploaders/openai.py +++ b/lib/crewai/src/crewai/files/uploaders/openai.py @@ -2,6 +2,7 @@ from __future__ import annotations +from collections.abc import AsyncIterator, Iterator import io import logging import os @@ -15,6 +16,7 @@ from crewai.files.content_types import ( TextFile, VideoFile, ) +from crewai.files.file import FileBytes, FilePath, FileStream from 
crewai.files.uploaders.base import FileUploader, UploadResult @@ -22,25 +24,102 @@ logger = logging.getLogger(__name__) FileInput = AudioFile | File | ImageFile | PDFFile | TextFile | VideoFile +FILES_API_MAX_SIZE = 512 * 1024 * 1024 +DEFAULT_UPLOAD_CHUNK_SIZE = 64 * 1024 * 1024 + + +def _get_file_size(file: FileInput) -> int | None: + """Get file size without reading content if possible. + + Args: + file: The file to get size for. + + Returns: + File size in bytes, or None if size cannot be determined without reading. + """ + source = file._file_source + if isinstance(source, FilePath): + return source.path.stat().st_size + if isinstance(source, FileBytes): + return len(source.data) + return None + + +def _iter_file_chunks(file: FileInput, chunk_size: int) -> Iterator[bytes]: + """Iterate over file content in chunks. + + Args: + file: The file to read. + chunk_size: Size of each chunk in bytes. + + Yields: + Chunks of file content. + """ + source = file._file_source + if isinstance(source, (FilePath, FileBytes, FileStream)): + yield from source.read_chunks(chunk_size) + else: + content = file.read() + for i in range(0, len(content), chunk_size): + yield content[i : i + chunk_size] + + +async def _aiter_file_chunks( + file: FileInput, chunk_size: int, content: bytes | None = None +) -> AsyncIterator[bytes]: + """Async iterate over file content in chunks. + + Args: + file: The file to read. + chunk_size: Size of each chunk in bytes. + content: Optional pre-loaded content to chunk. + + Yields: + Chunks of file content. + """ + if content is not None: + for i in range(0, len(content), chunk_size): + yield content[i : i + chunk_size] + return + + source = file._file_source + if isinstance(source, FilePath): + async for chunk in source.aread_chunks(chunk_size): + yield chunk + elif isinstance(source, (FileBytes, FileStream)): + for chunk in source.read_chunks(chunk_size): + yield chunk + else: + data = await file.aread() + for i in range(0, len(data), chunk_size): + yield data[i : i + chunk_size] + class OpenAIFileUploader(FileUploader): - """Uploader for OpenAI Files API. + """Uploader for OpenAI Files and Uploads APIs. - Uses the OpenAI SDK to upload files. Files are stored persistently - until explicitly deleted. + Uses the Files API for files up to 512MB (single request). + Uses the Uploads API for files larger than 512MB (multipart chunked). Attributes: api_key: Optional API key (uses OPENAI_API_KEY env var if not provided). + chunk_size: Chunk size for multipart uploads (default 64MB). """ - def __init__(self, api_key: str | None = None) -> None: + def __init__( + self, + api_key: str | None = None, + chunk_size: int = DEFAULT_UPLOAD_CHUNK_SIZE, + ) -> None: """Initialize the OpenAI uploader. Args: api_key: Optional OpenAI API key. If not provided, uses OPENAI_API_KEY environment variable. + chunk_size: Chunk size in bytes for multipart uploads (default 64MB). """ self._api_key = api_key or os.environ.get("OPENAI_API_KEY") + self._chunk_size = chunk_size self._client: Any = None self._async_client: Any = None @@ -80,6 +159,9 @@ class OpenAIFileUploader(FileUploader): def upload(self, file: FileInput, purpose: str | None = None) -> UploadResult: """Upload a file to OpenAI. + Uses Files API for files <= 512MB, Uploads API for larger files. + For large files, streams chunks to avoid loading entire file in memory. + Args: file: The file to upload. purpose: Optional purpose for the file (default: "user_data"). 
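# Illustrative sketch (not part of the patch; standard library only) of the
# fallback slicing loop used by _iter_file_chunks when a source exposes no
# read_chunks helper: slicing a bytes object at fixed offsets yields full chunks
# plus at most one short tail, and the chunks concatenate back to the original.
from collections.abc import Iterator


def iter_chunks(content: bytes, chunk_size: int) -> Iterator[bytes]:
    for i in range(0, len(content), chunk_size):
        yield content[i : i + chunk_size]


payload = bytes(range(256)) * 1000           # 256,000 bytes
chunks = list(iter_chunks(payload, 100_000))
assert [len(c) for c in chunks] == [100_000, 100_000, 56_000]
assert b"".join(chunks) == payload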
@@ -97,65 +179,258 @@ class OpenAIFileUploader(FileUploader): ) try: - client = self._get_client() + file_size = _get_file_size(file) + + if file_size is not None and file_size > FILES_API_MAX_SIZE: + return self._upload_multipart_streaming(file, file_size, purpose) content = file.read() - file_purpose = purpose or "user_data" + if len(content) > FILES_API_MAX_SIZE: + return self._upload_multipart(file, content, purpose) + return self._upload_simple(file, content, purpose) + except ImportError: + raise + except (TransientUploadError, PermanentUploadError): + raise + except Exception as e: + raise self._classify_error(e, file.filename) from e - file_data = io.BytesIO(content) - file_data.name = file.filename or "file" + def _upload_simple( + self, + file: FileInput, + content: bytes, + purpose: str | None, + ) -> UploadResult: + """Upload using the Files API (single request, up to 512MB). - logger.info( - f"Uploading file '{file.filename}' to OpenAI ({len(content)} bytes)" + Args: + file: The file to upload. + content: File content bytes. + purpose: Optional purpose for the file. + + Returns: + UploadResult with the file ID and metadata. + """ + client = self._get_client() + file_purpose = purpose or "user_data" + + file_data = io.BytesIO(content) + file_data.name = file.filename or "file" + + logger.info( + f"Uploading file '{file.filename}' to OpenAI Files API ({len(content)} bytes)" + ) + + uploaded_file = client.files.create( + file=file_data, + purpose=file_purpose, + ) + + logger.info(f"Uploaded to OpenAI: {uploaded_file.id}") + + return UploadResult( + file_id=uploaded_file.id, + file_uri=None, + content_type=file.content_type, + expires_at=None, + provider=self.provider_name, + ) + + def _upload_multipart( + self, + file: FileInput, + content: bytes, + purpose: str | None, + ) -> UploadResult: + """Upload using the Uploads API with content already in memory. + + Args: + file: The file to upload. + content: File content bytes (already loaded). + purpose: Optional purpose for the file. + + Returns: + UploadResult with the file ID and metadata. 
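# Illustrative sketch of the Files API path taken by _upload_simple (not part of
# the patch; content and filename are placeholders): a single files.create call,
# suitable up to the 512 MB limit checked in upload(). The name attached to the
# BytesIO is what the API records, and "user_data" is this uploader's default
# purpose.
import io

from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

file_data = io.BytesIO(b"%PDF-1.4 placeholder bytes")
file_data.name = "notes.pdf"  # placeholder filename

uploaded = client.files.create(file=file_data, purpose="user_data")
print(uploaded.id)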
+ """ + client = self._get_client() + file_purpose = purpose or "user_data" + filename = file.filename or "file" + file_size = len(content) + + logger.info( + f"Uploading file '{filename}' to OpenAI Uploads API " + f"({file_size} bytes, {self._chunk_size} byte chunks)" + ) + + upload = client.uploads.create( + bytes=file_size, + filename=filename, + mime_type=file.content_type, + purpose=file_purpose, + ) + + part_ids: list[str] = [] + offset = 0 + part_num = 1 + + try: + while offset < file_size: + chunk = content[offset : offset + self._chunk_size] + chunk_io = io.BytesIO(chunk) + + logger.debug( + f"Uploading part {part_num} ({len(chunk)} bytes, offset {offset})" + ) + + part = client.uploads.parts.create( + upload_id=upload.id, + data=chunk_io, + ) + part_ids.append(part.id) + + offset += self._chunk_size + part_num += 1 + + completed = client.uploads.complete( + upload_id=upload.id, + part_ids=part_ids, ) - uploaded_file = client.files.create( - file=file_data, - purpose=file_purpose, - ) - - logger.info(f"Uploaded to OpenAI: {uploaded_file.id}") + file_id = completed.file.id if completed.file else upload.id + logger.info(f"Completed multipart upload to OpenAI: {file_id}") return UploadResult( - file_id=uploaded_file.id, + file_id=file_id, file_uri=None, content_type=file.content_type, expires_at=None, provider=self.provider_name, ) - except ImportError: + except Exception: + logger.warning(f"Multipart upload failed, cancelling upload {upload.id}") + try: + client.uploads.cancel(upload_id=upload.id) + except Exception as cancel_err: + logger.debug(f"Failed to cancel upload: {cancel_err}") raise - except Exception as e: - error_type = type(e).__name__ - if "RateLimit" in error_type or "APIConnection" in error_type: - raise TransientUploadError( - f"Transient upload error: {e}", file_name=file.filename - ) from e - if "Authentication" in error_type or "Permission" in error_type: - raise PermanentUploadError( - f"Authentication/permission error: {e}", file_name=file.filename - ) from e - if "BadRequest" in error_type or "InvalidRequest" in error_type: - raise PermanentUploadError( - f"Invalid request: {e}", file_name=file.filename - ) from e - status_code = getattr(e, "status_code", None) - if status_code is not None: - if status_code >= 500 or status_code == 429: - raise TransientUploadError( - f"Server error ({status_code}): {e}", file_name=file.filename - ) from e - if status_code in (401, 403): - raise PermanentUploadError( - f"Auth error ({status_code}): {e}", file_name=file.filename - ) from e - if status_code == 400: - raise PermanentUploadError( - f"Bad request ({status_code}): {e}", file_name=file.filename - ) from e - raise TransientUploadError( - f"Upload failed: {e}", file_name=file.filename - ) from e + + def _upload_multipart_streaming( + self, + file: FileInput, + file_size: int, + purpose: str | None, + ) -> UploadResult: + """Upload using the Uploads API with streaming chunks. + + Streams chunks directly from the file source without loading + the entire file into memory. Used for large files. + + Args: + file: The file to upload. + file_size: Total file size in bytes. + purpose: Optional purpose for the file. + + Returns: + UploadResult with the file ID and metadata. 
+ """ + client = self._get_client() + file_purpose = purpose or "user_data" + filename = file.filename or "file" + + logger.info( + f"Uploading file '{filename}' to OpenAI Uploads API (streaming) " + f"({file_size} bytes, {self._chunk_size} byte chunks)" + ) + + upload = client.uploads.create( + bytes=file_size, + filename=filename, + mime_type=file.content_type, + purpose=file_purpose, + ) + + part_ids: list[str] = [] + part_num = 1 + + try: + for chunk in _iter_file_chunks(file, self._chunk_size): + chunk_io = io.BytesIO(chunk) + + logger.debug(f"Uploading part {part_num} ({len(chunk)} bytes)") + + part = client.uploads.parts.create( + upload_id=upload.id, + data=chunk_io, + ) + part_ids.append(part.id) + part_num += 1 + + completed = client.uploads.complete( + upload_id=upload.id, + part_ids=part_ids, + ) + + file_id = completed.file.id if completed.file else upload.id + logger.info(f"Completed streaming multipart upload to OpenAI: {file_id}") + + return UploadResult( + file_id=file_id, + file_uri=None, + content_type=file.content_type, + expires_at=None, + provider=self.provider_name, + ) + except Exception: + logger.warning(f"Multipart upload failed, cancelling upload {upload.id}") + try: + client.uploads.cancel(upload_id=upload.id) + except Exception as cancel_err: + logger.debug(f"Failed to cancel upload: {cancel_err}") + raise + + def _classify_error(self, e: Exception, filename: str | None) -> Exception: + """Classify an exception as transient or permanent. + + Args: + e: The exception to classify. + filename: The filename for error context. + + Returns: + TransientUploadError or PermanentUploadError. + """ + from crewai.files.processing.exceptions import ( + PermanentUploadError, + TransientUploadError, + ) + + error_type = type(e).__name__ + if "RateLimit" in error_type or "APIConnection" in error_type: + return TransientUploadError( + f"Transient upload error: {e}", file_name=filename + ) + if "Authentication" in error_type or "Permission" in error_type: + return PermanentUploadError( + f"Authentication/permission error: {e}", file_name=filename + ) + if "BadRequest" in error_type or "InvalidRequest" in error_type: + return PermanentUploadError(f"Invalid request: {e}", file_name=filename) + + status_code = getattr(e, "status_code", None) + if status_code is not None: + if status_code >= 500 or status_code == 429: + return TransientUploadError( + f"Server error ({status_code}): {e}", file_name=filename + ) + if status_code in (401, 403): + return PermanentUploadError( + f"Auth error ({status_code}): {e}", file_name=filename + ) + if status_code == 400: + return PermanentUploadError( + f"Bad request ({status_code}): {e}", file_name=filename + ) + + return TransientUploadError(f"Upload failed: {e}", file_name=filename) def delete(self, file_id: str) -> bool: """Delete an uploaded file from OpenAI. @@ -228,6 +503,9 @@ class OpenAIFileUploader(FileUploader): ) -> UploadResult: """Async upload a file to OpenAI using native async client. + Uses Files API for files <= 512MB, Uploads API for larger files. + For large files, streams chunks to avoid loading entire file in memory. + Args: file: The file to upload. purpose: Optional purpose for the file (default: "user_data"). 
@@ -245,65 +523,214 @@ class OpenAIFileUploader(FileUploader): ) try: - client = self._get_async_client() + file_size = _get_file_size(file) + + if file_size is not None and file_size > FILES_API_MAX_SIZE: + return await self._aupload_multipart_streaming(file, file_size, purpose) content = await file.aread() - file_purpose = purpose or "user_data" + if len(content) > FILES_API_MAX_SIZE: + return await self._aupload_multipart(file, content, purpose) + return await self._aupload_simple(file, content, purpose) + except ImportError: + raise + except (TransientUploadError, PermanentUploadError): + raise + except Exception as e: + raise self._classify_error(e, file.filename) from e - file_data = io.BytesIO(content) - file_data.name = file.filename or "file" + async def _aupload_simple( + self, + file: FileInput, + content: bytes, + purpose: str | None, + ) -> UploadResult: + """Async upload using the Files API (single request, up to 512MB). - logger.info( - f"Uploading file '{file.filename}' to OpenAI ({len(content)} bytes)" + Args: + file: The file to upload. + content: File content bytes. + purpose: Optional purpose for the file. + + Returns: + UploadResult with the file ID and metadata. + """ + client = self._get_async_client() + file_purpose = purpose or "user_data" + + file_data = io.BytesIO(content) + file_data.name = file.filename or "file" + + logger.info( + f"Uploading file '{file.filename}' to OpenAI Files API ({len(content)} bytes)" + ) + + uploaded_file = await client.files.create( + file=file_data, + purpose=file_purpose, + ) + + logger.info(f"Uploaded to OpenAI: {uploaded_file.id}") + + return UploadResult( + file_id=uploaded_file.id, + file_uri=None, + content_type=file.content_type, + expires_at=None, + provider=self.provider_name, + ) + + async def _aupload_multipart( + self, + file: FileInput, + content: bytes, + purpose: str | None, + ) -> UploadResult: + """Async upload using the Uploads API (multipart chunked, up to 8GB). + + Args: + file: The file to upload. + content: File content bytes. + purpose: Optional purpose for the file. + + Returns: + UploadResult with the file ID and metadata. 
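# Illustrative sketch of the async Files API path taken by _aupload_simple (not
# part of the patch; content and filename are placeholders): the same
# files.create call issued through AsyncOpenAI and awaited.
import asyncio
import io

from openai import AsyncOpenAI


async def main() -> None:
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
    file_data = io.BytesIO(b"placeholder bytes")
    file_data.name = "notes.txt"  # placeholder filename
    uploaded = await client.files.create(file=file_data, purpose="user_data")
    print(uploaded.id)


asyncio.run(main())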
+ """ + client = self._get_async_client() + file_purpose = purpose or "user_data" + filename = file.filename or "file" + file_size = len(content) + + logger.info( + f"Uploading file '{filename}' to OpenAI Uploads API " + f"({file_size} bytes, {self._chunk_size} byte chunks)" + ) + + upload = await client.uploads.create( + bytes=file_size, + filename=filename, + mime_type=file.content_type, + purpose=file_purpose, + ) + + part_ids: list[str] = [] + offset = 0 + part_num = 1 + + try: + while offset < file_size: + chunk = content[offset : offset + self._chunk_size] + chunk_io = io.BytesIO(chunk) + + logger.debug( + f"Uploading part {part_num} ({len(chunk)} bytes, offset {offset})" + ) + + part = await client.uploads.parts.create( + upload_id=upload.id, + data=chunk_io, + ) + part_ids.append(part.id) + + offset += self._chunk_size + part_num += 1 + + completed = await client.uploads.complete( + upload_id=upload.id, + part_ids=part_ids, ) - uploaded_file = await client.files.create( - file=file_data, - purpose=file_purpose, - ) - - logger.info(f"Uploaded to OpenAI: {uploaded_file.id}") + file_id = completed.file.id if completed.file else upload.id + logger.info(f"Completed multipart upload to OpenAI: {file_id}") return UploadResult( - file_id=uploaded_file.id, + file_id=file_id, file_uri=None, content_type=file.content_type, expires_at=None, provider=self.provider_name, ) - except ImportError: + except Exception: + logger.warning(f"Multipart upload failed, cancelling upload {upload.id}") + try: + await client.uploads.cancel(upload_id=upload.id) + except Exception as cancel_err: + logger.debug(f"Failed to cancel upload: {cancel_err}") + raise + + async def _aupload_multipart_streaming( + self, + file: FileInput, + file_size: int, + purpose: str | None, + ) -> UploadResult: + """Async upload using the Uploads API with streaming chunks. + + Streams chunks directly from the file source without loading + the entire file into memory. Used for large files. + + Args: + file: The file to upload. + file_size: Total file size in bytes. + purpose: Optional purpose for the file. + + Returns: + UploadResult with the file ID and metadata. 
+ """ + client = self._get_async_client() + file_purpose = purpose or "user_data" + filename = file.filename or "file" + + logger.info( + f"Uploading file '{filename}' to OpenAI Uploads API (streaming) " + f"({file_size} bytes, {self._chunk_size} byte chunks)" + ) + + upload = await client.uploads.create( + bytes=file_size, + filename=filename, + mime_type=file.content_type, + purpose=file_purpose, + ) + + part_ids: list[str] = [] + part_num = 1 + + try: + async for chunk in _aiter_file_chunks(file, self._chunk_size): + chunk_io = io.BytesIO(chunk) + + logger.debug(f"Uploading part {part_num} ({len(chunk)} bytes)") + + part = await client.uploads.parts.create( + upload_id=upload.id, + data=chunk_io, + ) + part_ids.append(part.id) + part_num += 1 + + completed = await client.uploads.complete( + upload_id=upload.id, + part_ids=part_ids, + ) + + file_id = completed.file.id if completed.file else upload.id + logger.info(f"Completed streaming multipart upload to OpenAI: {file_id}") + + return UploadResult( + file_id=file_id, + file_uri=None, + content_type=file.content_type, + expires_at=None, + provider=self.provider_name, + ) + except Exception: + logger.warning(f"Multipart upload failed, cancelling upload {upload.id}") + try: + await client.uploads.cancel(upload_id=upload.id) + except Exception as cancel_err: + logger.debug(f"Failed to cancel upload: {cancel_err}") raise - except Exception as e: - error_type = type(e).__name__ - if "RateLimit" in error_type or "APIConnection" in error_type: - raise TransientUploadError( - f"Transient upload error: {e}", file_name=file.filename - ) from e - if "Authentication" in error_type or "Permission" in error_type: - raise PermanentUploadError( - f"Authentication/permission error: {e}", file_name=file.filename - ) from e - if "BadRequest" in error_type or "InvalidRequest" in error_type: - raise PermanentUploadError( - f"Invalid request: {e}", file_name=file.filename - ) from e - status_code = getattr(e, "status_code", None) - if status_code is not None: - if status_code >= 500 or status_code == 429: - raise TransientUploadError( - f"Server error ({status_code}): {e}", file_name=file.filename - ) from e - if status_code in (401, 403): - raise PermanentUploadError( - f"Auth error ({status_code}): {e}", file_name=file.filename - ) from e - if status_code == 400: - raise PermanentUploadError( - f"Bad request ({status_code}): {e}", file_name=file.filename - ) from e - raise TransientUploadError( - f"Upload failed: {e}", file_name=file.filename - ) from e async def adelete(self, file_id: str) -> bool: """Async delete an uploaded file from OpenAI.