Mirror of https://github.com/crewAIInc/crewAI.git (synced 2026-01-22 22:58:13 +00:00)
refactor: reorganize files module with centralized constants and utilities
@@ -73,69 +73,7 @@ from crewai.files.upload_cache import (
     reset_upload_cache,
 )
 from crewai.files.uploaders import FileUploader, UploadResult, get_uploader


-def wrap_file_source(source: FileSource) -> FileInput:
-    """Wrap a FileSource in the appropriate typed FileInput wrapper.
-
-    Args:
-        source: The file source to wrap.
-
-    Returns:
-        Typed FileInput wrapper based on content type.
-    """
-    content_type = source.content_type
-
-    if content_type.startswith("image/"):
-        return ImageFile(source=source)
-    if content_type.startswith("audio/"):
-        return AudioFile(source=source)
-    if content_type.startswith("video/"):
-        return VideoFile(source=source)
-    if content_type == "application/pdf":
-        return PDFFile(source=source)
-    return TextFile(source=source)
-
-
-def normalize_input_files(
-    input_files: list[FileSourceInput | FileInput],
-) -> dict[str, FileInput]:
-    """Convert a list of file sources to a named dictionary of FileInputs.
-
-    Args:
-        input_files: List of file source inputs or File objects.
-
-    Returns:
-        Dictionary mapping names to FileInput wrappers.
-    """
-    from pathlib import Path
-
-    result: dict[str, FileInput] = {}
-
-    for i, item in enumerate(input_files):
-        if isinstance(item, BaseFile):
-            name = item.filename or f"file_{i}"
-            if "." in name:
-                name = name.rsplit(".", 1)[0]
-            result[name] = item
-            continue
-
-        file_source: FilePath | FileBytes | FileStream
-        if isinstance(item, (FilePath, FileBytes, FileStream)):
-            file_source = item
-        elif isinstance(item, Path):
-            file_source = FilePath(path=item)
-        elif isinstance(item, str):
-            file_source = FilePath(path=Path(item))
-        elif isinstance(item, (bytes, memoryview)):
-            file_source = FileBytes(data=bytes(item))
-        else:
-            continue
-
-        name = file_source.filename or f"file_{i}"
-        result[name] = wrap_file_source(file_source)
-
-    return result
+from crewai.files.utils import normalize_input_files, wrap_file_source


 __all__ = [
lib/crewai/src/crewai/files/constants.py (new file, 26 lines)
@@ -0,0 +1,26 @@
+"""Constants for file handling utilities."""
+
+from datetime import timedelta
+from typing import Final, Literal
+
+
+DEFAULT_MAX_FILE_SIZE_BYTES: Final[Literal[524_288_000]] = 524_288_000
+MAGIC_BUFFER_SIZE: Final[Literal[2048]] = 2048
+
+UPLOAD_MAX_RETRIES: Final[Literal[3]] = 3
+UPLOAD_RETRY_DELAY_BASE: Final[Literal[2]] = 2
+
+DEFAULT_TTL_SECONDS: Final[Literal[86_400]] = 86_400
+DEFAULT_MAX_CACHE_ENTRIES: Final[Literal[1000]] = 1000
+
+GEMINI_FILE_TTL: Final[timedelta] = timedelta(hours=48)
+BACKOFF_BASE_DELAY: Final[float] = 1.0
+BACKOFF_MAX_DELAY: Final[float] = 30.0
+BACKOFF_JITTER_FACTOR: Final[float] = 0.1
+
+FILES_API_MAX_SIZE: Final[Literal[536_870_912]] = 536_870_912
+DEFAULT_UPLOAD_CHUNK_SIZE: Final[Literal[67_108_864]] = 67_108_864
+
+MULTIPART_THRESHOLD: Final[Literal[8_388_608]] = 8_388_608
+MULTIPART_CHUNKSIZE: Final[Literal[8_388_608]] = 8_388_608
+MAX_CONCURRENCY: Final[Literal[10]] = 10
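All of these were previously module-local literals scattered across the file and uploader modules (removed in the hunks below). Call sites now share one definition; a short illustrative sketch of the intended usage (the helper function here is hypothetical):

    from crewai.files.constants import DEFAULT_MAX_FILE_SIZE_BYTES

    def ensure_within_limit(n_bytes: int) -> None:
        # Hypothetical helper: enforce the shared 500 MB cap (524_288_000 bytes)
        # instead of re-deriving 500 * 1024 * 1024 locally.
        if n_bytes > DEFAULT_MAX_FILE_SIZE_BYTES:
            raise ValueError(f"file exceeds {DEFAULT_MAX_FILE_SIZE_BYTES} bytes")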
@@ -9,7 +9,6 @@ from typing import Annotated, Any, BinaryIO, Literal, Self

 from pydantic import BaseModel, Field, GetCoreSchemaHandler
 from pydantic_core import CoreSchema, core_schema
-from typing_extensions import TypeIs

 from crewai.files.file import (
     AsyncFileStream,
@@ -18,6 +17,7 @@ from crewai.files.file import (
     FileSource,
     FileStream,
 )
+from crewai.files.utils import is_file_source


 FileSourceInput = str | Path | bytes | IOBase | FileSource
@@ -60,12 +60,6 @@ class _FileSourceCoercer:

 CoercedFileSource = Annotated[FileSourceInput, _FileSourceCoercer]


-def _is_file_source(v: FileSourceInput) -> TypeIs[FileSource]:
-    """Type guard to narrow FileSourceInput to FileSource."""
-    return isinstance(v, (FilePath, FileBytes, FileStream))
-
-
 FileMode = Literal["strict", "auto", "warn", "chunk"]


@@ -184,7 +178,7 @@ class BaseFile(ABC, BaseModel):
     @property
     def _file_source(self) -> FileSource:
         """Get source with narrowed type (always FileSource after validation)."""
-        if _is_file_source(self.source):
+        if is_file_source(self.source):
            return self.source
        raise TypeError("source must be a FileSource after validation")
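The module-local `_is_file_source` guard is deleted in favor of the shared `is_file_source` from `crewai.files.utils` (added later in this commit). The `TypeIs` return annotation is what lets type checkers narrow `self.source` inside the `if`; a minimal self-contained sketch of the mechanism, with illustrative names:

    from typing_extensions import TypeIs

    def is_int_pair(v: object) -> TypeIs[tuple[int, int]]:
        return (
            isinstance(v, tuple)
            and len(v) == 2
            and all(isinstance(x, int) for x in v)
        )

    def describe(v: object) -> str:
        if is_int_pair(v):
            # Checkers treat v as tuple[int, int] in this branch.
            return f"ints summing to {v[0] + v[1]}"
        return "something else"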
@@ -18,6 +18,8 @@ from pydantic import (
 )
 from pydantic_core import CoreSchema, core_schema

+from crewai.files.constants import DEFAULT_MAX_FILE_SIZE_BYTES, MAGIC_BUFFER_SIZE
+

 @runtime_checkable
 class AsyncReadable(Protocol):
@@ -51,7 +53,14 @@ class _AsyncReadableValidator:

 ValidatedAsyncReadable = Annotated[AsyncReadable, _AsyncReadableValidator()]

-DEFAULT_MAX_FILE_SIZE_BYTES = 500 * 1024 * 1024  # 500MB
+
+def _fallback_content_type(filename: str | None) -> str:
+    """Get content type from filename extension or return default."""
+    if filename:
+        mime_type, _ = mimetypes.guess_type(filename)
+        if mime_type:
+            return mime_type
+    return "application/octet-stream"


 def detect_content_type(data: bytes, filename: str | None = None) -> str:
@@ -61,7 +70,7 @@ def detect_content_type(data: bytes, filename: str | None = None) -> str:
     falls back to mimetypes module using filename extension.

     Args:
-        data: Raw bytes to analyze.
+        data: Raw bytes to analyze (only first 2048 bytes are used).
         filename: Optional filename for extension-based fallback.

     Returns:
@@ -70,14 +79,32 @@ def detect_content_type(data: bytes, filename: str | None = None) -> str:
     try:
         import magic

-        result: str = magic.from_buffer(data, mime=True)
+        result: str = magic.from_buffer(data[:MAGIC_BUFFER_SIZE], mime=True)
         return result
     except ImportError:
-        if filename:
-            mime_type, _ = mimetypes.guess_type(filename)
-            if mime_type:
-                return mime_type
-        return "application/octet-stream"
+        return _fallback_content_type(filename)
+
+
+def detect_content_type_from_path(path: Path, filename: str | None = None) -> str:
+    """Detect MIME type from file path.
+
+    Uses python-magic's from_file() for accurate detection without reading
+    the entire file into memory.
+
+    Args:
+        path: Path to the file.
+        filename: Optional filename for extension-based fallback.
+
+    Returns:
+        The detected MIME type.
+    """
+    try:
+        import magic
+
+        result: str = magic.from_file(str(path), mime=True)
+        return result
+    except ImportError:
+        return _fallback_content_type(filename or path.name)


 class _BinaryIOValidator:
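The duplicated extension-based fallback is now factored into `_fallback_content_type`, shared by both detectors. Its behavior rests on the stdlib `mimetypes` module; a quick illustration:

    import mimetypes

    print(mimetypes.guess_type("report.pdf")[0])   # application/pdf
    print(mimetypes.guess_type("clip.mp4")[0])     # video/mp4
    print(mimetypes.guess_type("noextension")[0])  # None -> caller falls back
                                                   #   to application/octet-stream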
@@ -114,6 +141,7 @@ class FilePath(BaseModel):
         description="Maximum file size in bytes.",
     )
     _content: bytes | None = PrivateAttr(default=None)
+    _content_type: str = PrivateAttr()

     @model_validator(mode="after")
     def _validate_file_exists(self) -> FilePath:
@@ -144,6 +172,7 @@ class FilePath(BaseModel):
                 max_size=self.max_size_bytes,
             )

+        self._content_type = detect_content_type_from_path(self.path, self.path.name)
         return self

     @property
@@ -153,8 +182,8 @@ class FilePath(BaseModel):

     @property
     def content_type(self) -> str:
-        """Get the content type by reading file content."""
-        return detect_content_type(self.read(), self.filename)
+        """Get the content type."""
+        return self._content_type

     def read(self) -> bytes:
         """Read the file content from disk."""
@@ -201,11 +230,18 @@ class FileBytes(BaseModel):

     data: bytes = Field(description="Raw bytes content of the file.")
     filename: str | None = Field(default=None, description="Optional filename.")
+    _content_type: str = PrivateAttr()
+
+    @model_validator(mode="after")
+    def _detect_content_type(self) -> FileBytes:
+        """Detect and cache content type from data."""
+        self._content_type = detect_content_type(self.data, self.filename)
+        return self

     @property
     def content_type(self) -> str:
-        """Get the content type from the data."""
-        return detect_content_type(self.data, self.filename)
+        """Get the content type."""
+        return self._content_type

     def read(self) -> bytes:
         """Return the bytes content."""
@@ -246,18 +282,27 @@ class FileStream(BaseModel):
     stream: ValidatedBinaryIO = Field(description="Binary file stream.")
     filename: str | None = Field(default=None, description="Optional filename.")
     _content: bytes | None = PrivateAttr(default=None)
+    _content_type: str = PrivateAttr()

-    def model_post_init(self, __context: object) -> None:
-        """Extract filename from stream if not provided."""
+    @model_validator(mode="after")
+    def _initialize(self) -> FileStream:
+        """Extract filename and detect content type."""
         if self.filename is None:
             name = getattr(self.stream, "name", None)
             if name is not None:
-                object.__setattr__(self, "filename", Path(name).name)
+                self.filename = Path(name).name
+
+        position = self.stream.tell()
+        self.stream.seek(0)
+        header = self.stream.read(MAGIC_BUFFER_SIZE)
+        self.stream.seek(position)
+        self._content_type = detect_content_type(header, self.filename)
+        return self

     @property
     def content_type(self) -> str:
-        """Get the content type from stream content."""
-        return detect_content_type(self.read(), self.filename)
+        """Get the content type."""
+        return self._content_type

     def read(self) -> bytes:
         """Read the stream content. Content is cached after first read."""
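`_initialize` sniffs only the first `MAGIC_BUFFER_SIZE` (2048) bytes and restores the caller's stream position, so detection no longer consumes the whole stream the way `detect_content_type(self.read(), ...)` did. The same save/peek/restore pattern in isolation (illustrative sketch):

    import io

    MAGIC_BUFFER_SIZE = 2048  # mirrors crewai.files.constants

    def sniff_header(stream: io.BufferedIOBase) -> bytes:
        # Remember the caller's position, read the header, then seek back.
        position = stream.tell()
        stream.seek(0)
        header = stream.read(MAGIC_BUFFER_SIZE)
        stream.seek(position)
        return header

    buf = io.BytesIO(b"%PDF-1.7 ...")
    print(sniff_header(buf)[:5])  # b'%PDF-'
    print(buf.tell())             # 0 -- position unchanged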
@@ -319,13 +364,16 @@ class AsyncFileStream(BaseModel):
     )
     filename: str | None = Field(default=None, description="Optional filename.")
     _content: bytes | None = PrivateAttr(default=None)
+    _content_type: str | None = PrivateAttr(default=None)

     @property
     def content_type(self) -> str:
-        """Get the content type from stream content. Requires aread() first."""
+        """Get the content type from stream content (cached). Requires aread() first."""
         if self._content is None:
             raise RuntimeError("Call aread() first to load content")
-        return detect_content_type(self._content, self.filename)
+        if self._content_type is None:
+            self._content_type = detect_content_type(self._content, self.filename)
+        return self._content_type

     async def aread(self) -> bytes:
         """Async read the stream content. Content is cached after first read."""
@@ -1,6 +1,7 @@
 """Provider-specific file constraints for multimodal content."""

 from dataclasses import dataclass
+from functools import lru_cache
 from typing import Literal

 from crewai.files.content_types import (
@@ -162,47 +163,47 @@ class ProviderConstraints:
 ANTHROPIC_CONSTRAINTS = ProviderConstraints(
     name="anthropic",
     image=ImageConstraints(
-        max_size_bytes=5 * 1024 * 1024,
+        max_size_bytes=5_242_880,
         max_width=8000,
         max_height=8000,
     ),
     pdf=PDFConstraints(
-        max_size_bytes=30 * 1024 * 1024,
+        max_size_bytes=31_457_280,
         max_pages=100,
     ),
     supports_file_upload=True,
-    file_upload_threshold_bytes=5 * 1024 * 1024,
+    file_upload_threshold_bytes=5_242_880,
 )

 OPENAI_CONSTRAINTS = ProviderConstraints(
     name="openai",
     image=ImageConstraints(
-        max_size_bytes=20 * 1024 * 1024,
+        max_size_bytes=20_971_520,
         max_images_per_request=10,
     ),
     supports_file_upload=True,
-    file_upload_threshold_bytes=5 * 1024 * 1024,
+    file_upload_threshold_bytes=5_242_880,
 )

 GEMINI_CONSTRAINTS = ProviderConstraints(
     name="gemini",
     image=ImageConstraints(
-        max_size_bytes=100 * 1024 * 1024,
+        max_size_bytes=104_857_600,
         supported_formats=GEMINI_IMAGE_FORMATS,
     ),
     pdf=PDFConstraints(
-        max_size_bytes=50 * 1024 * 1024,
+        max_size_bytes=52_428_800,
     ),
     audio=AudioConstraints(
-        max_size_bytes=100 * 1024 * 1024,
+        max_size_bytes=104_857_600,
         supported_formats=GEMINI_AUDIO_FORMATS,
     ),
     video=VideoConstraints(
-        max_size_bytes=2 * 1024 * 1024 * 1024,
+        max_size_bytes=2_147_483_648,
         supported_formats=GEMINI_VIDEO_FORMATS,
     ),
     supports_file_upload=True,
-    file_upload_threshold_bytes=20 * 1024 * 1024,
+    file_upload_threshold_bytes=20_971_520,
 )

 BEDROCK_CONSTRAINTS = ProviderConstraints(
@@ -221,7 +222,7 @@ BEDROCK_CONSTRAINTS = ProviderConstraints(
 AZURE_CONSTRAINTS = ProviderConstraints(
     name="azure",
     image=ImageConstraints(
-        max_size_bytes=20 * 1024 * 1024,
+        max_size_bytes=20_971_520,
         max_images_per_request=10,
     ),
 )
@@ -240,6 +241,7 @@ _PROVIDER_CONSTRAINTS_MAP: dict[str, ProviderConstraints] = {
 }


+@lru_cache(maxsize=32)
 def get_constraints_for_provider(
     provider: str | ProviderConstraints,
 ) -> ProviderConstraints | None:
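Memoizing the lookup means repeated calls with the same provider string hit the cache; since the map values are module-level singletons, identity is stable either way. A hedged usage sketch (the import path follows the `crewai.files.processing.constraints` imports seen elsewhere in this diff):

    from crewai.files.processing.constraints import get_constraints_for_provider

    c1 = get_constraints_for_provider("gemini")
    c2 = get_constraints_for_provider("gemini")
    assert c1 is c2  # same cached ProviderConstraints object
    # Note: lru_cache requires hashable arguments, so passing a
    # ProviderConstraints instance only works if the dataclass is hashable.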
@@ -6,6 +6,7 @@ from dataclasses import dataclass, field
 import hashlib
 import logging

+from crewai.files.constants import UPLOAD_MAX_RETRIES, UPLOAD_RETRY_DELAY_BASE
 from crewai.files.content_types import FileInput
 from crewai.files.metrics import measure_operation
 from crewai.files.processing.constraints import (
@@ -29,9 +30,6 @@ from crewai.files.uploaders.base import FileUploader

 logger = logging.getLogger(__name__)

-UPLOAD_MAX_RETRIES = 3
-UPLOAD_RETRY_DELAY_BASE = 2
-

 @dataclass
 class FileContext:
@@ -15,15 +15,14 @@ from typing import TYPE_CHECKING, Any
 from aiocache import Cache  # type: ignore[import-untyped]
 from aiocache.serializers import PickleSerializer  # type: ignore[import-untyped]

+from crewai.files.constants import DEFAULT_MAX_CACHE_ENTRIES, DEFAULT_TTL_SECONDS


 if TYPE_CHECKING:
     from crewai.files.content_types import FileInput

 logger = logging.getLogger(__name__)

-DEFAULT_TTL_SECONDS = 24 * 60 * 60  # 24 hours
-DEFAULT_MAX_CACHE_ENTRIES = 1000
-

 @dataclass
 class CachedUpload:
@@ -1,84 +1,11 @@
 """File uploader implementations for provider File APIs."""

 from __future__ import annotations

-import logging
-from typing import Any
-
 from crewai.files.uploaders.base import FileUploader, UploadResult
+from crewai.files.uploaders.factory import get_uploader


-logger = logging.getLogger(__name__)
-
 __all__ = [
     "FileUploader",
     "UploadResult",
     "get_uploader",
 ]
-
-
-def get_uploader(provider: str, **kwargs: Any) -> FileUploader | None:
-    """Get a file uploader for a specific provider.
-
-    Args:
-        provider: Provider name (e.g., "gemini", "anthropic").
-        **kwargs: Additional arguments passed to the uploader constructor.
-
-    Returns:
-        FileUploader instance for the provider, or None if not supported.
-    """
-    provider_lower = provider.lower()
-
-    if "gemini" in provider_lower or "google" in provider_lower:
-        try:
-            from crewai.files.uploaders.gemini import GeminiFileUploader
-
-            return GeminiFileUploader(**kwargs)
-        except ImportError:
-            logger.warning(
-                "google-genai not installed. Install with: pip install google-genai"
-            )
-            return None
-
-    if "anthropic" in provider_lower or "claude" in provider_lower:
-        try:
-            from crewai.files.uploaders.anthropic import AnthropicFileUploader
-
-            return AnthropicFileUploader(**kwargs)
-        except ImportError:
-            logger.warning(
-                "anthropic not installed. Install with: pip install anthropic"
-            )
-            return None
-
-    if "openai" in provider_lower or "gpt" in provider_lower:
-        try:
-            from crewai.files.uploaders.openai import OpenAIFileUploader
-
-            return OpenAIFileUploader(**kwargs)
-        except ImportError:
-            logger.warning("openai not installed. Install with: pip install openai")
-            return None
-
-    if "bedrock" in provider_lower or "aws" in provider_lower:
-        import os
-
-        if (
-            not os.environ.get("CREWAI_BEDROCK_S3_BUCKET")
-            and "bucket_name" not in kwargs
-        ):
-            logger.debug(
-                "Bedrock S3 uploader not configured. "
-                "Set CREWAI_BEDROCK_S3_BUCKET environment variable to enable."
-            )
-            return None
-        try:
-            from crewai.files.uploaders.bedrock import BedrockFileUploader
-
-            return BedrockFileUploader(**kwargs)
-        except ImportError:
-            logger.warning("boto3 not installed. Install with: pip install boto3")
-            return None
-
-    logger.debug(f"No file uploader available for provider: {provider}")
-    return None
@@ -8,6 +8,11 @@ import os
 from pathlib import Path
 from typing import Any

+from crewai.files.constants import (
+    MAX_CONCURRENCY,
+    MULTIPART_CHUNKSIZE,
+    MULTIPART_THRESHOLD,
+)
 from crewai.files.content_types import FileInput
 from crewai.files.file import FileBytes, FilePath
 from crewai.files.processing.exceptions import (
@@ -19,10 +24,6 @@ from crewai.files.uploaders.base import FileUploader, UploadResult

 logger = logging.getLogger(__name__)

-MULTIPART_THRESHOLD = 8 * 1024 * 1024
-MULTIPART_CHUNKSIZE = 8 * 1024 * 1024
-MAX_CONCURRENCY = 10
-

 def _classify_s3_error(e: Exception, filename: str | None) -> Exception:
     """Classify an S3 exception as transient or permanent upload error.
lib/crewai/src/crewai/files/uploaders/factory.py (new file, 161 lines)
@@ -0,0 +1,161 @@
+"""Factory for creating file uploaders."""
+
+from __future__ import annotations
+
+import logging
+from typing import Literal, TypedDict, overload, reveal_type
+
+from typing_extensions import Unpack
+
+from crewai.files.uploaders.base import FileUploader
+
+
+logger = logging.getLogger(__name__)
+
+
+ProviderType = Literal[
+    "gemini", "google", "anthropic", "claude", "openai", "gpt", "bedrock", "aws"
+]
+UnknownProvider = str
+
+
+class AllOptions(TypedDict):
+    """Kwargs for uploader factory."""
+
+    api_key: str | None
+    chunk_size: int
+    bucket_name: str
+    bucket_owner: str
+    prefix: str
+    region: str
+
+
+@overload
+def get_uploader(provider: UnknownProvider, /) -> None:
+    """Get file uploader for unknown provider."""
+
+
+@overload
+def get_uploader(
+    provider: Literal["gemini", "google"],
+    *,
+    api_key: str | None = ...,
+) -> FileUploader:
+    """Get Gemini file uploader."""
+
+
+@overload
+def get_uploader(
+    provider: Literal["anthropic", "claude"],
+    *,
+    api_key: str | None = ...,
+) -> FileUploader:
+    """Get Anthropic file uploader."""
+
+
+@overload
+def get_uploader(
+    provider: Literal["openai", "gpt"],
+    *,
+    api_key: str | None = ...,
+    chunk_size: int = ...,
+) -> FileUploader | None:
+    """Get OpenAI file uploader."""
+
+
+@overload
+def get_uploader(
+    provider: Literal["bedrock", "aws"],
+    *,
+    bucket_name: str | None = ...,
+    bucket_owner: str | None = ...,
+    prefix: str = ...,
+    region: str | None = ...,
+) -> FileUploader | None:
+    """Get Bedrock file uploader."""
+
+
+@overload
+def get_uploader(
+    provider: ProviderType | UnknownProvider, **kwargs: Unpack[AllOptions]
+) -> FileUploader | None:
+    """Get any file uploader."""
+
+
+def get_uploader(provider, **kwargs):  # type: ignore[no-untyped-def]
+    """Get a file uploader for a specific provider.
+
+    Args:
+        provider: Provider name (e.g., "gemini", "anthropic").
+        **kwargs: Additional arguments passed to the uploader constructor.
+
+    Returns:
+        FileUploader instance for the provider, or None if not supported.
+    """
+    provider_lower = provider.lower()
+
+    if "gemini" in provider_lower or "google" in provider_lower:
+        try:
+            from crewai.files.uploaders.gemini import GeminiFileUploader
+
+            return GeminiFileUploader(api_key=kwargs.get("api_key"))
+        except ImportError:
+            logger.warning(
+                "google-genai not installed. Install with: pip install google-genai"
+            )
+            return None
+
+    if "anthropic" in provider_lower or "claude" in provider_lower:
+        try:
+            from crewai.files.uploaders.anthropic import AnthropicFileUploader
+
+            return AnthropicFileUploader(api_key=kwargs.get("api_key"))
+        except ImportError:
+            logger.warning(
+                "anthropic not installed. Install with: pip install anthropic"
+            )
+            return None
+
+    if "openai" in provider_lower or "gpt" in provider_lower:
+        try:
+            from crewai.files.uploaders.openai import OpenAIFileUploader
+
+            return OpenAIFileUploader(
+                api_key=kwargs.get("api_key"),
+                chunk_size=kwargs.get("chunk_size", 67_108_864),
+            )
+        except ImportError:
+            logger.warning("openai not installed. Install with: pip install openai")
+            return None
+
+    if "bedrock" in provider_lower or "aws" in provider_lower:
+        import os
+
+        if (
+            not os.environ.get("CREWAI_BEDROCK_S3_BUCKET")
+            and "bucket_name" not in kwargs
+        ):
+            logger.debug(
+                "Bedrock S3 uploader not configured. "
+                "Set CREWAI_BEDROCK_S3_BUCKET environment variable to enable."
+            )
+            return None
+        try:
+            from crewai.files.uploaders.bedrock import BedrockFileUploader
+
+            return BedrockFileUploader(
+                bucket_name=kwargs.get("bucket_name"),
+                bucket_owner=kwargs.get("bucket_owner"),
+                prefix=kwargs.get("prefix", "crewai-files"),
+                region=kwargs.get("region"),
+            )
+        except ImportError:
+            logger.warning("boto3 not installed. Install with: pip install boto3")
+            return None
+
+    logger.debug(f"No file uploader available for provider: {provider}")
+    return None
+
+
+t = get_uploader("openai")
+reveal_type(t)
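With the overloads, the inferred return type follows the provider argument: known single-provider literals passed with keyword options resolve to `FileUploader`, the openai/bedrock overloads to `FileUploader | None`, and a bare unknown string to `None` via the positional-only first overload. The trailing `t = get_uploader("openai")` / `reveal_type(t)` lines read like a leftover typing probe for exactly that resolution order. A hedged usage sketch (key and bucket values are placeholders):

    from crewai.files.uploaders.factory import get_uploader

    gemini = get_uploader("gemini", api_key="PLACEHOLDER")
    # checker: FileUploader (runtime may still be None if google-genai is missing)

    bedrock = get_uploader("bedrock", bucket_name="my-bucket")
    # checker: FileUploader | None

    unknown = get_uploader("some-new-provider")
    # checker: None -- matches the UnknownProvider overload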
@@ -3,7 +3,7 @@
 from __future__ import annotations

 import asyncio
-from datetime import datetime, timedelta, timezone
+from datetime import datetime, timezone
 import io
 import logging
 import os
@@ -12,6 +12,12 @@ import random
 import time
 from typing import Any

+from crewai.files.constants import (
+    BACKOFF_BASE_DELAY,
+    BACKOFF_JITTER_FACTOR,
+    BACKOFF_MAX_DELAY,
+    GEMINI_FILE_TTL,
+)
 from crewai.files.content_types import FileInput
 from crewai.files.file import FilePath
 from crewai.files.processing.exceptions import (
@@ -24,12 +30,6 @@ from crewai.files.uploaders.base import FileUploader, UploadResult

 logger = logging.getLogger(__name__)

-GEMINI_FILE_TTL = timedelta(hours=48)
-
-BACKOFF_BASE_DELAY = 1.0
-BACKOFF_MAX_DELAY = 30.0
-BACKOFF_JITTER_FACTOR = 0.1
-

 def _compute_backoff_delay(attempt: int) -> float:
     """Compute exponential backoff delay with jitter.
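The body of `_compute_backoff_delay` is not part of this hunk; a minimal sketch of the usual exponential-backoff-with-jitter shape these constants suggest (illustrative, not the file's actual implementation):

    import random

    BACKOFF_BASE_DELAY = 1.0     # values from crewai.files.constants
    BACKOFF_MAX_DELAY = 30.0
    BACKOFF_JITTER_FACTOR = 0.1

    def compute_backoff_delay(attempt: int) -> float:
        # Double the delay each attempt, cap it, then add proportional jitter
        # so concurrent retries don't synchronize.
        delay = min(BACKOFF_BASE_DELAY * (2 ** attempt), BACKOFF_MAX_DELAY)
        return delay + delay * BACKOFF_JITTER_FACTOR * random.random()

    # attempt 0 -> ~1s, attempt 3 -> ~8s, attempt 10 -> capped near 30s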
@@ -8,6 +8,7 @@ import logging
 import os
 from typing import Any

+from crewai.files.constants import DEFAULT_UPLOAD_CHUNK_SIZE, FILES_API_MAX_SIZE
 from crewai.files.content_types import FileInput
 from crewai.files.file import FileBytes, FilePath, FileStream
 from crewai.files.processing.exceptions import (
@@ -20,9 +21,6 @@ from crewai.files.uploaders.base import FileUploader, UploadResult

 logger = logging.getLogger(__name__)

-FILES_API_MAX_SIZE = 512 * 1024 * 1024
-DEFAULT_UPLOAD_CHUNK_SIZE = 64 * 1024 * 1024
-

 def _get_file_size(file: FileInput) -> int | None:
     """Get file size without reading content if possible.
lib/crewai/src/crewai/files/utils.py (new file, 92 lines)
@@ -0,0 +1,92 @@
+"""Utility functions for file handling."""
+
+from __future__ import annotations
+
+from pathlib import Path
+from typing import TYPE_CHECKING
+
+from typing_extensions import TypeIs
+
+
+if TYPE_CHECKING:
+    from crewai.files.content_types import FileInput
+    from crewai.files.file import FileSource, FileSourceInput
+
+
+def is_file_source(v: object) -> TypeIs[FileSource]:
+    """Type guard to narrow input to FileSource."""
+    from crewai.files.file import FileBytes, FilePath, FileStream
+
+    return isinstance(v, (FilePath, FileBytes, FileStream))
+
+
+def wrap_file_source(source: FileSource) -> FileInput:
+    """Wrap a FileSource in the appropriate typed FileInput wrapper.
+
+    Args:
+        source: The file source to wrap.
+
+    Returns:
+        Typed FileInput wrapper based on content type.
+    """
+    from crewai.files.content_types import (
+        AudioFile,
+        ImageFile,
+        PDFFile,
+        TextFile,
+        VideoFile,
+    )
+
+    content_type = source.content_type
+
+    if content_type.startswith("image/"):
+        return ImageFile(source=source)
+    if content_type.startswith("audio/"):
+        return AudioFile(source=source)
+    if content_type.startswith("video/"):
+        return VideoFile(source=source)
+    if content_type == "application/pdf":
+        return PDFFile(source=source)
+    return TextFile(source=source)
+
+
+def normalize_input_files(
+    input_files: list[FileSourceInput | FileInput],
+) -> dict[str, FileInput]:
+    """Convert a list of file sources to a named dictionary of FileInputs.
+
+    Args:
+        input_files: List of file source inputs or File objects.
+
+    Returns:
+        Dictionary mapping names to FileInput wrappers.
+    """
+    from crewai.files.content_types import BaseFile
+    from crewai.files.file import FileBytes, FilePath, FileStream
+
+    result: dict[str, FileInput] = {}
+
+    for i, item in enumerate(input_files):
+        if isinstance(item, BaseFile):
+            name = item.filename or f"file_{i}"
+            if "." in name:
+                name = name.rsplit(".", 1)[0]
+            result[name] = item
+            continue
+
+        file_source: FilePath | FileBytes | FileStream
+        if isinstance(item, (FilePath, FileBytes, FileStream)):
+            file_source = item
+        elif isinstance(item, Path):
+            file_source = FilePath(path=item)
+        elif isinstance(item, str):
+            file_source = FilePath(path=Path(item))
+        elif isinstance(item, (bytes, memoryview)):
+            file_source = FileBytes(data=bytes(item))
+        else:
+            continue
+
+        name = file_source.filename or f"file_{i}"
+        result[name] = wrap_file_source(file_source)
+
+    return result
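A hedged usage sketch of the relocated helpers (file names are placeholders and must exist on disk, since FilePath validates them; wrapper classes resolve by detected MIME type):

    from pathlib import Path
    from crewai.files.utils import normalize_input_files

    files = normalize_input_files([
        "docs/report.pdf",       # str    -> FilePath  -> PDFFile
        Path("media/clip.mp4"),  # Path   -> FilePath  -> VideoFile
        b"plain notes",          # bytes  -> FileBytes -> TextFile (sniffed)
    ])
    # Keys come from filenames when available, else "file_<i>", e.g.
    # {"report.pdf": PDFFile(...), "clip.mp4": VideoFile(...), "file_2": TextFile(...)}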