feat: add provider file uploaders

This commit is contained in:
Greyson LaLonde
2026-01-21 18:38:04 -05:00
parent 5200ed4372
commit 56946d309b
5 changed files with 713 additions and 0 deletions

View File

@@ -0,0 +1,64 @@
"""File uploader implementations for provider File APIs."""
from __future__ import annotations
import logging
from typing import Any
from crewai.utilities.files.uploaders.base import FileUploader, UploadResult
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Names exported by a star-import of this package.
__all__ = [
"FileUploader",
"UploadResult",
"get_uploader",
]
def get_uploader(provider: str, **kwargs: Any) -> FileUploader | None:
    """Build the file uploader that matches a provider name.

    Matching is a case-insensitive substring test, checked in this order:
    Gemini ("gemini"/"google"), Anthropic ("anthropic"/"claude"), then
    OpenAI ("openai"/"gpt"). The uploader module is imported only when its
    branch is selected.

    Args:
        provider: Provider name (e.g., "gemini", "anthropic").
        **kwargs: Extra arguments forwarded to the uploader constructor.

    Returns:
        A FileUploader for the provider, or None when the provider is not
        recognised or the uploader could not be imported.
    """
    normalized = provider.lower()

    def _mentions(*keywords: str) -> bool:
        # True when any keyword occurs anywhere in the normalized name.
        return any(keyword in normalized for keyword in keywords)

    uploader: FileUploader | None = None

    if _mentions("gemini", "google"):
        try:
            from crewai.utilities.files.uploaders.gemini import GeminiFileUploader

            uploader = GeminiFileUploader(**kwargs)
        except ImportError:
            logger.warning(
                "google-genai not installed. Install with: pip install google-genai"
            )
    elif _mentions("anthropic", "claude"):
        try:
            from crewai.utilities.files.uploaders.anthropic import (
                AnthropicFileUploader,
            )

            uploader = AnthropicFileUploader(**kwargs)
        except ImportError:
            logger.warning(
                "anthropic not installed. Install with: pip install anthropic"
            )
    elif _mentions("openai", "gpt"):
        try:
            from crewai.utilities.files.uploaders.openai import OpenAIFileUploader

            uploader = OpenAIFileUploader(**kwargs)
        except ImportError:
            logger.warning("openai not installed. Install with: pip install openai")
    else:
        logger.debug(f"No file uploader available for provider: {provider}")

    return uploader

View File

