diff --git a/lib/crewai/src/crewai/files/__init__.py b/lib/crewai/src/crewai/files/__init__.py index d86767871..d33c10894 100644 --- a/lib/crewai/src/crewai/files/__init__.py +++ b/lib/crewai/src/crewai/files/__init__.py @@ -73,69 +73,7 @@ from crewai.files.upload_cache import ( reset_upload_cache, ) from crewai.files.uploaders import FileUploader, UploadResult, get_uploader - - -def wrap_file_source(source: FileSource) -> FileInput: - """Wrap a FileSource in the appropriate typed FileInput wrapper. - - Args: - source: The file source to wrap. - - Returns: - Typed FileInput wrapper based on content type. - """ - content_type = source.content_type - - if content_type.startswith("image/"): - return ImageFile(source=source) - if content_type.startswith("audio/"): - return AudioFile(source=source) - if content_type.startswith("video/"): - return VideoFile(source=source) - if content_type == "application/pdf": - return PDFFile(source=source) - return TextFile(source=source) - - -def normalize_input_files( - input_files: list[FileSourceInput | FileInput], -) -> dict[str, FileInput]: - """Convert a list of file sources to a named dictionary of FileInputs. - - Args: - input_files: List of file source inputs or File objects. - - Returns: - Dictionary mapping names to FileInput wrappers. - """ - from pathlib import Path - - result: dict[str, FileInput] = {} - - for i, item in enumerate(input_files): - if isinstance(item, BaseFile): - name = item.filename or f"file_{i}" - if "." in name: - name = name.rsplit(".", 1)[0] - result[name] = item - continue - - file_source: FilePath | FileBytes | FileStream - if isinstance(item, (FilePath, FileBytes, FileStream)): - file_source = item - elif isinstance(item, Path): - file_source = FilePath(path=item) - elif isinstance(item, str): - file_source = FilePath(path=Path(item)) - elif isinstance(item, (bytes, memoryview)): - file_source = FileBytes(data=bytes(item)) - else: - continue - - name = file_source.filename or f"file_{i}" - result[name] = wrap_file_source(file_source) - - return result +from crewai.files.utils import normalize_input_files, wrap_file_source __all__ = [ diff --git a/lib/crewai/src/crewai/files/constants.py b/lib/crewai/src/crewai/files/constants.py new file mode 100644 index 000000000..9d174daf1 --- /dev/null +++ b/lib/crewai/src/crewai/files/constants.py @@ -0,0 +1,26 @@ +"""Constants for file handling utilities.""" + +from datetime import timedelta +from typing import Final, Literal + + +DEFAULT_MAX_FILE_SIZE_BYTES: Final[Literal[524_288_000]] = 524_288_000 +MAGIC_BUFFER_SIZE: Final[Literal[2048]] = 2048 + +UPLOAD_MAX_RETRIES: Final[Literal[3]] = 3 +UPLOAD_RETRY_DELAY_BASE: Final[Literal[2]] = 2 + +DEFAULT_TTL_SECONDS: Final[Literal[86_400]] = 86_400 +DEFAULT_MAX_CACHE_ENTRIES: Final[Literal[1000]] = 1000 + +GEMINI_FILE_TTL: Final[timedelta] = timedelta(hours=48) +BACKOFF_BASE_DELAY: Final[float] = 1.0 +BACKOFF_MAX_DELAY: Final[float] = 30.0 +BACKOFF_JITTER_FACTOR: Final[float] = 0.1 + +FILES_API_MAX_SIZE: Final[Literal[536_870_912]] = 536_870_912 +DEFAULT_UPLOAD_CHUNK_SIZE: Final[Literal[67_108_864]] = 67_108_864 + +MULTIPART_THRESHOLD: Final[Literal[8_388_608]] = 8_388_608 +MULTIPART_CHUNKSIZE: Final[Literal[8_388_608]] = 8_388_608 +MAX_CONCURRENCY: Final[Literal[10]] = 10 diff --git a/lib/crewai/src/crewai/files/content_types.py b/lib/crewai/src/crewai/files/content_types.py index ae5a96d97..f110f58bb 100644 --- a/lib/crewai/src/crewai/files/content_types.py +++ b/lib/crewai/src/crewai/files/content_types.py @@ -9,7 +9,6 @@ from typing import Annotated, Any, BinaryIO, Literal, Self from pydantic import BaseModel, Field, GetCoreSchemaHandler from pydantic_core import CoreSchema, core_schema -from typing_extensions import TypeIs from crewai.files.file import ( AsyncFileStream, @@ -18,6 +17,7 @@ from crewai.files.file import ( FileSource, FileStream, ) +from crewai.files.utils import is_file_source FileSourceInput = str | Path | bytes | IOBase | FileSource @@ -60,12 +60,6 @@ class _FileSourceCoercer: CoercedFileSource = Annotated[FileSourceInput, _FileSourceCoercer] - -def _is_file_source(v: FileSourceInput) -> TypeIs[FileSource]: - """Type guard to narrow FileSourceInput to FileSource.""" - return isinstance(v, (FilePath, FileBytes, FileStream)) - - FileMode = Literal["strict", "auto", "warn", "chunk"] @@ -184,7 +178,7 @@ class BaseFile(ABC, BaseModel): @property def _file_source(self) -> FileSource: """Get source with narrowed type (always FileSource after validation).""" - if _is_file_source(self.source): + if is_file_source(self.source): return self.source raise TypeError("source must be a FileSource after validation") diff --git a/lib/crewai/src/crewai/files/file.py b/lib/crewai/src/crewai/files/file.py index 1bd370671..53d9a08f1 100644 --- a/lib/crewai/src/crewai/files/file.py +++ b/lib/crewai/src/crewai/files/file.py @@ -18,6 +18,8 @@ from pydantic import ( ) from pydantic_core import CoreSchema, core_schema +from crewai.files.constants import DEFAULT_MAX_FILE_SIZE_BYTES, MAGIC_BUFFER_SIZE + @runtime_checkable class AsyncReadable(Protocol): @@ -51,7 +53,14 @@ class _AsyncReadableValidator: ValidatedAsyncReadable = Annotated[AsyncReadable, _AsyncReadableValidator()] -DEFAULT_MAX_FILE_SIZE_BYTES = 500 * 1024 * 1024 # 500MB + +def _fallback_content_type(filename: str | None) -> str: + """Get content type from filename extension or return default.""" + if filename: + mime_type, _ = mimetypes.guess_type(filename) + if mime_type: + return mime_type + return "application/octet-stream" def detect_content_type(data: bytes, filename: str | None = None) -> str: @@ -61,7 +70,7 @@ def detect_content_type(data: bytes, filename: str | None = None) -> str: falls back to mimetypes module using filename extension. Args: - data: Raw bytes to analyze. + data: Raw bytes to analyze (only first 2048 bytes are used). filename: Optional filename for extension-based fallback. Returns: @@ -70,14 +79,32 @@ def detect_content_type(data: bytes, filename: str | None = None) -> str: try: import magic - result: str = magic.from_buffer(data, mime=True) + result: str = magic.from_buffer(data[:MAGIC_BUFFER_SIZE], mime=True) return result except ImportError: - if filename: - mime_type, _ = mimetypes.guess_type(filename) - if mime_type: - return mime_type - return "application/octet-stream" + return _fallback_content_type(filename) + + +def detect_content_type_from_path(path: Path, filename: str | None = None) -> str: + """Detect MIME type from file path. + + Uses python-magic's from_file() for accurate detection without reading + the entire file into memory. + + Args: + path: Path to the file. + filename: Optional filename for extension-based fallback. + + Returns: + The detected MIME type. + """ + try: + import magic + + result: str = magic.from_file(str(path), mime=True) + return result + except ImportError: + return _fallback_content_type(filename or path.name) class _BinaryIOValidator: @@ -114,6 +141,7 @@ class FilePath(BaseModel): description="Maximum file size in bytes.", ) _content: bytes | None = PrivateAttr(default=None) + _content_type: str = PrivateAttr() @model_validator(mode="after") def _validate_file_exists(self) -> FilePath: @@ -144,6 +172,7 @@ class FilePath(BaseModel): max_size=self.max_size_bytes, ) + self._content_type = detect_content_type_from_path(self.path, self.path.name) return self @property @@ -153,8 +182,8 @@ class FilePath(BaseModel): @property def content_type(self) -> str: - """Get the content type by reading file content.""" - return detect_content_type(self.read(), self.filename) + """Get the content type.""" + return self._content_type def read(self) -> bytes: """Read the file content from disk.""" @@ -201,11 +230,18 @@ class FileBytes(BaseModel): data: bytes = Field(description="Raw bytes content of the file.") filename: str | None = Field(default=None, description="Optional filename.") + _content_type: str = PrivateAttr() + + @model_validator(mode="after") + def _detect_content_type(self) -> FileBytes: + """Detect and cache content type from data.""" + self._content_type = detect_content_type(self.data, self.filename) + return self @property def content_type(self) -> str: - """Get the content type from the data.""" - return detect_content_type(self.data, self.filename) + """Get the content type.""" + return self._content_type def read(self) -> bytes: """Return the bytes content.""" @@ -246,18 +282,27 @@ class FileStream(BaseModel): stream: ValidatedBinaryIO = Field(description="Binary file stream.") filename: str | None = Field(default=None, description="Optional filename.") _content: bytes | None = PrivateAttr(default=None) + _content_type: str = PrivateAttr() - def model_post_init(self, __context: object) -> None: - """Extract filename from stream if not provided.""" + @model_validator(mode="after") + def _initialize(self) -> FileStream: + """Extract filename and detect content type.""" if self.filename is None: name = getattr(self.stream, "name", None) if name is not None: - object.__setattr__(self, "filename", Path(name).name) + self.filename = Path(name).name + + position = self.stream.tell() + self.stream.seek(0) + header = self.stream.read(MAGIC_BUFFER_SIZE) + self.stream.seek(position) + self._content_type = detect_content_type(header, self.filename) + return self @property def content_type(self) -> str: - """Get the content type from stream content.""" - return detect_content_type(self.read(), self.filename) + """Get the content type.""" + return self._content_type def read(self) -> bytes: """Read the stream content. Content is cached after first read.""" @@ -319,13 +364,16 @@ class AsyncFileStream(BaseModel): ) filename: str | None = Field(default=None, description="Optional filename.") _content: bytes | None = PrivateAttr(default=None) + _content_type: str | None = PrivateAttr(default=None) @property def content_type(self) -> str: - """Get the content type from stream content. Requires aread() first.""" + """Get the content type from stream content (cached). Requires aread() first.""" if self._content is None: raise RuntimeError("Call aread() first to load content") - return detect_content_type(self._content, self.filename) + if self._content_type is None: + self._content_type = detect_content_type(self._content, self.filename) + return self._content_type async def aread(self) -> bytes: """Async read the stream content. Content is cached after first read.""" diff --git a/lib/crewai/src/crewai/files/processing/constraints.py b/lib/crewai/src/crewai/files/processing/constraints.py index adf7cca86..02019e8c1 100644 --- a/lib/crewai/src/crewai/files/processing/constraints.py +++ b/lib/crewai/src/crewai/files/processing/constraints.py @@ -1,6 +1,7 @@ """Provider-specific file constraints for multimodal content.""" from dataclasses import dataclass +from functools import lru_cache from typing import Literal from crewai.files.content_types import ( @@ -162,47 +163,47 @@ class ProviderConstraints: ANTHROPIC_CONSTRAINTS = ProviderConstraints( name="anthropic", image=ImageConstraints( - max_size_bytes=5 * 1024 * 1024, + max_size_bytes=5_242_880, max_width=8000, max_height=8000, ), pdf=PDFConstraints( - max_size_bytes=30 * 1024 * 1024, + max_size_bytes=31_457_280, max_pages=100, ), supports_file_upload=True, - file_upload_threshold_bytes=5 * 1024 * 1024, + file_upload_threshold_bytes=5_242_880, ) OPENAI_CONSTRAINTS = ProviderConstraints( name="openai", image=ImageConstraints( - max_size_bytes=20 * 1024 * 1024, + max_size_bytes=20_971_520, max_images_per_request=10, ), supports_file_upload=True, - file_upload_threshold_bytes=5 * 1024 * 1024, + file_upload_threshold_bytes=5_242_880, ) GEMINI_CONSTRAINTS = ProviderConstraints( name="gemini", image=ImageConstraints( - max_size_bytes=100 * 1024 * 1024, + max_size_bytes=104_857_600, supported_formats=GEMINI_IMAGE_FORMATS, ), pdf=PDFConstraints( - max_size_bytes=50 * 1024 * 1024, + max_size_bytes=52_428_800, ), audio=AudioConstraints( - max_size_bytes=100 * 1024 * 1024, + max_size_bytes=104_857_600, supported_formats=GEMINI_AUDIO_FORMATS, ), video=VideoConstraints( - max_size_bytes=2 * 1024 * 1024 * 1024, + max_size_bytes=2_147_483_648, supported_formats=GEMINI_VIDEO_FORMATS, ), supports_file_upload=True, - file_upload_threshold_bytes=20 * 1024 * 1024, + file_upload_threshold_bytes=20_971_520, ) BEDROCK_CONSTRAINTS = ProviderConstraints( @@ -221,7 +222,7 @@ BEDROCK_CONSTRAINTS = ProviderConstraints( AZURE_CONSTRAINTS = ProviderConstraints( name="azure", image=ImageConstraints( - max_size_bytes=20 * 1024 * 1024, + max_size_bytes=20_971_520, max_images_per_request=10, ), ) @@ -240,6 +241,7 @@ _PROVIDER_CONSTRAINTS_MAP: dict[str, ProviderConstraints] = { } +@lru_cache(maxsize=32) def get_constraints_for_provider( provider: str | ProviderConstraints, ) -> ProviderConstraints | None: diff --git a/lib/crewai/src/crewai/files/resolver.py b/lib/crewai/src/crewai/files/resolver.py index 75464d289..138740334 100644 --- a/lib/crewai/src/crewai/files/resolver.py +++ b/lib/crewai/src/crewai/files/resolver.py @@ -6,6 +6,7 @@ from dataclasses import dataclass, field import hashlib import logging +from crewai.files.constants import UPLOAD_MAX_RETRIES, UPLOAD_RETRY_DELAY_BASE from crewai.files.content_types import FileInput from crewai.files.metrics import measure_operation from crewai.files.processing.constraints import ( @@ -29,9 +30,6 @@ from crewai.files.uploaders.base import FileUploader logger = logging.getLogger(__name__) -UPLOAD_MAX_RETRIES = 3 -UPLOAD_RETRY_DELAY_BASE = 2 - @dataclass class FileContext: diff --git a/lib/crewai/src/crewai/files/upload_cache.py b/lib/crewai/src/crewai/files/upload_cache.py index 799621e8d..8fab127da 100644 --- a/lib/crewai/src/crewai/files/upload_cache.py +++ b/lib/crewai/src/crewai/files/upload_cache.py @@ -15,15 +15,14 @@ from typing import TYPE_CHECKING, Any from aiocache import Cache # type: ignore[import-untyped] from aiocache.serializers import PickleSerializer # type: ignore[import-untyped] +from crewai.files.constants import DEFAULT_MAX_CACHE_ENTRIES, DEFAULT_TTL_SECONDS + if TYPE_CHECKING: from crewai.files.content_types import FileInput logger = logging.getLogger(__name__) -DEFAULT_TTL_SECONDS = 24 * 60 * 60 # 24 hours -DEFAULT_MAX_CACHE_ENTRIES = 1000 - @dataclass class CachedUpload: diff --git a/lib/crewai/src/crewai/files/uploaders/__init__.py b/lib/crewai/src/crewai/files/uploaders/__init__.py index a091eb1b5..d3664e71a 100644 --- a/lib/crewai/src/crewai/files/uploaders/__init__.py +++ b/lib/crewai/src/crewai/files/uploaders/__init__.py @@ -1,84 +1,11 @@ """File uploader implementations for provider File APIs.""" -from __future__ import annotations - -import logging -from typing import Any - from crewai.files.uploaders.base import FileUploader, UploadResult +from crewai.files.uploaders.factory import get_uploader -logger = logging.getLogger(__name__) - __all__ = [ "FileUploader", "UploadResult", "get_uploader", ] - - -def get_uploader(provider: str, **kwargs: Any) -> FileUploader | None: - """Get a file uploader for a specific provider. - - Args: - provider: Provider name (e.g., "gemini", "anthropic"). - **kwargs: Additional arguments passed to the uploader constructor. - - Returns: - FileUploader instance for the provider, or None if not supported. - """ - provider_lower = provider.lower() - - if "gemini" in provider_lower or "google" in provider_lower: - try: - from crewai.files.uploaders.gemini import GeminiFileUploader - - return GeminiFileUploader(**kwargs) - except ImportError: - logger.warning( - "google-genai not installed. Install with: pip install google-genai" - ) - return None - - if "anthropic" in provider_lower or "claude" in provider_lower: - try: - from crewai.files.uploaders.anthropic import AnthropicFileUploader - - return AnthropicFileUploader(**kwargs) - except ImportError: - logger.warning( - "anthropic not installed. Install with: pip install anthropic" - ) - return None - - if "openai" in provider_lower or "gpt" in provider_lower: - try: - from crewai.files.uploaders.openai import OpenAIFileUploader - - return OpenAIFileUploader(**kwargs) - except ImportError: - logger.warning("openai not installed. Install with: pip install openai") - return None - - if "bedrock" in provider_lower or "aws" in provider_lower: - import os - - if ( - not os.environ.get("CREWAI_BEDROCK_S3_BUCKET") - and "bucket_name" not in kwargs - ): - logger.debug( - "Bedrock S3 uploader not configured. " - "Set CREWAI_BEDROCK_S3_BUCKET environment variable to enable." - ) - return None - try: - from crewai.files.uploaders.bedrock import BedrockFileUploader - - return BedrockFileUploader(**kwargs) - except ImportError: - logger.warning("boto3 not installed. Install with: pip install boto3") - return None - - logger.debug(f"No file uploader available for provider: {provider}") - return None diff --git a/lib/crewai/src/crewai/files/uploaders/bedrock.py b/lib/crewai/src/crewai/files/uploaders/bedrock.py index f0c7f0cb7..c887cae89 100644 --- a/lib/crewai/src/crewai/files/uploaders/bedrock.py +++ b/lib/crewai/src/crewai/files/uploaders/bedrock.py @@ -8,6 +8,11 @@ import os from pathlib import Path from typing import Any +from crewai.files.constants import ( + MAX_CONCURRENCY, + MULTIPART_CHUNKSIZE, + MULTIPART_THRESHOLD, +) from crewai.files.content_types import FileInput from crewai.files.file import FileBytes, FilePath from crewai.files.processing.exceptions import ( @@ -19,10 +24,6 @@ from crewai.files.uploaders.base import FileUploader, UploadResult logger = logging.getLogger(__name__) -MULTIPART_THRESHOLD = 8 * 1024 * 1024 -MULTIPART_CHUNKSIZE = 8 * 1024 * 1024 -MAX_CONCURRENCY = 10 - def _classify_s3_error(e: Exception, filename: str | None) -> Exception: """Classify an S3 exception as transient or permanent upload error. diff --git a/lib/crewai/src/crewai/files/uploaders/factory.py b/lib/crewai/src/crewai/files/uploaders/factory.py new file mode 100644 index 000000000..9716c9fef --- /dev/null +++ b/lib/crewai/src/crewai/files/uploaders/factory.py @@ -0,0 +1,161 @@ +"""Factory for creating file uploaders.""" + +from __future__ import annotations + +import logging +from typing import Literal, TypedDict, overload, reveal_type + +from typing_extensions import Unpack + +from crewai.files.uploaders.base import FileUploader + + +logger = logging.getLogger(__name__) + + +ProviderType = Literal[ + "gemini", "google", "anthropic", "claude", "openai", "gpt", "bedrock", "aws" +] +UnknownProvider = str + + +class AllOptions(TypedDict): + """Kwargs for uploader factory.""" + + api_key: str | None + chunk_size: int + bucket_name: str + bucket_owner: str + prefix: str + region: str + + +@overload +def get_uploader(provider: UnknownProvider, /) -> None: + """Get file uploader for unknown provider.""" + + +@overload +def get_uploader( + provider: Literal["gemini", "google"], + *, + api_key: str | None = ..., +) -> FileUploader: + """Get Gemini file uploader.""" + + +@overload +def get_uploader( + provider: Literal["anthropic", "claude"], + *, + api_key: str | None = ..., +) -> FileUploader: + """Get Anthropic file uploader.""" + + +@overload +def get_uploader( + provider: Literal["openai", "gpt"], + *, + api_key: str | None = ..., + chunk_size: int = ..., +) -> FileUploader | None: + """Get OpenAI file uploader.""" + + +@overload +def get_uploader( + provider: Literal["bedrock", "aws"], + *, + bucket_name: str | None = ..., + bucket_owner: str | None = ..., + prefix: str = ..., + region: str | None = ..., +) -> FileUploader | None: + """Get Bedrock file uploader.""" + + +@overload +def get_uploader( + provider: ProviderType | UnknownProvider, **kwargs: Unpack[AllOptions] +) -> FileUploader | None: + """Get any file uploader.""" + + +def get_uploader(provider, **kwargs): # type: ignore[no-untyped-def] + """Get a file uploader for a specific provider. + + Args: + provider: Provider name (e.g., "gemini", "anthropic"). + **kwargs: Additional arguments passed to the uploader constructor. + + Returns: + FileUploader instance for the provider, or None if not supported. + """ + provider_lower = provider.lower() + + if "gemini" in provider_lower or "google" in provider_lower: + try: + from crewai.files.uploaders.gemini import GeminiFileUploader + + return GeminiFileUploader(api_key=kwargs.get("api_key")) + except ImportError: + logger.warning( + "google-genai not installed. Install with: pip install google-genai" + ) + return None + + if "anthropic" in provider_lower or "claude" in provider_lower: + try: + from crewai.files.uploaders.anthropic import AnthropicFileUploader + + return AnthropicFileUploader(api_key=kwargs.get("api_key")) + except ImportError: + logger.warning( + "anthropic not installed. Install with: pip install anthropic" + ) + return None + + if "openai" in provider_lower or "gpt" in provider_lower: + try: + from crewai.files.uploaders.openai import OpenAIFileUploader + + return OpenAIFileUploader( + api_key=kwargs.get("api_key"), + chunk_size=kwargs.get("chunk_size", 67_108_864), + ) + except ImportError: + logger.warning("openai not installed. Install with: pip install openai") + return None + + if "bedrock" in provider_lower or "aws" in provider_lower: + import os + + if ( + not os.environ.get("CREWAI_BEDROCK_S3_BUCKET") + and "bucket_name" not in kwargs + ): + logger.debug( + "Bedrock S3 uploader not configured. " + "Set CREWAI_BEDROCK_S3_BUCKET environment variable to enable." + ) + return None + try: + from crewai.files.uploaders.bedrock import BedrockFileUploader + + return BedrockFileUploader( + bucket_name=kwargs.get("bucket_name"), + bucket_owner=kwargs.get("bucket_owner"), + prefix=kwargs.get("prefix", "crewai-files"), + region=kwargs.get("region"), + ) + except ImportError: + logger.warning("boto3 not installed. Install with: pip install boto3") + return None + + logger.debug(f"No file uploader available for provider: {provider}") + return None + + +t = get_uploader("openai") +reveal_type(t) diff --git a/lib/crewai/src/crewai/files/uploaders/gemini.py b/lib/crewai/src/crewai/files/uploaders/gemini.py index ff43964ef..ba171692e 100644 --- a/lib/crewai/src/crewai/files/uploaders/gemini.py +++ b/lib/crewai/src/crewai/files/uploaders/gemini.py @@ -3,7 +3,7 @@ from __future__ import annotations import asyncio -from datetime import datetime, timedelta, timezone +from datetime import datetime, timezone import io import logging import os @@ -12,6 +12,12 @@ import random import time from typing import Any +from crewai.files.constants import ( + BACKOFF_BASE_DELAY, + BACKOFF_JITTER_FACTOR, + BACKOFF_MAX_DELAY, + GEMINI_FILE_TTL, +) from crewai.files.content_types import FileInput from crewai.files.file import FilePath from crewai.files.processing.exceptions import ( @@ -24,12 +30,6 @@ from crewai.files.uploaders.base import FileUploader, UploadResult logger = logging.getLogger(__name__) -GEMINI_FILE_TTL = timedelta(hours=48) - -BACKOFF_BASE_DELAY = 1.0 -BACKOFF_MAX_DELAY = 30.0 -BACKOFF_JITTER_FACTOR = 0.1 - def _compute_backoff_delay(attempt: int) -> float: """Compute exponential backoff delay with jitter. diff --git a/lib/crewai/src/crewai/files/uploaders/openai.py b/lib/crewai/src/crewai/files/uploaders/openai.py index d251094a4..f8893dc01 100644 --- a/lib/crewai/src/crewai/files/uploaders/openai.py +++ b/lib/crewai/src/crewai/files/uploaders/openai.py @@ -8,6 +8,7 @@ import logging import os from typing import Any +from crewai.files.constants import DEFAULT_UPLOAD_CHUNK_SIZE, FILES_API_MAX_SIZE from crewai.files.content_types import FileInput from crewai.files.file import FileBytes, FilePath, FileStream from crewai.files.processing.exceptions import ( @@ -20,9 +21,6 @@ from crewai.files.uploaders.base import FileUploader, UploadResult logger = logging.getLogger(__name__) -FILES_API_MAX_SIZE = 512 * 1024 * 1024 -DEFAULT_UPLOAD_CHUNK_SIZE = 64 * 1024 * 1024 - def _get_file_size(file: FileInput) -> int | None: """Get file size without reading content if possible. diff --git a/lib/crewai/src/crewai/files/utils.py b/lib/crewai/src/crewai/files/utils.py new file mode 100644 index 000000000..e8069bae4 --- /dev/null +++ b/lib/crewai/src/crewai/files/utils.py @@ -0,0 +1,92 @@ +"""Utility functions for file handling.""" + +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +from typing_extensions import TypeIs + + +if TYPE_CHECKING: + from crewai.files.content_types import FileInput + from crewai.files.file import FileSource, FileSourceInput + + +def is_file_source(v: object) -> TypeIs[FileSource]: + """Type guard to narrow input to FileSource.""" + from crewai.files.file import FileBytes, FilePath, FileStream + + return isinstance(v, (FilePath, FileBytes, FileStream)) + + +def wrap_file_source(source: FileSource) -> FileInput: + """Wrap a FileSource in the appropriate typed FileInput wrapper. + + Args: + source: The file source to wrap. + + Returns: + Typed FileInput wrapper based on content type. + """ + from crewai.files.content_types import ( + AudioFile, + ImageFile, + PDFFile, + TextFile, + VideoFile, + ) + + content_type = source.content_type + + if content_type.startswith("image/"): + return ImageFile(source=source) + if content_type.startswith("audio/"): + return AudioFile(source=source) + if content_type.startswith("video/"): + return VideoFile(source=source) + if content_type == "application/pdf": + return PDFFile(source=source) + return TextFile(source=source) + + +def normalize_input_files( + input_files: list[FileSourceInput | FileInput], +) -> dict[str, FileInput]: + """Convert a list of file sources to a named dictionary of FileInputs. + + Args: + input_files: List of file source inputs or File objects. + + Returns: + Dictionary mapping names to FileInput wrappers. + """ + from crewai.files.content_types import BaseFile + from crewai.files.file import FileBytes, FilePath, FileStream + + result: dict[str, FileInput] = {} + + for i, item in enumerate(input_files): + if isinstance(item, BaseFile): + name = item.filename or f"file_{i}" + if "." in name: + name = name.rsplit(".", 1)[0] + result[name] = item + continue + + file_source: FilePath | FileBytes | FileStream + if isinstance(item, (FilePath, FileBytes, FileStream)): + file_source = item + elif isinstance(item, Path): + file_source = FilePath(path=item) + elif isinstance(item, str): + file_source = FilePath(path=Path(item)) + elif isinstance(item, (bytes, memoryview)): + file_source = FileBytes(data=bytes(item)) + else: + continue + + name = file_source.filename or f"file_{i}" + result[name] = wrap_file_source(file_source) + + return result