From 56946d309be666c17e8bf7916c8cf43adce88498 Mon Sep 17 00:00:00 2001
From: Greyson LaLonde
Date: Wed, 21 Jan 2026 18:38:04 -0500
Subject: [PATCH] feat: add provider file uploaders

---
 .../utilities/files/uploaders/__init__.py     |  64 +++++
 .../utilities/files/uploaders/anthropic.py    | 167 +++++++++++++
 .../crewai/utilities/files/uploaders/base.py  |  93 ++++++++
 .../utilities/files/uploaders/gemini.py       | 221 ++++++++++++++++++
 .../utilities/files/uploaders/openai.py       | 168 +++++++++++++
 5 files changed, 713 insertions(+)
 create mode 100644 lib/crewai/src/crewai/utilities/files/uploaders/__init__.py
 create mode 100644 lib/crewai/src/crewai/utilities/files/uploaders/anthropic.py
 create mode 100644 lib/crewai/src/crewai/utilities/files/uploaders/base.py
 create mode 100644 lib/crewai/src/crewai/utilities/files/uploaders/gemini.py
 create mode 100644 lib/crewai/src/crewai/utilities/files/uploaders/openai.py

diff --git a/lib/crewai/src/crewai/utilities/files/uploaders/__init__.py b/lib/crewai/src/crewai/utilities/files/uploaders/__init__.py
new file mode 100644
index 000000000..105500ac9
--- /dev/null
+++ b/lib/crewai/src/crewai/utilities/files/uploaders/__init__.py
@@ -0,0 +1,64 @@
+"""File uploader implementations for provider File APIs."""
+
+from __future__ import annotations
+
+import logging
+from typing import Any
+
+from crewai.utilities.files.uploaders.base import FileUploader, UploadResult
+
+
+logger = logging.getLogger(__name__)
+
+__all__ = [
+    "FileUploader",
+    "UploadResult",
+    "get_uploader",
+]
+
+
+def get_uploader(provider: str, **kwargs: Any) -> FileUploader | None:
+    """Get a file uploader for a specific provider.
+
+    Args:
+        provider: Provider name (e.g., "gemini", "anthropic").
+        **kwargs: Additional arguments passed to the uploader constructor.
+
+    Returns:
+        FileUploader instance for the provider, or None if not supported.
+    """
+    provider_lower = provider.lower()
+
+    if "gemini" in provider_lower or "google" in provider_lower:
+        try:
+            from crewai.utilities.files.uploaders.gemini import GeminiFileUploader
+
+            return GeminiFileUploader(**kwargs)
+        except ImportError:
+            logger.warning(
+                "google-genai not installed. Install with: pip install google-genai"
+            )
+            return None
+
+    if "anthropic" in provider_lower or "claude" in provider_lower:
+        try:
+            from crewai.utilities.files.uploaders.anthropic import AnthropicFileUploader
+
+            return AnthropicFileUploader(**kwargs)
+        except ImportError:
+            logger.warning(
+                "anthropic not installed. Install with: pip install anthropic"
+            )
+            return None
+
+    if "openai" in provider_lower or "gpt" in provider_lower:
+        try:
+            from crewai.utilities.files.uploaders.openai import OpenAIFileUploader
+
+            return OpenAIFileUploader(**kwargs)
+        except ImportError:
+            logger.warning("openai not installed. Install with: pip install openai")
+            return None
+
+    logger.debug(f"No file uploader available for provider: {provider}")
+    return None
diff --git a/lib/crewai/src/crewai/utilities/files/uploaders/anthropic.py b/lib/crewai/src/crewai/utilities/files/uploaders/anthropic.py
new file mode 100644
index 000000000..2e894757a
--- /dev/null
+++ b/lib/crewai/src/crewai/utilities/files/uploaders/anthropic.py
@@ -0,0 +1,167 @@
+"""Anthropic Files API uploader implementation."""
+
+from __future__ import annotations
+
+import io
+import logging
+import os
+from typing import Any
+
+from crewai.utilities.files.content_types import (
+    AudioFile,
+    ImageFile,
+    PDFFile,
+    TextFile,
+    VideoFile,
+)
+from crewai.utilities.files.uploaders.base import FileUploader, UploadResult
+
+
+logger = logging.getLogger(__name__)
+
+FileInput = AudioFile | ImageFile | PDFFile | TextFile | VideoFile
+
+
+class AnthropicFileUploader(FileUploader):
+    """Uploader for Anthropic Files API.
+
+    Uses the anthropic SDK's beta Files API. Files are stored persistently
+    until explicitly deleted.
+
+    Attributes:
+        api_key: Optional API key (uses ANTHROPIC_API_KEY env var if not provided).
+    """
+
+    def __init__(self, api_key: str | None = None) -> None:
+        """Initialize the Anthropic uploader.
+
+        Args:
+            api_key: Optional Anthropic API key. If not provided, uses
+                ANTHROPIC_API_KEY environment variable.
+        """
+        self._api_key = api_key or os.environ.get("ANTHROPIC_API_KEY")
+        self._client: Any = None
+
+    @property
+    def provider_name(self) -> str:
+        """Return the provider name."""
+        return "anthropic"
+
+    def _get_client(self) -> Any:
+        """Get or create the Anthropic client."""
+        if self._client is None:
+            try:
+                import anthropic
+
+                self._client = anthropic.Anthropic(api_key=self._api_key)
+            except ImportError as e:
+                raise ImportError(
+                    "anthropic is required for Anthropic file uploads. "
+                    "Install with: pip install anthropic"
+                ) from e
+        return self._client
+
+    def upload(self, file: FileInput, purpose: str | None = None) -> UploadResult:
+        """Upload a file to Anthropic.
+
+        Args:
+            file: The file to upload.
+            purpose: Accepted for interface compatibility and ignored; the
+                Anthropic Files API has no purpose field.
+
+        Returns:
+            UploadResult with the file ID and metadata.
+
+        Raises:
+            Exception: If upload fails.
+        """
+        client = self._get_client()
+
+        content = file.source.read()
+
+        # Create a file-like object for upload
+        file_data = io.BytesIO(content)
+
+        logger.info(
+            f"Uploading file '{file.filename}' to Anthropic ({len(content)} bytes)"
+        )
+
+        # The Files API lives under the SDK's beta namespace. It takes a
+        # (filename, file_obj, content_type) tuple and no purpose argument.
+        uploaded_file = client.beta.files.upload(
+            file=(file.filename, file_data, file.content_type),
+        )
+
+        logger.info(f"Uploaded to Anthropic: {uploaded_file.id}")
+
+        return UploadResult(
+            file_id=uploaded_file.id,
+            file_uri=None,  # Anthropic doesn't provide a URI
+            content_type=file.content_type,
+            expires_at=None,  # Anthropic files don't auto-expire
+            provider=self.provider_name,
+        )
+
+    def delete(self, file_id: str) -> bool:
+        """Delete an uploaded file from Anthropic.
+
+        Args:
+            file_id: The file ID to delete.
+
+        Returns:
+            True if deletion was successful, False otherwise.
+        """
+        try:
+            client = self._get_client()
+            client.beta.files.delete(file_id=file_id)
+            logger.info(f"Deleted Anthropic file: {file_id}")
+            return True
+        except Exception as e:
+            logger.warning(f"Failed to delete Anthropic file {file_id}: {e}")
+            return False
+
+    def get_file_info(self, file_id: str) -> dict[str, Any] | None:
+        """Get information about an uploaded file.
+
+        Args:
+            file_id: The file ID.
+
+        Returns:
+            Dictionary with file information, or None if not found.
+        """
+        try:
+            client = self._get_client()
+            file_info = client.beta.files.retrieve_metadata(file_id=file_id)
+            return {
+                "id": file_info.id,
+                "filename": file_info.filename,
+                "mime_type": file_info.mime_type,
+                "size_bytes": file_info.size_bytes,
+                "created_at": file_info.created_at,
+            }
+        except Exception as e:
+            logger.debug(f"Failed to get Anthropic file info for {file_id}: {e}")
+            return None
+
+    def list_files(self) -> list[dict[str, Any]]:
+        """List all uploaded files.
+
+        Returns:
+            List of dictionaries with file information.
+        """
+        try:
+            client = self._get_client()
+            files = client.beta.files.list()
+            return [
+                {
+                    "id": f.id,
+                    "filename": f.filename,
+                    "mime_type": f.mime_type,
+                    "size_bytes": f.size_bytes,
+                    "created_at": f.created_at,
+                }
+                for f in files.data
+            ]
+        except Exception as e:
+            logger.warning(f"Failed to list Anthropic files: {e}")
+            return []
diff --git a/lib/crewai/src/crewai/utilities/files/uploaders/base.py b/lib/crewai/src/crewai/utilities/files/uploaders/base.py
new file mode 100644
index 000000000..35b4df271
--- /dev/null
+++ b/lib/crewai/src/crewai/utilities/files/uploaders/base.py
@@ -0,0 +1,93 @@
+"""Base class for file uploaders."""
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any
+
+from crewai.utilities.files.content_types import (
+    AudioFile,
+    ImageFile,
+    PDFFile,
+    TextFile,
+    VideoFile,
+)
+
+
+FileInput = AudioFile | ImageFile | PDFFile | TextFile | VideoFile
+
+
+@dataclass
+class UploadResult:
+    """Result of a file upload operation.
+
+    Attributes:
+        file_id: Provider-specific file identifier.
+        file_uri: Optional URI for accessing the file.
+        content_type: MIME type of the uploaded file.
+        expires_at: When the upload expires (if applicable).
+        provider: Name of the provider.
+    """
+
+    file_id: str
+    provider: str
+    content_type: str
+    file_uri: str | None = None
+    expires_at: datetime | None = None
+
+
+class FileUploader(ABC):
+    """Abstract base class for provider file uploaders.
+
+    Implementations handle uploading files to provider-specific File APIs.
+    """
+
+    @property
+    @abstractmethod
+    def provider_name(self) -> str:
+        """Return the provider name."""
+
+    @abstractmethod
+    def upload(self, file: FileInput, purpose: str | None = None) -> UploadResult:
+        """Upload a file to the provider.
+
+        Args:
+            file: The file to upload.
+            purpose: Optional purpose/description for the upload.
+
+        Returns:
+            UploadResult with the file identifier and metadata.
+
+        Raises:
+            Exception: If upload fails.
+        """
+
+    @abstractmethod
+    def delete(self, file_id: str) -> bool:
+        """Delete an uploaded file.
+
+        Args:
+            file_id: The file identifier to delete.
+
+        Returns:
+            True if deletion was successful, False otherwise.
+        """
+
+    def get_file_info(self, file_id: str) -> dict[str, Any] | None:
+        """Get information about an uploaded file.
+
+        Args:
+            file_id: The file identifier.
+
+        Returns:
+            Dictionary with file information, or None if not found.
+        """
+        return None
+
+    def list_files(self) -> list[dict[str, Any]]:
+        """List all uploaded files.
+
+        Returns:
+            List of dictionaries with file information.
+        """
+        return []
diff --git a/lib/crewai/src/crewai/utilities/files/uploaders/gemini.py b/lib/crewai/src/crewai/utilities/files/uploaders/gemini.py
new file mode 100644
index 000000000..87d14124a
--- /dev/null
+++ b/lib/crewai/src/crewai/utilities/files/uploaders/gemini.py
@@ -0,0 +1,221 @@
+"""Gemini File API uploader implementation."""
+
+from __future__ import annotations
+
+from datetime import datetime, timedelta, timezone
+import io
+import logging
+import os
+from typing import Any
+
+from crewai.utilities.files.content_types import (
+    AudioFile,
+    ImageFile,
+    PDFFile,
+    TextFile,
+    VideoFile,
+)
+from crewai.utilities.files.uploaders.base import FileUploader, UploadResult
+
+
+logger = logging.getLogger(__name__)
+
+FileInput = AudioFile | ImageFile | PDFFile | TextFile | VideoFile
+
+# Gemini files expire after 48 hours
+GEMINI_FILE_TTL = timedelta(hours=48)
+
+
+class GeminiFileUploader(FileUploader):
+    """Uploader for Google Gemini File API.
+
+    Uses the google-genai SDK to upload files. Files are stored for 48 hours.
+
+    Attributes:
+        api_key: Optional API key (uses GOOGLE_API_KEY env var if not provided).
+    """
+
+    def __init__(self, api_key: str | None = None) -> None:
+        """Initialize the Gemini uploader.
+
+        Args:
+            api_key: Optional Google API key. If not provided, uses
+                GOOGLE_API_KEY environment variable.
+        """
+        self._api_key = api_key or os.environ.get("GOOGLE_API_KEY")
+        self._client: Any = None
+
+    @property
+    def provider_name(self) -> str:
+        """Return the provider name."""
+        return "gemini"
+
+    def _get_client(self) -> Any:
+        """Get or create the Gemini client."""
+        if self._client is None:
+            try:
+                from google import genai
+
+                self._client = genai.Client(api_key=self._api_key)
+            except ImportError as e:
+                raise ImportError(
+                    "google-genai is required for Gemini file uploads. "
+                    "Install with: pip install google-genai"
+                ) from e
+        return self._client
+
+    def upload(self, file: FileInput, purpose: str | None = None) -> UploadResult:
+        """Upload a file to Gemini.
+
+        Args:
+            file: The file to upload.
+            purpose: Optional purpose/description (used as display name).
+
+        Returns:
+            UploadResult with the file URI and metadata.
+
+        Raises:
+            Exception: If upload fails.
+        """
+        client = self._get_client()
+
+        content = file.source.read()
+        display_name = purpose or file.filename
+
+        # Create a file-like object for upload
+        file_data = io.BytesIO(content)
+        file_data.name = file.filename
+
+        logger.info(
+            f"Uploading file '{file.filename}' to Gemini ({len(content)} bytes)"
+        )
+
+        # Upload using the genai client
+        uploaded_file = client.files.upload(
+            file=file_data,
+            config={
+                "display_name": display_name,
+                "mime_type": file.content_type,
+            },
+        )
+
+        expires_at = datetime.now(timezone.utc) + GEMINI_FILE_TTL
+
+        logger.info(
+            f"Uploaded to Gemini: {uploaded_file.name} (URI: {uploaded_file.uri})"
+        )
+
+        return UploadResult(
+            file_id=uploaded_file.name,
+            file_uri=uploaded_file.uri,
+            content_type=file.content_type,
+            expires_at=expires_at,
+            provider=self.provider_name,
+        )
+
+    def delete(self, file_id: str) -> bool:
+        """Delete an uploaded file from Gemini.
+
+        Args:
+            file_id: The file name/ID to delete.
+
+        Returns:
+            True if deletion was successful, False otherwise.
+        """
+        try:
+            client = self._get_client()
+            client.files.delete(name=file_id)
+            logger.info(f"Deleted Gemini file: {file_id}")
+            return True
+        except Exception as e:
+            logger.warning(f"Failed to delete Gemini file {file_id}: {e}")
+            return False
+
+    def get_file_info(self, file_id: str) -> dict[str, Any] | None:
+        """Get information about an uploaded file.
+
+        Args:
+            file_id: The file name/ID.
+
+        Returns:
+            Dictionary with file information, or None if not found.
+        """
+        try:
+            client = self._get_client()
+            file_info = client.files.get(name=file_id)
+            return {
+                "name": file_info.name,
+                "uri": file_info.uri,
+                "display_name": file_info.display_name,
+                "mime_type": file_info.mime_type,
+                "size_bytes": file_info.size_bytes,
+                "state": str(file_info.state),
+                "create_time": file_info.create_time,
+                "expiration_time": file_info.expiration_time,
+            }
+        except Exception as e:
+            logger.debug(f"Failed to get Gemini file info for {file_id}: {e}")
+            return None
+
+    def list_files(self) -> list[dict[str, Any]]:
+        """List all uploaded files.
+
+        Returns:
+            List of dictionaries with file information.
+        """
+        try:
+            client = self._get_client()
+            files = client.files.list()
+            return [
+                {
+                    "name": f.name,
+                    "uri": f.uri,
+                    "display_name": f.display_name,
+                    "mime_type": f.mime_type,
+                    "size_bytes": f.size_bytes,
+                    "state": str(f.state),
+                }
+                for f in files
+            ]
+        except Exception as e:
+            logger.warning(f"Failed to list Gemini files: {e}")
+            return []
+
+    def wait_for_processing(self, file_id: str, timeout_seconds: int = 300) -> bool:
+        """Wait for a file to finish processing.
+
+        Some files (especially videos) need time to process after upload.
+
+        Args:
+            file_id: The file name/ID.
+            timeout_seconds: Maximum time to wait.
+
+        Returns:
+            True if processing completed, False if timed out or failed.
+        """
+        import time
+
+        try:
+            from google.genai.types import FileState
+        except ImportError:
+            # If we can't import FileState, just return True
+            return True
+
+        client = self._get_client()
+        # Monotonic clock so wall-clock adjustments cannot distort the timeout.
+        start_time = time.monotonic()
+
+        while time.monotonic() - start_time < timeout_seconds:
+            file_info = client.files.get(name=file_id)
+
+            if file_info.state == FileState.ACTIVE:
+                return True
+
+            if file_info.state == FileState.FAILED:
+                logger.error(f"Gemini file processing failed: {file_id}")
+                return False
+
+            time.sleep(2)
+
+        logger.warning(f"Timed out waiting for Gemini file processing: {file_id}")
+        return False
diff --git a/lib/crewai/src/crewai/utilities/files/uploaders/openai.py b/lib/crewai/src/crewai/utilities/files/uploaders/openai.py
new file mode 100644
index 000000000..73e62b631
--- /dev/null
+++ b/lib/crewai/src/crewai/utilities/files/uploaders/openai.py
@@ -0,0 +1,168 @@
+"""OpenAI Files API uploader implementation."""
+
+from __future__ import annotations
+
+import io
+import logging
+import os
+from typing import Any
+
+from crewai.utilities.files.content_types import (
+    AudioFile,
+    ImageFile,
+    PDFFile,
+    TextFile,
+    VideoFile,
+)
+from crewai.utilities.files.uploaders.base import FileUploader, UploadResult
+
+
+logger = logging.getLogger(__name__)
+
+FileInput = AudioFile | ImageFile | PDFFile | TextFile | VideoFile
+
+
+class OpenAIFileUploader(FileUploader):
+    """Uploader for OpenAI Files API.
+
+    Uses the OpenAI SDK to upload files. Files are stored persistently
+    until explicitly deleted.
+
+    Attributes:
+        api_key: Optional API key (uses OPENAI_API_KEY env var if not provided).
+    """
+
+    def __init__(self, api_key: str | None = None) -> None:
+        """Initialize the OpenAI uploader.
+
+        Args:
+            api_key: Optional OpenAI API key. If not provided, uses
+                OPENAI_API_KEY environment variable.
+        """
+        self._api_key = api_key or os.environ.get("OPENAI_API_KEY")
+        self._client: Any = None
+
+    @property
+    def provider_name(self) -> str:
+        """Return the provider name."""
+        return "openai"
+
+    def _get_client(self) -> Any:
+        """Get or create the OpenAI client."""
+        if self._client is None:
+            try:
+                from openai import OpenAI
+
+                self._client = OpenAI(api_key=self._api_key)
+            except ImportError as e:
+                raise ImportError(
+                    "openai is required for OpenAI file uploads. "
+                    "Install with: pip install openai"
+                ) from e
+        return self._client
+
+    def upload(self, file: FileInput, purpose: str | None = None) -> UploadResult:
+        """Upload a file to OpenAI.
+
+        Args:
+            file: The file to upload.
+            purpose: Optional purpose for the file (default: "user_data").
+
+        Returns:
+            UploadResult with the file ID and metadata.
+
+        Raises:
+            Exception: If upload fails.
+        """
+        client = self._get_client()
+
+        content = file.source.read()
+        file_purpose = purpose or "user_data"
+
+        file_data = io.BytesIO(content)
+        file_data.name = file.filename or "file"
+
+        logger.info(
+            f"Uploading file '{file.filename}' to OpenAI ({len(content)} bytes)"
+        )
+
+        uploaded_file = client.files.create(
+            file=file_data,
+            purpose=file_purpose,
+        )
+
+        logger.info(f"Uploaded to OpenAI: {uploaded_file.id}")
+
+        return UploadResult(
+            file_id=uploaded_file.id,
+            file_uri=None,  # OpenAI doesn't provide a URI
+            content_type=file.content_type,
+            expires_at=None,  # OpenAI files don't auto-expire
+            provider=self.provider_name,
+        )
+
+    def delete(self, file_id: str) -> bool:
+        """Delete an uploaded file from OpenAI.
+
+        Args:
+            file_id: The file ID to delete.
+
+        Returns:
+            True if deletion was successful, False otherwise.
+        """
+        try:
+            client = self._get_client()
+            client.files.delete(file_id)
+            logger.info(f"Deleted OpenAI file: {file_id}")
+            return True
+        except Exception as e:
+            logger.warning(f"Failed to delete OpenAI file {file_id}: {e}")
+            return False
+
+    def get_file_info(self, file_id: str) -> dict[str, Any] | None:
+        """Get information about an uploaded file.
+
+        Args:
+            file_id: The file ID.
+
+        Returns:
+            Dictionary with file information, or None if not found.
+        """
+        try:
+            client = self._get_client()
+            file_info = client.files.retrieve(file_id)
+            return {
+                "id": file_info.id,
+                "filename": file_info.filename,
+                "purpose": file_info.purpose,
+                "bytes": file_info.bytes,
+                "created_at": file_info.created_at,
+                "status": file_info.status,
+            }
+        except Exception as e:
+            logger.debug(f"Failed to get OpenAI file info for {file_id}: {e}")
+            return None
+
+    def list_files(self) -> list[dict[str, Any]]:
+        """List all uploaded files.
+
+        Returns:
+            List of dictionaries with file information.
+        """
+        try:
+            client = self._get_client()
+            files = client.files.list()
+            return [
+                {
+                    "id": f.id,
+                    "filename": f.filename,
+                    "purpose": f.purpose,
+                    "bytes": f.bytes,
+                    "created_at": f.created_at,
+                    "status": f.status,
+                }
+                for f in files.data
+            ]
+        except Exception as e:
+            logger.warning(f"Failed to list OpenAI files: {e}")
+            return []