@@ -0,0 +1,168 @@
"""Anthropic Files API uploader implementation."""
from __future__ import annotations
import io
import logging
import os
from typing import Any
from crewai.utilities.files.content_types import (
AudioFile,
ImageFile,
PDFFile,
TextFile,
VideoFile,
)
from crewai.utilities.files.uploaders.base import FileUploader, UploadResult
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Union of the file content types used to annotate the `file` argument of `upload`.
FileInput = AudioFile | ImageFile | PDFFile | TextFile | VideoFile
class AnthropicFileUploader(FileUploader):
    """Uploader for the Anthropic Files API.

    Uses the ``anthropic`` SDK, which exposes the Files API under
    ``client.beta.files``. Files are stored persistently until explicitly
    deleted.

    Attributes:
        api_key: Optional API key (uses ANTHROPIC_API_KEY env var if not provided).
    """

    def __init__(self, api_key: str | None = None) -> None:
        """Initialize the Anthropic uploader.

        Args:
            api_key: Optional Anthropic API key. If not provided, uses
                ANTHROPIC_API_KEY environment variable.
        """
        self._api_key = api_key or os.environ.get("ANTHROPIC_API_KEY")
        # The SDK client is created on first use, not at construction time.
        self._client: Any = None

    @property
    def provider_name(self) -> str:
        """Return the provider name."""
        return "anthropic"

    def _get_client(self) -> Any:
        """Get or create the Anthropic client.

        Raises:
            ImportError: If the ``anthropic`` package is not installed.
        """
        if self._client is None:
            try:
                import anthropic
            except ImportError as e:
                raise ImportError(
                    "anthropic is required for Anthropic file uploads. "
                    "Install with: pip install anthropic"
                ) from e
            self._client = anthropic.Anthropic(api_key=self._api_key)
        return self._client

    @staticmethod
    def _metadata_to_dict(metadata: Any) -> dict[str, Any]:
        """Convert an SDK file-metadata object into a plain dictionary."""
        return {
            "id": metadata.id,
            "filename": metadata.filename,
            "mime_type": metadata.mime_type,
            "size_bytes": metadata.size_bytes,
            "created_at": metadata.created_at,
        }

    def upload(self, file: FileInput, purpose: str | None = None) -> UploadResult:
        """Upload a file to Anthropic.

        Args:
            file: The file to upload.
            purpose: Accepted for interface compatibility only. The Anthropic
                Files API has no purpose field, so the value is not sent.

        Returns:
            UploadResult with the file ID and metadata.

        Raises:
            Exception: If upload fails.
        """
        client = self._get_client()
        content = file.source.read()
        # Same fallback the OpenAI uploader uses when no filename is set.
        filename = file.filename or "file"

        if purpose is not None:
            logger.debug(
                "Ignoring purpose %r: the Anthropic Files API has no purpose field",
                purpose,
            )

        # Create a file-like object for upload
        file_data = io.BytesIO(content)

        logger.info(
            "Uploading file '%s' to Anthropic (%d bytes)",
            file.filename,
            len(content),
        )

        # The SDK takes multipart files as (filename, file_obj, content_type).
        uploaded_file = client.beta.files.upload(
            file=(filename, file_data, file.content_type),
        )

        logger.info("Uploaded to Anthropic: %s", uploaded_file.id)

        return UploadResult(
            file_id=uploaded_file.id,
            file_uri=None,  # Anthropic doesn't provide a URI
            content_type=file.content_type,
            expires_at=None,  # Anthropic files don't auto-expire
            provider=self.provider_name,
        )

    def delete(self, file_id: str) -> bool:
        """Delete an uploaded file from Anthropic.

        Failures are logged and reported through the return value rather
        than raised.

        Args:
            file_id: The file ID to delete.

        Returns:
            True if deletion was successful, False otherwise.
        """
        try:
            client = self._get_client()
            client.beta.files.delete(file_id)
            logger.info("Deleted Anthropic file: %s", file_id)
            return True
        except Exception as e:
            logger.warning("Failed to delete Anthropic file %s: %s", file_id, e)
            return False

    def get_file_info(self, file_id: str) -> dict[str, Any] | None:
        """Get information about an uploaded file.

        Args:
            file_id: The file ID.

        Returns:
            Dictionary with the keys ``id``, ``filename``, ``mime_type``,
            ``size_bytes`` and ``created_at``, or None if the lookup failed.
        """
        try:
            client = self._get_client()
            metadata = client.beta.files.retrieve_metadata(file_id)
            return self._metadata_to_dict(metadata)
        except Exception as e:
            logger.debug("Failed to get Anthropic file info for %s: %s", file_id, e)
            return None

    def list_files(self) -> list[dict[str, Any]]:
        """List all uploaded files.

        Returns:
            List of dictionaries with file information (same keys as
            ``get_file_info``); empty if the listing failed.
        """
        try:
            client = self._get_client()
            # Iterating the page object lets the SDK fetch follow-up pages;
            # reading `.data` would return only the first page.
            return [self._metadata_to_dict(f) for f in client.beta.files.list()]
        except Exception as e:
            logger.warning("Failed to list Anthropic files: %s", e)
            return []

View File

