feat: add streaming uploads for large files

- OpenAI: Use Uploads API for files > 512MB with chunked streaming
- Gemini: Pass file path directly to SDK for FilePath sources
- Bedrock: Use upload_fileobj with TransferConfig for automatic multipart
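As a point of reference for the first bullet, a minimal standalone sketch of the OpenAI Uploads API sequence (create the upload, send parts, complete, cancel on failure). It assumes the openai package and OPENAI_API_KEY; the path, MIME type, and chunk size are placeholders, not values from this commit:

# Minimal sketch of the OpenAI Uploads API flow, independent of crewai.
# Placeholder path/MIME type/chunk size; assumes OPENAI_API_KEY is set.
import io
from pathlib import Path

from openai import OpenAI

client = OpenAI()
path = Path("/tmp/example-big-input.bin")
chunk_size = 64 * 1024 * 1024  # 64MB parts (the Uploads API part ceiling)

upload = client.uploads.create(
    bytes=path.stat().st_size,
    filename=path.name,
    mime_type="application/octet-stream",
    purpose="user_data",
)

part_ids = []
try:
    with path.open("rb") as f:
        # Stream the file chunk by chunk; each chunk becomes one upload part.
        while chunk := f.read(chunk_size):
            part = client.uploads.parts.create(upload_id=upload.id, data=io.BytesIO(chunk))
            part_ids.append(part.id)
    completed = client.uploads.complete(upload_id=upload.id, part_ids=part_ids)
    file_id = completed.file.id if completed.file else upload.id
    print("uploaded file id:", file_id)
except Exception:
    client.uploads.cancel(upload_id=upload.id)  # abandon the partial upload
    raise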
Greyson LaLonde
2026-01-22 02:10:15 -05:00
parent 5550c6df7e
commit 1353cb2a33
3 changed files with 773 additions and 151 deletions

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
 import hashlib
 import logging
 import os
+from pathlib import Path
 from typing import Any

 from crewai.files.content_types import (
@@ -15,6 +16,7 @@ from crewai.files.content_types import (
     TextFile,
     VideoFile,
 )
+from crewai.files.file import FileBytes, FilePath
 from crewai.files.uploaders.base import FileUploader, UploadResult
@@ -22,6 +24,58 @@ logger = logging.getLogger(__name__)

 FileInput = AudioFile | File | ImageFile | PDFFile | TextFile | VideoFile

+MULTIPART_THRESHOLD = 8 * 1024 * 1024
+MULTIPART_CHUNKSIZE = 8 * 1024 * 1024
+MAX_CONCURRENCY = 10
+
+
+def _get_file_path(file: FileInput) -> Path | None:
+    """Get the filesystem path if file source is FilePath.
+
+    Args:
+        file: The file input to check.
+
+    Returns:
+        Path if source is FilePath, None otherwise.
+    """
+    source = file._file_source
+    if isinstance(source, FilePath):
+        return source.path
+    return None
+
+
+def _get_file_size(file: FileInput) -> int | None:
+    """Get file size without reading content if possible.
+
+    Args:
+        file: The file input.
+
+    Returns:
+        Size in bytes if determinable without reading, None otherwise.
+    """
+    source = file._file_source
+    if isinstance(source, FilePath):
+        return source.path.stat().st_size
+    if isinstance(source, FileBytes):
+        return len(source.data)
+    return None
+
+
+def _compute_hash_streaming(file_path: Path) -> str:
+    """Compute SHA-256 hash by streaming file content.
+
+    Args:
+        file_path: Path to the file.
+
+    Returns:
+        First 16 characters of hex digest.
+    """
+    hasher = hashlib.sha256()
+    with open(file_path, "rb") as f:
+        while chunk := f.read(1024 * 1024):
+            hasher.update(chunk)
+    return hasher.hexdigest()[:16]
+
+
 class BedrockFileUploader(FileUploader):
     """Uploader for AWS Bedrock via S3.
@@ -112,19 +166,28 @@ class BedrockFileUploader(FileUploader):
             ) from e
         return self._session

-    def _generate_s3_key(self, file: FileInput, content: bytes) -> str:
+    def _generate_s3_key(self, file: FileInput, content: bytes | None = None) -> str:
         """Generate a unique S3 key for the file.

+        For FilePath sources with no content provided, computes hash via streaming.
+
         Args:
             file: The file being uploaded.
-            content: The file content bytes.
+            content: The file content bytes (optional for FilePath sources).

         Returns:
             S3 key string.
         """
-        content_hash = hashlib.sha256(content).hexdigest()[:16]
-        filename = file.filename or "file"
+        if content is not None:
+            content_hash = hashlib.sha256(content).hexdigest()[:16]
+        else:
+            file_path = _get_file_path(file)
+            if file_path is not None:
+                content_hash = _compute_hash_streaming(file_path)
+            else:
+                content_hash = hashlib.sha256(file.read()).hexdigest()[:16]
+
+        filename = file.filename or "file"
         safe_filename = "".join(
             c if c.isalnum() or c in ".-_" else "_" for c in filename
         )
@@ -141,9 +204,23 @@ class BedrockFileUploader(FileUploader):
         """
         return f"s3://{self.bucket_name}/{key}"

+    def _get_transfer_config(self) -> Any:
+        """Get boto3 TransferConfig for multipart uploads."""
+        from boto3.s3.transfer import TransferConfig
+
+        return TransferConfig(
+            multipart_threshold=MULTIPART_THRESHOLD,
+            multipart_chunksize=MULTIPART_CHUNKSIZE,
+            max_concurrency=MAX_CONCURRENCY,
+            use_threads=True,
+        )
+
     def upload(self, file: FileInput, purpose: str | None = None) -> UploadResult:
         """Upload a file to S3 for use with Bedrock.

+        Uses streaming upload with automatic multipart for large files.
+        For FilePath sources, streams directly from disk without loading into memory.
+
         Args:
             file: The file to upload.
             purpose: Optional purpose (unused, kept for interface consistency).
@@ -155,6 +232,8 @@ class BedrockFileUploader(FileUploader):
             TransientUploadError: For retryable errors (network, throttling).
             PermanentUploadError: For non-retryable errors (auth, validation).
         """
+        import io
+
         from crewai.files.processing.exceptions import (
             PermanentUploadError,
             TransientUploadError,
@@ -162,20 +241,42 @@ class BedrockFileUploader(FileUploader):
         try:
             client = self._get_client()
-            content = file.read()
-            s3_key = self._generate_s3_key(file, content)
-
-            logger.info(
-                f"Uploading file '{file.filename}' to S3 bucket "
-                f"'{self.bucket_name}' ({len(content)} bytes)"
-            )
-
-            client.put_object(
-                Bucket=self.bucket_name,
-                Key=s3_key,
-                Body=content,
-                ContentType=file.content_type,
-            )
+            transfer_config = self._get_transfer_config()
+            file_path = _get_file_path(file)
+
+            if file_path is not None:
+                file_size = file_path.stat().st_size
+                s3_key = self._generate_s3_key(file)
+
+                logger.info(
+                    f"Uploading file '{file.filename}' to S3 bucket "
+                    f"'{self.bucket_name}' ({file_size} bytes, streaming)"
+                )
+
+                with open(file_path, "rb") as f:
+                    client.upload_fileobj(
+                        f,
+                        self.bucket_name,
+                        s3_key,
+                        ExtraArgs={"ContentType": file.content_type},
+                        Config=transfer_config,
+                    )
+            else:
+                content = file.read()
+                s3_key = self._generate_s3_key(file, content)
+
+                logger.info(
+                    f"Uploading file '{file.filename}' to S3 bucket "
+                    f"'{self.bucket_name}' ({len(content)} bytes)"
+                )
+
+                client.upload_fileobj(
+                    io.BytesIO(content),
+                    self.bucket_name,
+                    s3_key,
+                    ExtraArgs={"ContentType": file.content_type},
+                    Config=transfer_config,
+                )

             s3_uri = self._build_s3_uri(s3_key)
             logger.info(f"Uploaded to S3: {s3_uri}")
@@ -292,6 +393,9 @@ class BedrockFileUploader(FileUploader):
     ) -> UploadResult:
         """Async upload a file to S3 for use with Bedrock.

+        Uses streaming upload with automatic multipart for large files.
+        For FilePath sources, streams directly from disk without loading into memory.
+
         Args:
             file: The file to upload.
             purpose: Optional purpose (unused, kept for interface consistency).
@@ -303,6 +407,10 @@ class BedrockFileUploader(FileUploader):
             TransientUploadError: For retryable errors (network, throttling).
             PermanentUploadError: For non-retryable errors (auth, validation).
         """
+        import io
+
+        import aiofiles
+
         from crewai.files.processing.exceptions import (
             PermanentUploadError,
             TransientUploadError,
@@ -310,22 +418,45 @@ class BedrockFileUploader(FileUploader):
         try:
             session = self._get_async_client()
-            content = await file.aread()
-            s3_key = self._generate_s3_key(file, content)
-
-            logger.info(
-                f"Uploading file '{file.filename}' to S3 bucket "
-                f"'{self.bucket_name}' ({len(content)} bytes)"
-            )
-
-            async with session.client("s3", region_name=self._region) as client:
-                await client.put_object(
-                    Bucket=self.bucket_name,
-                    Key=s3_key,
-                    Body=content,
-                    ContentType=file.content_type,
-                )
+            transfer_config = self._get_transfer_config()
+            file_path = _get_file_path(file)
+
+            if file_path is not None:
+                file_size = file_path.stat().st_size
+                s3_key = self._generate_s3_key(file)
+
+                logger.info(
+                    f"Uploading file '{file.filename}' to S3 bucket "
+                    f"'{self.bucket_name}' ({file_size} bytes, streaming)"
+                )
+
+                async with session.client("s3", region_name=self._region) as client:
+                    async with aiofiles.open(file_path, "rb") as f:
+                        await client.upload_fileobj(
+                            f,
+                            self.bucket_name,
+                            s3_key,
+                            ExtraArgs={"ContentType": file.content_type},
+                            Config=transfer_config,
+                        )
+            else:
+                content = await file.aread()
+                s3_key = self._generate_s3_key(file, content)
+
+                logger.info(
+                    f"Uploading file '{file.filename}' to S3 bucket "
+                    f"'{self.bucket_name}' ({len(content)} bytes)"
+                )
+
+                async with session.client("s3", region_name=self._region) as client:
+                    await client.upload_fileobj(
+                        io.BytesIO(content),
+                        self.bucket_name,
+                        s3_key,
+                        ExtraArgs={"ContentType": file.content_type},
+                        Config=transfer_config,
+                    )

             s3_uri = self._build_s3_uri(s3_key)
             logger.info(f"Uploaded to S3: {s3_uri}")
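For reference, a minimal standalone sketch of the boto3 pattern this file now relies on; the bucket, key, and path below are placeholders, not values from this diff. upload_fileobj with a TransferConfig switches to multipart automatically once the object crosses the threshold, so no manual part bookkeeping is needed:

# Minimal boto3 managed-transfer sketch, independent of crewai.
import boto3
from boto3.s3.transfer import TransferConfig

config = TransferConfig(
    multipart_threshold=8 * 1024 * 1024,  # switch to multipart above 8MB
    multipart_chunksize=8 * 1024 * 1024,  # 8MB parts
    max_concurrency=10,                   # parts uploaded in parallel threads
    use_threads=True,
)

s3 = boto3.client("s3")
with open("/tmp/example-large-input.bin", "rb") as f:
    # Streams from the open file object; boto3 manages part creation,
    # per-part retries, and the final CompleteMultipartUpload call.
    s3.upload_fileobj(
        f,
        "example-bucket",
        "uploads/example-large-input.bin",
        ExtraArgs={"ContentType": "application/octet-stream"},
        Config=config,
    )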

View File

@@ -7,6 +7,7 @@ from datetime import datetime, timedelta, timezone
 import io
 import logging
 import os
+from pathlib import Path
 import random
 import time
 from typing import Any
@@ -19,6 +20,7 @@ from crewai.files.content_types import (
     TextFile,
     VideoFile,
 )
+from crewai.files.file import FilePath
 from crewai.files.uploaders.base import FileUploader, UploadResult
@@ -29,6 +31,36 @@ FileInput = AudioFile | File | ImageFile | PDFFile | TextFile | VideoFile

 GEMINI_FILE_TTL = timedelta(hours=48)

+
+def _get_file_path(file: FileInput) -> Path | None:
+    """Get the filesystem path if file source is FilePath.
+
+    Args:
+        file: The file input to check.
+
+    Returns:
+        Path if source is FilePath, None otherwise.
+    """
+    source = file._file_source
+    if isinstance(source, FilePath):
+        return source.path
+    return None
+
+
+def _get_file_size(file: FileInput) -> int | None:
+    """Get file size without reading content if possible.
+
+    Args:
+        file: The file input.
+
+    Returns:
+        Size in bytes if determinable without reading, None otherwise.
+    """
+    source = file._file_source
+    if isinstance(source, FilePath):
+        return source.path.stat().st_size
+    return None
+
+
 class GeminiFileUploader(FileUploader):
     """Uploader for Google Gemini File API.
@@ -70,6 +102,9 @@ class GeminiFileUploader(FileUploader):
     def upload(self, file: FileInput, purpose: str | None = None) -> UploadResult:
         """Upload a file to Gemini.

+        For FilePath sources, passes the path directly to the SDK which handles
+        streaming internally via resumable uploads, avoiding memory overhead.
+
         Args:
             file: The file to upload.
             purpose: Optional purpose/description (used as display name).
@@ -88,24 +123,38 @@ class GeminiFileUploader(FileUploader):
         try:
             client = self._get_client()
-            content = file.read()

             display_name = purpose or file.filename
-            file_data = io.BytesIO(content)
-            file_data.name = file.filename
-
-            logger.info(
-                f"Uploading file '{file.filename}' to Gemini ({len(content)} bytes)"
-            )
-
-            uploaded_file = client.files.upload(
-                file=file_data,
-                config={
-                    "display_name": display_name,
-                    "mime_type": file.content_type,
-                },
-            )
+            file_path = _get_file_path(file)
+
+            if file_path is not None:
+                file_size = file_path.stat().st_size
+                logger.info(
+                    f"Uploading file '{file.filename}' to Gemini via path "
+                    f"({file_size} bytes, streaming)"
+                )
+                uploaded_file = client.files.upload(
+                    file=file_path,
+                    config={
+                        "display_name": display_name,
+                        "mime_type": file.content_type,
+                    },
+                )
+            else:
+                content = file.read()
+                file_data = io.BytesIO(content)
+                file_data.name = file.filename
+
+                logger.info(
+                    f"Uploading file '{file.filename}' to Gemini ({len(content)} bytes)"
+                )
+
+                uploaded_file = client.files.upload(
+                    file=file_data,
+                    config={
+                        "display_name": display_name,
+                        "mime_type": file.content_type,
+                    },
+                )

             if file.content_type.startswith("video/"):
                 if not self.wait_for_processing(uploaded_file.name):
@@ -174,7 +223,8 @@ class GeminiFileUploader(FileUploader):
     ) -> UploadResult:
         """Async upload a file to Gemini using native async client.

-        Uses async wait_for_processing for video files.
+        For FilePath sources, passes the path directly to the SDK which handles
+        streaming internally via resumable uploads, avoiding memory overhead.

         Args:
             file: The file to upload.
@@ -194,24 +244,38 @@ class GeminiFileUploader(FileUploader):
         try:
             client = self._get_client()
-            content = await file.aread()

             display_name = purpose or file.filename
-            file_data = io.BytesIO(content)
-            file_data.name = file.filename
-
-            logger.info(
-                f"Uploading file '{file.filename}' to Gemini ({len(content)} bytes)"
-            )
-
-            uploaded_file = await client.aio.files.upload(
-                file=file_data,
-                config={
-                    "display_name": display_name,
-                    "mime_type": file.content_type,
-                },
-            )
+            file_path = _get_file_path(file)
+
+            if file_path is not None:
+                file_size = file_path.stat().st_size
+                logger.info(
+                    f"Uploading file '{file.filename}' to Gemini via path "
+                    f"({file_size} bytes, streaming)"
+                )
+                uploaded_file = await client.aio.files.upload(
+                    file=file_path,
+                    config={
+                        "display_name": display_name,
+                        "mime_type": file.content_type,
+                    },
+                )
+            else:
+                content = await file.aread()
+                file_data = io.BytesIO(content)
+                file_data.name = file.filename
+
+                logger.info(
+                    f"Uploading file '{file.filename}' to Gemini ({len(content)} bytes)"
+                )
+
+                uploaded_file = await client.aio.files.upload(
+                    file=file_data,
+                    config={
+                        "display_name": display_name,
+                        "mime_type": file.content_type,
+                    },
+                )

             if file.content_type.startswith("video/"):
                 if not await self.await_for_processing(uploaded_file.name):
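For reference, a minimal standalone sketch of the google-genai call this file switches to, assuming the google-genai package and an API key in the environment; the path and MIME type are placeholders. Handing the SDK a filesystem path lets it run its resumable upload without the caller buffering the file:

# Minimal google-genai Files API sketch, independent of crewai.
# Assumes GEMINI_API_KEY is available to genai.Client(); path/metadata are placeholders.
from google import genai

client = genai.Client()

uploaded = client.files.upload(
    file="/tmp/example-long-video.mp4",  # a path, not bytes, so the SDK streams it
    config={
        "display_name": "example-long-video.mp4",
        "mime_type": "video/mp4",
    },
)
print(uploaded.name, uploaded.state)  # video files sit in a processing state briefly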

View File

@@ -2,6 +2,7 @@
 from __future__ import annotations

+from collections.abc import AsyncIterator, Iterator
 import io
 import logging
 import os
@@ -15,6 +16,7 @@ from crewai.files.content_types import (
     TextFile,
     VideoFile,
 )
+from crewai.files.file import FileBytes, FilePath, FileStream
 from crewai.files.uploaders.base import FileUploader, UploadResult
@@ -22,25 +24,102 @@ logger = logging.getLogger(__name__)

 FileInput = AudioFile | File | ImageFile | PDFFile | TextFile | VideoFile

+FILES_API_MAX_SIZE = 512 * 1024 * 1024
+DEFAULT_UPLOAD_CHUNK_SIZE = 64 * 1024 * 1024
+
+
+def _get_file_size(file: FileInput) -> int | None:
+    """Get file size without reading content if possible.
+
+    Args:
+        file: The file to get size for.
+
+    Returns:
+        File size in bytes, or None if size cannot be determined without reading.
+    """
+    source = file._file_source
+    if isinstance(source, FilePath):
+        return source.path.stat().st_size
+    if isinstance(source, FileBytes):
+        return len(source.data)
+    return None
+
+
+def _iter_file_chunks(file: FileInput, chunk_size: int) -> Iterator[bytes]:
+    """Iterate over file content in chunks.
+
+    Args:
+        file: The file to read.
+        chunk_size: Size of each chunk in bytes.
+
+    Yields:
+        Chunks of file content.
+    """
+    source = file._file_source
+    if isinstance(source, (FilePath, FileBytes, FileStream)):
+        yield from source.read_chunks(chunk_size)
+    else:
+        content = file.read()
+        for i in range(0, len(content), chunk_size):
+            yield content[i : i + chunk_size]
+
+
+async def _aiter_file_chunks(
+    file: FileInput, chunk_size: int, content: bytes | None = None
+) -> AsyncIterator[bytes]:
+    """Async iterate over file content in chunks.
+
+    Args:
+        file: The file to read.
+        chunk_size: Size of each chunk in bytes.
+        content: Optional pre-loaded content to chunk.
+
+    Yields:
+        Chunks of file content.
+    """
+    if content is not None:
+        for i in range(0, len(content), chunk_size):
+            yield content[i : i + chunk_size]
+        return
+
+    source = file._file_source
+    if isinstance(source, FilePath):
+        async for chunk in source.aread_chunks(chunk_size):
+            yield chunk
+    elif isinstance(source, (FileBytes, FileStream)):
+        for chunk in source.read_chunks(chunk_size):
+            yield chunk
+    else:
+        data = await file.aread()
+        for i in range(0, len(data), chunk_size):
+            yield data[i : i + chunk_size]
+
+
 class OpenAIFileUploader(FileUploader):
-    """Uploader for OpenAI Files API.
+    """Uploader for OpenAI Files and Uploads APIs.

-    Uses the OpenAI SDK to upload files. Files are stored persistently
-    until explicitly deleted.
+    Uses the Files API for files up to 512MB (single request).
+    Uses the Uploads API for files larger than 512MB (multipart chunked).

     Attributes:
         api_key: Optional API key (uses OPENAI_API_KEY env var if not provided).
+        chunk_size: Chunk size for multipart uploads (default 64MB).
     """

-    def __init__(self, api_key: str | None = None) -> None:
+    def __init__(
+        self,
+        api_key: str | None = None,
+        chunk_size: int = DEFAULT_UPLOAD_CHUNK_SIZE,
+    ) -> None:
         """Initialize the OpenAI uploader.

         Args:
             api_key: Optional OpenAI API key. If not provided, uses
                 OPENAI_API_KEY environment variable.
+            chunk_size: Chunk size in bytes for multipart uploads (default 64MB).
         """
         self._api_key = api_key or os.environ.get("OPENAI_API_KEY")
+        self._chunk_size = chunk_size
         self._client: Any = None
         self._async_client: Any = None
@@ -80,6 +159,9 @@ class OpenAIFileUploader(FileUploader):
     def upload(self, file: FileInput, purpose: str | None = None) -> UploadResult:
         """Upload a file to OpenAI.

+        Uses Files API for files <= 512MB, Uploads API for larger files.
+        For large files, streams chunks to avoid loading entire file in memory.
+
         Args:
             file: The file to upload.
             purpose: Optional purpose for the file (default: "user_data").
@@ -97,65 +179,258 @@ class OpenAIFileUploader(FileUploader):
             )

         try:
-            client = self._get_client()
-
-            content = file.read()
-            file_purpose = purpose or "user_data"
-
-            file_data = io.BytesIO(content)
-            file_data.name = file.filename or "file"
-
-            logger.info(
-                f"Uploading file '{file.filename}' to OpenAI ({len(content)} bytes)"
-            )
-
-            uploaded_file = client.files.create(
-                file=file_data,
-                purpose=file_purpose,
-            )
-
-            logger.info(f"Uploaded to OpenAI: {uploaded_file.id}")
-
-            return UploadResult(
-                file_id=uploaded_file.id,
-                file_uri=None,
-                content_type=file.content_type,
-                expires_at=None,
-                provider=self.provider_name,
-            )
+            file_size = _get_file_size(file)
+            if file_size is not None and file_size > FILES_API_MAX_SIZE:
+                return self._upload_multipart_streaming(file, file_size, purpose)
+
+            content = file.read()
+            if len(content) > FILES_API_MAX_SIZE:
+                return self._upload_multipart(file, content, purpose)
+
+            return self._upload_simple(file, content, purpose)
         except ImportError:
             raise
+        except (TransientUploadError, PermanentUploadError):
+            raise
         except Exception as e:
-            error_type = type(e).__name__
-            if "RateLimit" in error_type or "APIConnection" in error_type:
-                raise TransientUploadError(
-                    f"Transient upload error: {e}", file_name=file.filename
-                ) from e
-            if "Authentication" in error_type or "Permission" in error_type:
-                raise PermanentUploadError(
-                    f"Authentication/permission error: {e}", file_name=file.filename
-                ) from e
-            if "BadRequest" in error_type or "InvalidRequest" in error_type:
-                raise PermanentUploadError(
-                    f"Invalid request: {e}", file_name=file.filename
-                ) from e
-            status_code = getattr(e, "status_code", None)
-            if status_code is not None:
-                if status_code >= 500 or status_code == 429:
-                    raise TransientUploadError(
-                        f"Server error ({status_code}): {e}", file_name=file.filename
-                    ) from e
-                if status_code in (401, 403):
-                    raise PermanentUploadError(
-                        f"Auth error ({status_code}): {e}", file_name=file.filename
-                    ) from e
-                if status_code == 400:
-                    raise PermanentUploadError(
-                        f"Bad request ({status_code}): {e}", file_name=file.filename
-                    ) from e
-            raise TransientUploadError(
-                f"Upload failed: {e}", file_name=file.filename
-            ) from e
+            raise self._classify_error(e, file.filename) from e
+
+    def _upload_simple(
+        self,
+        file: FileInput,
+        content: bytes,
+        purpose: str | None,
+    ) -> UploadResult:
+        """Upload using the Files API (single request, up to 512MB).
+
+        Args:
+            file: The file to upload.
+            content: File content bytes.
+            purpose: Optional purpose for the file.
+
+        Returns:
+            UploadResult with the file ID and metadata.
+        """
+        client = self._get_client()
+        file_purpose = purpose or "user_data"
+
+        file_data = io.BytesIO(content)
+        file_data.name = file.filename or "file"
+
+        logger.info(
+            f"Uploading file '{file.filename}' to OpenAI Files API ({len(content)} bytes)"
+        )
+
+        uploaded_file = client.files.create(
+            file=file_data,
+            purpose=file_purpose,
+        )
+
+        logger.info(f"Uploaded to OpenAI: {uploaded_file.id}")
+
+        return UploadResult(
+            file_id=uploaded_file.id,
+            file_uri=None,
+            content_type=file.content_type,
+            expires_at=None,
+            provider=self.provider_name,
+        )
+
+    def _upload_multipart(
+        self,
+        file: FileInput,
+        content: bytes,
+        purpose: str | None,
+    ) -> UploadResult:
+        """Upload using the Uploads API with content already in memory.
+
+        Args:
+            file: The file to upload.
+            content: File content bytes (already loaded).
+            purpose: Optional purpose for the file.
+
+        Returns:
+            UploadResult with the file ID and metadata.
+        """
+        client = self._get_client()
+        file_purpose = purpose or "user_data"
+        filename = file.filename or "file"
+        file_size = len(content)
+
+        logger.info(
+            f"Uploading file '{filename}' to OpenAI Uploads API "
+            f"({file_size} bytes, {self._chunk_size} byte chunks)"
+        )
+
+        upload = client.uploads.create(
+            bytes=file_size,
+            filename=filename,
+            mime_type=file.content_type,
+            purpose=file_purpose,
+        )
+
+        part_ids: list[str] = []
+        offset = 0
+        part_num = 1
+
+        try:
+            while offset < file_size:
+                chunk = content[offset : offset + self._chunk_size]
+                chunk_io = io.BytesIO(chunk)
+
+                logger.debug(
+                    f"Uploading part {part_num} ({len(chunk)} bytes, offset {offset})"
+                )
+
+                part = client.uploads.parts.create(
+                    upload_id=upload.id,
+                    data=chunk_io,
+                )
+                part_ids.append(part.id)
+
+                offset += self._chunk_size
+                part_num += 1
+
+            completed = client.uploads.complete(
+                upload_id=upload.id,
+                part_ids=part_ids,
+            )
+
+            file_id = completed.file.id if completed.file else upload.id
+            logger.info(f"Completed multipart upload to OpenAI: {file_id}")
+
+            return UploadResult(
+                file_id=file_id,
+                file_uri=None,
+                content_type=file.content_type,
+                expires_at=None,
+                provider=self.provider_name,
+            )
+        except Exception:
+            logger.warning(f"Multipart upload failed, cancelling upload {upload.id}")
+            try:
+                client.uploads.cancel(upload_id=upload.id)
+            except Exception as cancel_err:
+                logger.debug(f"Failed to cancel upload: {cancel_err}")
+            raise
+
+    def _upload_multipart_streaming(
+        self,
+        file: FileInput,
+        file_size: int,
+        purpose: str | None,
+    ) -> UploadResult:
+        """Upload using the Uploads API with streaming chunks.
+
+        Streams chunks directly from the file source without loading
+        the entire file into memory. Used for large files.
+
+        Args:
+            file: The file to upload.
+            file_size: Total file size in bytes.
+            purpose: Optional purpose for the file.
+
+        Returns:
+            UploadResult with the file ID and metadata.
+        """
+        client = self._get_client()
+        file_purpose = purpose or "user_data"
+        filename = file.filename or "file"
+
+        logger.info(
+            f"Uploading file '{filename}' to OpenAI Uploads API (streaming) "
+            f"({file_size} bytes, {self._chunk_size} byte chunks)"
+        )
+
+        upload = client.uploads.create(
+            bytes=file_size,
+            filename=filename,
+            mime_type=file.content_type,
+            purpose=file_purpose,
+        )
+
+        part_ids: list[str] = []
+        part_num = 1
+
+        try:
+            for chunk in _iter_file_chunks(file, self._chunk_size):
+                chunk_io = io.BytesIO(chunk)
+
+                logger.debug(f"Uploading part {part_num} ({len(chunk)} bytes)")
+
+                part = client.uploads.parts.create(
+                    upload_id=upload.id,
+                    data=chunk_io,
+                )
+                part_ids.append(part.id)
+                part_num += 1
+
+            completed = client.uploads.complete(
+                upload_id=upload.id,
+                part_ids=part_ids,
+            )
+
+            file_id = completed.file.id if completed.file else upload.id
+            logger.info(f"Completed streaming multipart upload to OpenAI: {file_id}")
+
+            return UploadResult(
+                file_id=file_id,
+                file_uri=None,
+                content_type=file.content_type,
+                expires_at=None,
+                provider=self.provider_name,
+            )
+        except Exception:
+            logger.warning(f"Multipart upload failed, cancelling upload {upload.id}")
+            try:
+                client.uploads.cancel(upload_id=upload.id)
+            except Exception as cancel_err:
+                logger.debug(f"Failed to cancel upload: {cancel_err}")
+            raise
+
+    def _classify_error(self, e: Exception, filename: str | None) -> Exception:
+        """Classify an exception as transient or permanent.
+
+        Args:
+            e: The exception to classify.
+            filename: The filename for error context.
+
+        Returns:
+            TransientUploadError or PermanentUploadError.
+        """
+        from crewai.files.processing.exceptions import (
+            PermanentUploadError,
+            TransientUploadError,
+        )
+
+        error_type = type(e).__name__
+        if "RateLimit" in error_type or "APIConnection" in error_type:
+            return TransientUploadError(
+                f"Transient upload error: {e}", file_name=filename
+            )
+        if "Authentication" in error_type or "Permission" in error_type:
+            return PermanentUploadError(
+                f"Authentication/permission error: {e}", file_name=filename
+            )
+        if "BadRequest" in error_type or "InvalidRequest" in error_type:
+            return PermanentUploadError(f"Invalid request: {e}", file_name=filename)
+
+        status_code = getattr(e, "status_code", None)
+        if status_code is not None:
+            if status_code >= 500 or status_code == 429:
+                return TransientUploadError(
+                    f"Server error ({status_code}): {e}", file_name=filename
+                )
+            if status_code in (401, 403):
+                return PermanentUploadError(
+                    f"Auth error ({status_code}): {e}", file_name=filename
+                )
+            if status_code == 400:
+                return PermanentUploadError(
+                    f"Bad request ({status_code}): {e}", file_name=filename
+                )
+
+        return TransientUploadError(f"Upload failed: {e}", file_name=filename)

     def delete(self, file_id: str) -> bool:
         """Delete an uploaded file from OpenAI.
@@ -228,6 +503,9 @@ class OpenAIFileUploader(FileUploader):
     ) -> UploadResult:
         """Async upload a file to OpenAI using native async client.

+        Uses Files API for files <= 512MB, Uploads API for larger files.
+        For large files, streams chunks to avoid loading entire file in memory.
+
         Args:
             file: The file to upload.
             purpose: Optional purpose for the file (default: "user_data").
@@ -245,65 +523,214 @@ class OpenAIFileUploader(FileUploader):
             )

         try:
-            client = self._get_async_client()
-
-            content = await file.aread()
-            file_purpose = purpose or "user_data"
-
-            file_data = io.BytesIO(content)
-            file_data.name = file.filename or "file"
-
-            logger.info(
-                f"Uploading file '{file.filename}' to OpenAI ({len(content)} bytes)"
-            )
-
-            uploaded_file = await client.files.create(
-                file=file_data,
-                purpose=file_purpose,
-            )
-
-            logger.info(f"Uploaded to OpenAI: {uploaded_file.id}")
-
-            return UploadResult(
-                file_id=uploaded_file.id,
-                file_uri=None,
-                content_type=file.content_type,
-                expires_at=None,
-                provider=self.provider_name,
-            )
+            file_size = _get_file_size(file)
+            if file_size is not None and file_size > FILES_API_MAX_SIZE:
+                return await self._aupload_multipart_streaming(file, file_size, purpose)
+
+            content = await file.aread()
+            if len(content) > FILES_API_MAX_SIZE:
+                return await self._aupload_multipart(file, content, purpose)
+
+            return await self._aupload_simple(file, content, purpose)
         except ImportError:
             raise
+        except (TransientUploadError, PermanentUploadError):
+            raise
         except Exception as e:
-            error_type = type(e).__name__
-            if "RateLimit" in error_type or "APIConnection" in error_type:
-                raise TransientUploadError(
-                    f"Transient upload error: {e}", file_name=file.filename
-                ) from e
-            if "Authentication" in error_type or "Permission" in error_type:
-                raise PermanentUploadError(
-                    f"Authentication/permission error: {e}", file_name=file.filename
-                ) from e
-            if "BadRequest" in error_type or "InvalidRequest" in error_type:
-                raise PermanentUploadError(
-                    f"Invalid request: {e}", file_name=file.filename
-                ) from e
-            status_code = getattr(e, "status_code", None)
-            if status_code is not None:
-                if status_code >= 500 or status_code == 429:
-                    raise TransientUploadError(
-                        f"Server error ({status_code}): {e}", file_name=file.filename
-                    ) from e
-                if status_code in (401, 403):
-                    raise PermanentUploadError(
-                        f"Auth error ({status_code}): {e}", file_name=file.filename
-                    ) from e
-                if status_code == 400:
-                    raise PermanentUploadError(
-                        f"Bad request ({status_code}): {e}", file_name=file.filename
-                    ) from e
-            raise TransientUploadError(
-                f"Upload failed: {e}", file_name=file.filename
-            ) from e
+            raise self._classify_error(e, file.filename) from e
+
+    async def _aupload_simple(
+        self,
+        file: FileInput,
+        content: bytes,
+        purpose: str | None,
+    ) -> UploadResult:
+        """Async upload using the Files API (single request, up to 512MB).
+
+        Args:
+            file: The file to upload.
+            content: File content bytes.
+            purpose: Optional purpose for the file.
+
+        Returns:
+            UploadResult with the file ID and metadata.
+        """
+        client = self._get_async_client()
+        file_purpose = purpose or "user_data"
+
+        file_data = io.BytesIO(content)
+        file_data.name = file.filename or "file"
+
+        logger.info(
+            f"Uploading file '{file.filename}' to OpenAI Files API ({len(content)} bytes)"
+        )
+
+        uploaded_file = await client.files.create(
+            file=file_data,
+            purpose=file_purpose,
+        )
+
+        logger.info(f"Uploaded to OpenAI: {uploaded_file.id}")
+
+        return UploadResult(
+            file_id=uploaded_file.id,
+            file_uri=None,
+            content_type=file.content_type,
+            expires_at=None,
+            provider=self.provider_name,
+        )
+
+    async def _aupload_multipart(
+        self,
+        file: FileInput,
+        content: bytes,
+        purpose: str | None,
+    ) -> UploadResult:
+        """Async upload using the Uploads API (multipart chunked, up to 8GB).
+
+        Args:
+            file: The file to upload.
+            content: File content bytes.
+            purpose: Optional purpose for the file.
+
+        Returns:
+            UploadResult with the file ID and metadata.
+        """
+        client = self._get_async_client()
+        file_purpose = purpose or "user_data"
+        filename = file.filename or "file"
+        file_size = len(content)
+
+        logger.info(
+            f"Uploading file '{filename}' to OpenAI Uploads API "
+            f"({file_size} bytes, {self._chunk_size} byte chunks)"
+        )
+
+        upload = await client.uploads.create(
+            bytes=file_size,
+            filename=filename,
+            mime_type=file.content_type,
+            purpose=file_purpose,
+        )
+
+        part_ids: list[str] = []
+        offset = 0
+        part_num = 1
+
+        try:
+            while offset < file_size:
+                chunk = content[offset : offset + self._chunk_size]
+                chunk_io = io.BytesIO(chunk)
+
+                logger.debug(
+                    f"Uploading part {part_num} ({len(chunk)} bytes, offset {offset})"
+                )
+
+                part = await client.uploads.parts.create(
+                    upload_id=upload.id,
+                    data=chunk_io,
+                )
+                part_ids.append(part.id)
+
+                offset += self._chunk_size
+                part_num += 1
+
+            completed = await client.uploads.complete(
+                upload_id=upload.id,
+                part_ids=part_ids,
+            )
+
+            file_id = completed.file.id if completed.file else upload.id
+            logger.info(f"Completed multipart upload to OpenAI: {file_id}")
+
+            return UploadResult(
+                file_id=file_id,
+                file_uri=None,
+                content_type=file.content_type,
+                expires_at=None,
+                provider=self.provider_name,
+            )
+        except Exception:
+            logger.warning(f"Multipart upload failed, cancelling upload {upload.id}")
+            try:
+                await client.uploads.cancel(upload_id=upload.id)
+            except Exception as cancel_err:
+                logger.debug(f"Failed to cancel upload: {cancel_err}")
+            raise
+
+    async def _aupload_multipart_streaming(
+        self,
+        file: FileInput,
+        file_size: int,
+        purpose: str | None,
+    ) -> UploadResult:
+        """Async upload using the Uploads API with streaming chunks.
+
+        Streams chunks directly from the file source without loading
+        the entire file into memory. Used for large files.
+
+        Args:
+            file: The file to upload.
+            file_size: Total file size in bytes.
+            purpose: Optional purpose for the file.
+
+        Returns:
+            UploadResult with the file ID and metadata.
+        """
+        client = self._get_async_client()
+        file_purpose = purpose or "user_data"
+        filename = file.filename or "file"
+
+        logger.info(
+            f"Uploading file '{filename}' to OpenAI Uploads API (streaming) "
+            f"({file_size} bytes, {self._chunk_size} byte chunks)"
+        )
+
+        upload = await client.uploads.create(
+            bytes=file_size,
+            filename=filename,
+            mime_type=file.content_type,
+            purpose=file_purpose,
+        )
+
+        part_ids: list[str] = []
+        part_num = 1
+
+        try:
+            async for chunk in _aiter_file_chunks(file, self._chunk_size):
+                chunk_io = io.BytesIO(chunk)
+
+                logger.debug(f"Uploading part {part_num} ({len(chunk)} bytes)")
+
+                part = await client.uploads.parts.create(
+                    upload_id=upload.id,
+                    data=chunk_io,
+                )
+                part_ids.append(part.id)
+                part_num += 1
+
+            completed = await client.uploads.complete(
+                upload_id=upload.id,
+                part_ids=part_ids,
+            )
+
+            file_id = completed.file.id if completed.file else upload.id
+            logger.info(f"Completed streaming multipart upload to OpenAI: {file_id}")
+
+            return UploadResult(
+                file_id=file_id,
+                file_uri=None,
+                content_type=file.content_type,
+                expires_at=None,
+                provider=self.provider_name,
+            )
+        except Exception:
+            logger.warning(f"Multipart upload failed, cancelling upload {upload.id}")
+            try:
+                await client.uploads.cancel(upload_id=upload.id)
+            except Exception as cancel_err:
+                logger.debug(f"Failed to cancel upload: {cancel_err}")
+            raise

     async def adelete(self, file_id: str) -> bool:
         """Async delete an uploaded file from OpenAI.