@@ -0,0 +1,93 @@
"""Base class for file uploaders."""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from crewai.utilities.files.content_types import (
AudioFile,
ImageFile,
PDFFile,
TextFile,
VideoFile,
)
# Union of the file content types used to annotate the `file` argument of `upload`.
FileInput = AudioFile | ImageFile | PDFFile | TextFile | VideoFile
@dataclass
class UploadResult:
"""Result of a file upload operation.
Attributes:
file_id: Provider-specific file identifier.
file_uri: Optional URI for accessing the file.
content_type: MIME type of the uploaded file.
expires_at: When the upload expires (if applicable).
provider: Name of the provider.
"""
file_id: str
provider: str
content_type: str
file_uri: str | None = None
expires_at: datetime | None = None
class FileUploader(ABC):
    """Common interface for provider file uploaders.

    Subclasses must supply ``provider_name``, ``upload`` and ``delete``.
    ``get_file_info`` and ``list_files`` are optional; the defaults here
    return None and an empty list.
    """

    @property
    @abstractmethod
    def provider_name(self) -> str:
        """Name of the provider this uploader targets."""

    @abstractmethod
    def upload(self, file: FileInput, purpose: str | None = None) -> UploadResult:
        """Send a file to the provider.

        Args:
            file: The file to upload.
            purpose: Optional purpose/description for the upload.

        Returns:
            UploadResult with the file identifier and metadata.

        Raises:
            Exception: If upload fails.
        """

    @abstractmethod
    def delete(self, file_id: str) -> bool:
        """Remove a previously uploaded file.

        Args:
            file_id: The file identifier to delete.

        Returns:
            True if deletion was successful, False otherwise.
        """

    def get_file_info(self, file_id: str) -> dict[str, Any] | None:
        """Look up information about an uploaded file.

        This default implementation returns None for every file.

        Args:
            file_id: The file identifier.

        Returns:
            Dictionary with file information, or None if not found.
        """
        return None

    def list_files(self) -> list[dict[str, Any]]:
        """Enumerate uploaded files.

        This default implementation returns an empty list.

        Returns:
            List of dictionaries with file information.
        """
        return []

View File

@@ -0,0 +1,220 @@
"""Gemini File API uploader implementation."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
import io
import logging
import os
from typing import Any
from crewai.utilities.files.content_types import (
AudioFile,
ImageFile,
PDFFile,
TextFile,
VideoFile,
)
from crewai.utilities.files.uploaders.base import FileUploader, UploadResult
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Union of the file content types used to annotate the `file` argument of `upload`.
FileInput = AudioFile | ImageFile | PDFFile | TextFile | VideoFile
# Gemini files expire after 48 hours
GEMINI_FILE_TTL = timedelta(hours=48)
class GeminiFileUploader(FileUploader):
    """Uploader for Google Gemini File API.

    Uses the google-genai SDK to upload files. Files are stored for 48 hours.

    Attributes:
        api_key: Optional API key (uses GOOGLE_API_KEY env var if not provided).
    """

    # Seconds between state checks in `wait_for_processing`.
    _POLL_INTERVAL_SECONDS = 2

    def __init__(self, api_key: str | None = None) -> None:
        """Initialize the Gemini uploader.

        Args:
            api_key: Optional Google API key. If not provided, uses
                GOOGLE_API_KEY environment variable.
        """
        self._api_key = api_key or os.environ.get("GOOGLE_API_KEY")
        # The SDK client is created on first use, not at construction time.
        self._client: Any = None

    @property
    def provider_name(self) -> str:
        """Return the provider name."""
        return "gemini"

    def _get_client(self) -> Any:
        """Get or create the Gemini client.

        Raises:
            ImportError: If the ``google-genai`` package is not installed.
        """
        if self._client is None:
            try:
                from google import genai
            except ImportError as e:
                raise ImportError(
                    "google-genai is required for Gemini file uploads. "
                    "Install with: pip install google-genai"
                ) from e
            self._client = genai.Client(api_key=self._api_key)
        return self._client

    @staticmethod
    def _file_to_dict(file_info: Any) -> dict[str, Any]:
        """Convert an SDK file object into a plain dictionary."""
        return {
            "name": file_info.name,
            "uri": file_info.uri,
            "display_name": file_info.display_name,
            "mime_type": file_info.mime_type,
            "size_bytes": file_info.size_bytes,
            "state": str(file_info.state),
            "create_time": file_info.create_time,
            "expiration_time": file_info.expiration_time,
        }

    def upload(self, file: FileInput, purpose: str | None = None) -> UploadResult:
        """Upload a file to Gemini.

        Args:
            file: The file to upload.
            purpose: Optional purpose/description (used as display name).

        Returns:
            UploadResult with the file URI and metadata. ``expires_at`` is
            the expiration time reported by the API when present, otherwise
            an estimate of now plus ``GEMINI_FILE_TTL``.

        Raises:
            Exception: If upload fails.
        """
        client = self._get_client()
        content = file.source.read()
        display_name = purpose or file.filename

        # Create a file-like object for upload
        file_data = io.BytesIO(content)
        file_data.name = file.filename

        logger.info(
            "Uploading file '%s' to Gemini (%d bytes)",
            file.filename,
            len(content),
        )

        # Upload using the genai client
        uploaded_file = client.files.upload(
            file=file_data,
            config={
                "display_name": display_name,
                "mime_type": file.content_type,
            },
        )

        # Prefer the server's own expiry; fall back to a local estimate.
        expires_at = getattr(uploaded_file, "expiration_time", None) or (
            datetime.now(timezone.utc) + GEMINI_FILE_TTL
        )

        logger.info(
            "Uploaded to Gemini: %s (URI: %s)",
            uploaded_file.name,
            uploaded_file.uri,
        )

        return UploadResult(
            file_id=uploaded_file.name,
            file_uri=uploaded_file.uri,
            content_type=file.content_type,
            expires_at=expires_at,
            provider=self.provider_name,
        )

    def delete(self, file_id: str) -> bool:
        """Delete an uploaded file from Gemini.

        Failures are logged and reported through the return value rather
        than raised.

        Args:
            file_id: The file name/ID to delete.

        Returns:
            True if deletion was successful, False otherwise.
        """
        try:
            client = self._get_client()
            client.files.delete(name=file_id)
            logger.info("Deleted Gemini file: %s", file_id)
            return True
        except Exception as e:
            logger.warning("Failed to delete Gemini file %s: %s", file_id, e)
            return False

    def get_file_info(self, file_id: str) -> dict[str, Any] | None:
        """Get information about an uploaded file.

        Args:
            file_id: The file name/ID.

        Returns:
            Dictionary with file information, or None if the lookup failed.
        """
        try:
            client = self._get_client()
            return self._file_to_dict(client.files.get(name=file_id))
        except Exception as e:
            logger.debug("Failed to get Gemini file info for %s: %s", file_id, e)
            return None

    def list_files(self) -> list[dict[str, Any]]:
        """List all uploaded files.

        Returns:
            List of dictionaries with file information (same keys as
            ``get_file_info``); empty if the listing failed.
        """
        try:
            client = self._get_client()
            return [self._file_to_dict(f) for f in client.files.list()]
        except Exception as e:
            logger.warning("Failed to list Gemini files: %s", e)
            return []

    def wait_for_processing(self, file_id: str, timeout_seconds: int = 300) -> bool:
        """Wait for a file to finish processing.

        Some files (especially videos) need time to process after upload.
        The state is checked at least once, and once more after the final
        sleep, so a file that becomes active right at the deadline is not
        reported as timed out. Errors raised by the state lookup propagate
        to the caller.

        Args:
            file_id: The file name/ID.
            timeout_seconds: Maximum time to wait.

        Returns:
            True if processing completed, False if timed out or failed.
        """
        import time

        try:
            from google.genai.types import FileState
        except ImportError:
            # If we can't import FileState, just return True
            return True

        client = self._get_client()
        # Monotonic clock: the deadline is unaffected by wall-clock changes.
        deadline = time.monotonic() + timeout_seconds

        while True:
            file_info = client.files.get(name=file_id)

            if file_info.state == FileState.ACTIVE:
                return True
            if file_info.state == FileState.FAILED:
                logger.error("Gemini file processing failed: %s", file_id)
                return False

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline.
            time.sleep(min(self._POLL_INTERVAL_SECONDS, remaining))

        logger.warning("Timed out waiting for Gemini file processing: %s", file_id)
        return False

View File

@@ -0,0 +1,168 @@
"""OpenAI Files API uploader implementation."""
from __future__ import annotations
import io
import logging
import os
from typing import Any
from crewai.utilities.files.content_types import (
AudioFile,
ImageFile,
PDFFile,
TextFile,
VideoFile,
)
from crewai.utilities.files.uploaders.base import FileUploader, UploadResult
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Union of the file content types used to annotate the `file` argument of `upload`.
FileInput = AudioFile | ImageFile | PDFFile | TextFile | VideoFile
class OpenAIFileUploader(FileUploader):
    """Uploader for OpenAI Files API.

    Uses the OpenAI SDK to upload files. Files are stored persistently
    until explicitly deleted.

    Attributes:
        api_key: Optional API key (uses OPENAI_API_KEY env var if not provided).
    """

    def __init__(self, api_key: str | None = None) -> None:
        """Initialize the OpenAI uploader.

        Args:
            api_key: Optional OpenAI API key. If not provided, uses
                OPENAI_API_KEY environment variable.
        """
        self._api_key = api_key or os.environ.get("OPENAI_API_KEY")
        # The SDK client is created on first use, not at construction time.
        self._client: Any = None

    @property
    def provider_name(self) -> str:
        """Return the provider name."""
        return "openai"

    def _get_client(self) -> Any:
        """Get or create the OpenAI client.

        Raises:
            ImportError: If the ``openai`` package is not installed.
        """
        if self._client is None:
            try:
                from openai import OpenAI
            except ImportError as e:
                raise ImportError(
                    "openai is required for OpenAI file uploads. "
                    "Install with: pip install openai"
                ) from e
            self._client = OpenAI(api_key=self._api_key)
        return self._client

    @staticmethod
    def _file_to_dict(file_info: Any) -> dict[str, Any]:
        """Convert an SDK file object into a plain dictionary."""
        return {
            "id": file_info.id,
            "filename": file_info.filename,
            "purpose": file_info.purpose,
            "bytes": file_info.bytes,
            "created_at": file_info.created_at,
            "status": file_info.status,
        }

    def upload(self, file: FileInput, purpose: str | None = None) -> UploadResult:
        """Upload a file to OpenAI.

        Args:
            file: The file to upload.
            purpose: Optional purpose for the file (default: "user_data").

        Returns:
            UploadResult with the file ID and metadata.

        Raises:
            Exception: If upload fails.
        """
        client = self._get_client()
        content = file.source.read()
        file_purpose = purpose or "user_data"

        file_data = io.BytesIO(content)
        # Give the in-memory buffer a name, defaulting to "file" when the
        # input has no filename.
        file_data.name = file.filename or "file"

        logger.info(
            "Uploading file '%s' to OpenAI (%d bytes)",
            file.filename,
            len(content),
        )

        uploaded_file = client.files.create(
            file=file_data,
            purpose=file_purpose,
        )

        logger.info("Uploaded to OpenAI: %s", uploaded_file.id)

        return UploadResult(
            file_id=uploaded_file.id,
            file_uri=None,  # OpenAI doesn't provide a URI
            content_type=file.content_type,
            expires_at=None,  # OpenAI files don't auto-expire
            provider=self.provider_name,
        )

    def delete(self, file_id: str) -> bool:
        """Delete an uploaded file from OpenAI.

        Failures are logged and reported through the return value rather
        than raised.

        Args:
            file_id: The file ID to delete.

        Returns:
            True if deletion was successful, False otherwise.
        """
        try:
            client = self._get_client()
            client.files.delete(file_id)
            logger.info("Deleted OpenAI file: %s", file_id)
            return True
        except Exception as e:
            logger.warning("Failed to delete OpenAI file %s: %s", file_id, e)
            return False

    def get_file_info(self, file_id: str) -> dict[str, Any] | None:
        """Get information about an uploaded file.

        Args:
            file_id: The file ID.

        Returns:
            Dictionary with file information, or None if the lookup failed.
        """
        try:
            client = self._get_client()
            return self._file_to_dict(client.files.retrieve(file_id))
        except Exception as e:
            logger.debug("Failed to get OpenAI file info for %s: %s", file_id, e)
            return None

    def list_files(self) -> list[dict[str, Any]]:
        """List all uploaded files.

        Returns:
            List of dictionaries with file information (same keys as
            ``get_file_info``); empty if the listing failed.
        """
        try:
            client = self._get_client()
            # Iterating the page object lets the SDK fetch follow-up pages;
            # reading `.data` would return only the first page.
            return [self._file_to_dict(f) for f in client.files.list()]
        except Exception as e:
            logger.warning("Failed to list OpenAI files: %s", e)
            return []