feat: add file processing infrastructure

This commit is contained in:
Greyson LaLonde
2026-01-21 18:30:14 -05:00
parent 22f1e21d69
commit 301a1da047
6 changed files with 1442 additions and 0 deletions

View File

@@ -0,0 +1,258 @@
"""Provider-specific file constraints for multimodal content."""
from dataclasses import dataclass
@dataclass(frozen=True)
class ImageConstraints:
"""Constraints for image files.
Attributes:
max_size_bytes: Maximum file size in bytes.
max_width: Maximum image width in pixels.
max_height: Maximum image height in pixels.
max_images_per_request: Maximum number of images per request.
supported_formats: Supported image MIME types.
"""
max_size_bytes: int
max_width: int | None = None
max_height: int | None = None
max_images_per_request: int | None = None
supported_formats: tuple[str, ...] = (
"image/png",
"image/jpeg",
"image/gif",
"image/webp",
)
@dataclass(frozen=True)
class PDFConstraints:
"""Constraints for PDF files.
Attributes:
max_size_bytes: Maximum file size in bytes.
max_pages: Maximum number of pages.
"""
max_size_bytes: int
max_pages: int | None = None
@dataclass(frozen=True)
class AudioConstraints:
"""Constraints for audio files.
Attributes:
max_size_bytes: Maximum file size in bytes.
max_duration_seconds: Maximum audio duration in seconds.
supported_formats: Supported audio MIME types.
"""
max_size_bytes: int
max_duration_seconds: int | None = None
supported_formats: tuple[str, ...] = (
"audio/mp3",
"audio/mpeg",
"audio/wav",
"audio/ogg",
"audio/flac",
"audio/aac",
"audio/m4a",
)
@dataclass(frozen=True)
class VideoConstraints:
"""Constraints for video files.
Attributes:
max_size_bytes: Maximum file size in bytes.
max_duration_seconds: Maximum video duration in seconds.
supported_formats: Supported video MIME types.
"""
max_size_bytes: int
max_duration_seconds: int | None = None
supported_formats: tuple[str, ...] = (
"video/mp4",
"video/mpeg",
"video/webm",
"video/quicktime",
)
@dataclass(frozen=True)
class ProviderConstraints:
    """Everything known about one provider's file limits, grouped by media type.

    A media-type field left as None means the provider is treated as not
    supporting that kind of file.

    Attributes:
        name: Identifier of the provider.
        image: Limits for image files.
        pdf: Limits for PDF files.
        audio: Limits for audio files.
        video: Limits for video files.
        general_max_size_bytes: Size cap applied to any file type.
        supports_file_upload: True when the provider offers a file upload API.
        file_upload_threshold_bytes: Size above which the upload API should be
            used.
    """

    name: str

    # Per-media-type limits.
    image: ImageConstraints | None = None
    pdf: PDFConstraints | None = None
    audio: AudioConstraints | None = None
    video: VideoConstraints | None = None

    # Provider-wide settings.
    general_max_size_bytes: int | None = None
    supports_file_upload: bool = False
    file_upload_threshold_bytes: int | None = None
# Anthropic limits (Claude 3 and later).
# https://docs.anthropic.com/en/docs/build-with-claude/vision
ANTHROPIC_CONSTRAINTS = ProviderConstraints(
    name="anthropic",
    image=ImageConstraints(
        max_size_bytes=5 * 1024**2,  # 5 MiB
        max_width=8000,
        max_height=8000,
        supported_formats=("image/png", "image/jpeg", "image/gif", "image/webp"),
    ),
    pdf=PDFConstraints(
        max_size_bytes=30 * 1024**2,  # 30 MiB
        max_pages=100,
    ),
    supports_file_upload=True,
    # Files larger than 5 MiB should go through the upload API.
    file_upload_threshold_bytes=5 * 1024**2,
)
# OpenAI limits (GPT-4o, GPT-4 Vision).
# https://platform.openai.com/docs/guides/vision
OPENAI_CONSTRAINTS = ProviderConstraints(
    name="openai",
    image=ImageConstraints(
        max_size_bytes=20 * 1024**2,  # 20 MiB
        max_images_per_request=10,
        supported_formats=("image/png", "image/jpeg", "image/gif", "image/webp"),
    ),
    # No native PDF support is recorded for OpenAI.
    pdf=None,
    supports_file_upload=True,
    # Files larger than 5 MiB should go through the upload API.
    file_upload_threshold_bytes=5 * 1024**2,
)
# Gemini limits.
# https://ai.google.dev/gemini-api/docs/vision
GEMINI_CONSTRAINTS = ProviderConstraints(
    name="gemini",
    image=ImageConstraints(
        max_size_bytes=100 * 1024**2,  # 100 MiB inline
        supported_formats=(
            "image/png",
            "image/jpeg",
            "image/gif",
            "image/webp",
            "image/heic",
            "image/heif",
        ),
    ),
    pdf=PDFConstraints(max_size_bytes=50 * 1024**2),  # 50 MiB inline
    audio=AudioConstraints(
        max_size_bytes=100 * 1024**2,  # 100 MiB
        supported_formats=(
            "audio/mp3",
            "audio/mpeg",
            "audio/wav",
            "audio/ogg",
            "audio/flac",
            "audio/aac",
            "audio/m4a",
            "audio/opus",
        ),
    ),
    video=VideoConstraints(
        max_size_bytes=2 * 1024**3,  # 2 GiB via the File API
        supported_formats=(
            "video/mp4",
            "video/mpeg",
            "video/webm",
            "video/quicktime",
            "video/x-msvideo",
            "video/x-flv",
        ),
    ),
    supports_file_upload=True,
    # Files larger than 20 MiB should go through the upload API.
    file_upload_threshold_bytes=20 * 1024**2,
)
# AWS Bedrock limits (Claude models served through Bedrock).
# https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-anthropic-claude-messages.html
# The byte counts below equal 4.5 * 1024 * 1000 and 3.75 * 1024 * 1000, i.e.
# slightly under 4.5 MiB and 3.75 MiB respectively.
# NOTE(review): the Bedrock Converse documentation appears to list 3.75 MB for
# images and 4.5 MB for documents - confirm these two limits are not swapped.
BEDROCK_CONSTRAINTS = ProviderConstraints(
    name="bedrock",
    image=ImageConstraints(
        max_size_bytes=4_608_000,  # ~4.5MB (encoded size limit)
        max_width=8000,
        max_height=8000,
        supported_formats=("image/png", "image/jpeg", "image/gif", "image/webp"),
    ),
    pdf=PDFConstraints(max_size_bytes=3_840_000, max_pages=100),  # ~3.75MB
)
# Azure OpenAI limits. The image limits mirror the OpenAI entry; unlike that
# entry, no file-upload support or upload threshold is declared here.
AZURE_CONSTRAINTS = ProviderConstraints(
    name="azure",
    image=ImageConstraints(
        max_size_bytes=20 * 1024**2,  # 20 MiB
        max_images_per_request=10,
        supported_formats=("image/png", "image/jpeg", "image/gif", "image/webp"),
    ),
    pdf=None,
)
# Lookup table from lowercase provider name (or alias) to its constraints.
# Key order is significant: substring-based lookups walk the table in
# insertion order, so canonical names come first, then aliases.
_PROVIDER_CONSTRAINTS_MAP: dict[str, ProviderConstraints] = {
    # Canonical provider names.
    "anthropic": ANTHROPIC_CONSTRAINTS,
    "openai": OPENAI_CONSTRAINTS,
    "gemini": GEMINI_CONSTRAINTS,
    "bedrock": BEDROCK_CONSTRAINTS,
    "azure": AZURE_CONSTRAINTS,
} | {
    # Aliases that resolve to one of the canonical entries.
    "claude": ANTHROPIC_CONSTRAINTS,
    "gpt": OPENAI_CONSTRAINTS,
    "google": GEMINI_CONSTRAINTS,
    "aws": BEDROCK_CONSTRAINTS,
}
def get_constraints_for_provider(
    provider: str | ProviderConstraints,
) -> ProviderConstraints | None:
    """Resolve a provider name (or pass through an existing constraints object).

    Resolution order for string input, case-insensitively:

    1. Exact match against the known provider names and aliases.
    2. Substring match, trying hosting-platform keys first and then the
       remaining keys in table order.

    Hosting platforms are tried first because their identifiers routinely
    embed a model vendor's name (for example ``"bedrock/anthropic.claude-3"``
    or ``"azure_openai"``); the platform's limits are the ones that apply, not
    the vendor's.

    Args:
        provider: Provider name string or ProviderConstraints instance.

    Returns:
        ProviderConstraints for the provider, or None if not found.
    """
    if isinstance(provider, ProviderConstraints):
        return provider

    provider_lower = provider.lower()

    exact = _PROVIDER_CONSTRAINTS_MAP.get(provider_lower)
    if exact is not None:
        return exact

    # Platform keys must win over vendor keys when both occur in one string;
    # repeating them later in the iteration is harmless.
    hosting_platform_keys = ("bedrock", "aws", "azure")
    for key in (*hosting_platform_keys, *_PROVIDER_CONSTRAINTS_MAP):
        if key in provider_lower and key in _PROVIDER_CONSTRAINTS_MAP:
            return _PROVIDER_CONSTRAINTS_MAP[key]

    return None

View File

@@ -0,0 +1,19 @@
"""Enums for file processing configuration."""
from enum import Enum
class FileHandling(Enum):
    """Policy applied when a file does not fit a provider's limits.

    Attributes:
        STRICT: Raise an error for any file that exceeds the limits.
        AUTO: Try to make the file fit by resizing, compressing or optimizing.
        WARN: Emit a warning and pass the file through unchanged.
        CHUNK: Break an oversized file into several smaller files.
    """

    STRICT = "strict"
    AUTO = "auto"
    WARN = "warn"
    CHUNK = "chunk"

View File

@@ -0,0 +1,83 @@
"""Exceptions for file processing operations."""
class FileProcessingError(Exception):
"""Base exception for file processing errors."""
def __init__(self, message: str, file_name: str | None = None) -> None:
"""Initialize the exception.
Args:
message: Error message describing the issue.
file_name: Optional name of the file that caused the error.
"""
self.file_name = file_name
super().__init__(message)
class FileValidationError(FileProcessingError):
    """Signals that a file did not pass validation against its constraints."""
class FileTooLargeError(FileValidationError):
    """Signals that a file is bigger than the allowed maximum."""

    def __init__(
        self,
        message: str,
        file_name: str | None = None,
        actual_size: int | None = None,
        max_size: int | None = None,
    ) -> None:
        """Create the error and record both the observed and permitted sizes.

        Args:
            message: Human-readable description of the problem.
            file_name: Name of the offending file, when known.
            actual_size: Size of the file in bytes.
            max_size: Largest permitted size in bytes.
        """
        super().__init__(message, file_name)
        self.actual_size = actual_size
        self.max_size = max_size
class UnsupportedFileTypeError(FileValidationError):
    """Signals that the provider does not accept this type of file."""

    def __init__(
        self,
        message: str,
        file_name: str | None = None,
        content_type: str | None = None,
    ) -> None:
        """Create the error and record the rejected content type.

        Args:
            message: Human-readable description of the problem.
            file_name: Name of the offending file, when known.
            content_type: The content type that was rejected.
        """
        super().__init__(message, file_name)
        self.content_type = content_type
class ProcessingDependencyError(FileProcessingError):
    """Signals that an optional package needed for processing is missing."""

    def __init__(
        self,
        message: str,
        dependency: str,
        install_command: str | None = None,
    ) -> None:
        """Create the error and record which dependency is absent.

        No file name is forwarded to the base class, so ``file_name`` is None
        on instances of this error.

        Args:
            message: Human-readable description of the problem.
            dependency: Name of the package that could not be imported.
            install_command: Command that installs the package, when known.
        """
        super().__init__(message)
        self.dependency = dependency
        self.install_command = install_command

View File

@@ -0,0 +1,307 @@
"""FileProcessor for validating and transforming files based on provider constraints."""
from collections.abc import Sequence
import logging
from crewai.utilities.files.content_types import (
AudioFile,
ImageFile,
PDFFile,
TextFile,
VideoFile,
)
from crewai.utilities.files.processing.constraints import (
ProviderConstraints,
get_constraints_for_provider,
)
from crewai.utilities.files.processing.enums import FileHandling
from crewai.utilities.files.processing.exceptions import (
FileProcessingError,
FileTooLargeError,
FileValidationError,
UnsupportedFileTypeError,
)
from crewai.utilities.files.processing.transformers import (
chunk_pdf,
chunk_text,
get_image_dimensions,
get_pdf_page_count,
optimize_image,
resize_image,
)
from crewai.utilities.files.processing.validators import validate_file
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Union of every file wrapper type the processor accepts.
FileInput = AudioFile | ImageFile | PDFFile | TextFile | VideoFile
class FileProcessor:
    """Validate files against provider limits and adapt them per handling mode.

    Every file is checked against the configured provider constraints. When a
    file violates them, the file's own ``mode`` attribute decides what happens
    next: fail, warn, transform (resize/compress) or split into chunks.

    Attributes:
        constraints: Provider constraints used for validation, or None when
            validation is disabled.
    """

    def __init__(
        self,
        constraints: ProviderConstraints | str | None = None,
    ) -> None:
        """Set up the processor.

        Args:
            constraints: Provider constraints or a provider name to look up.
                None, or a name that cannot be resolved, disables validation.
        """
        if not isinstance(constraints, str):
            self.constraints = constraints
            return

        resolved = get_constraints_for_provider(constraints)
        if resolved is None:
            logger.warning(
                f"Unknown provider '{constraints}' - validation disabled"
            )
        self.constraints = resolved

    def validate(self, file: FileInput) -> Sequence[str]:
        """Check a file against the provider constraints.

        Args:
            file: The file to check.

        Returns:
            Validation error messages; empty when the file is valid or when
            no constraints are configured.

        Raises:
            FileValidationError: If file.mode is STRICT and validation fails.
        """
        if self.constraints is None:
            return []

        strict = self._get_mode(file) == FileHandling.STRICT
        return validate_file(file, self.constraints, raise_on_error=strict)

    def _get_mode(self, file: FileInput) -> FileHandling:
        """Read the handling mode from a file.

        Args:
            file: The file whose ``mode`` attribute is inspected.

        Returns:
            The file's handling mode. AUTO is used when the attribute is
            missing, None, or neither a string nor a FileHandling member.

        Raises:
            ValueError: If ``mode`` is a string that names no FileHandling
                member.
        """
        raw_mode = getattr(file, "mode", None)
        if isinstance(raw_mode, str):
            return FileHandling(raw_mode)
        if isinstance(raw_mode, FileHandling):
            return raw_mode
        return FileHandling.AUTO

    def process(self, file: FileInput) -> FileInput | Sequence[FileInput]:
        """Validate one file and, if needed, adapt it per its handling mode.

        Args:
            file: The file to process.

        Returns:
            The file itself (possibly transformed), or a sequence of files
            when it was split into chunks.

        Raises:
            FileProcessingError: If file.mode is STRICT and processing fails.
        """
        if self.constraints is None:
            return file

        mode = self._get_mode(file)
        try:
            problems = self.validate(file)
            if not problems:
                return file

            if mode == FileHandling.STRICT:
                # validate() raises first in STRICT mode; this is a backstop.
                raise FileValidationError("; ".join(problems), file_name=file.filename)

            if mode == FileHandling.WARN:
                for problem in problems:
                    logger.warning(problem)
                return file

            if mode == FileHandling.AUTO:
                return self._auto_process(file)

            if mode == FileHandling.CHUNK:
                return self._chunk_process(file)

            return file
        except FileValidationError:
            # Also covers FileTooLargeError and UnsupportedFileTypeError,
            # which derive from FileValidationError.
            raise
        except Exception as e:
            logger.error(f"Error processing file '{file.filename}': {e}")
            if mode == FileHandling.STRICT:
                raise FileProcessingError(str(e), file_name=file.filename) from e
            # Outside STRICT mode, fall back to the untouched file.
            return file

    def process_files(
        self,
        files: dict[str, FileInput],
    ) -> dict[str, FileInput]:
        """Run :meth:`process` over a mapping of named files.

        Args:
            files: Mapping of names to file inputs.

        Returns:
            Mapping of names to processed files. A chunked file contributes
            one entry per chunk, keyed ``"<name>_chunk_<index>"``.
        """
        result: dict[str, FileInput] = {}
        for name, file in files.items():
            processed = self.process(file)
            was_chunked = isinstance(processed, Sequence) and not isinstance(
                processed, (str, bytes)
            )
            if not was_chunked:
                result[name] = processed
                continue

            result.update(
                {f"{name}_chunk_{i}": chunk for i, chunk in enumerate(processed)}
            )
        return result

    def _auto_process(self, file: FileInput) -> FileInput:
        """Try to shrink a file so that it fits the constraints.

        Only images are actually transformed; PDFs, audio and video produce a
        warning and are returned unchanged.

        Args:
            file: The file to process.

        Returns:
            The processed file.
        """
        limits = self.constraints
        if limits is None:
            return file

        if isinstance(file, ImageFile) and limits.image is not None:
            return self._auto_process_image(file)

        if isinstance(file, PDFFile) and limits.pdf is not None:
            # There is no cheap way to compress a PDF here.
            logger.warning(
                f"Cannot auto-compress PDF '{file.filename}'. "
                "Consider using CHUNK mode for large PDFs."
            )
        elif isinstance(file, (AudioFile, VideoFile)):
            # Transcoding would need extra dependencies, so only warn.
            logger.warning(
                f"Auto-processing not supported for {type(file).__name__}. "
                "File will be used as-is."
            )
        return file

    def _auto_process_image(self, file: ImageFile) -> ImageFile:
        """Resize and then compress an image until it fits the constraints.

        Failures in either step are logged and the best result so far is kept.

        Args:
            file: The image file to process.

        Returns:
            The processed image file.
        """
        if self.constraints is None or self.constraints.image is None:
            return file

        limits = self.constraints.image
        result = file
        current_size = len(file.source.read())

        # Step 1: bring the pixel dimensions within bounds.
        if limits.max_width or limits.max_height:
            dimensions = get_image_dimensions(file)
            if dimensions:
                width, height = dimensions
                # A missing limit on one axis never constrains that axis.
                max_w = limits.max_width or width
                max_h = limits.max_height or height
                if width > max_w or height > max_h:
                    try:
                        result = resize_image(file, max_w, max_h)
                        current_size = len(result.source.read())
                    except Exception as e:
                        logger.warning(f"Failed to resize image: {e}")

        # Step 2: compress if the payload is still too large.
        if current_size > limits.max_size_bytes:
            try:
                result = optimize_image(result, limits.max_size_bytes)
            except Exception as e:
                logger.warning(f"Failed to optimize image: {e}")

        return result

    def _chunk_process(self, file: FileInput) -> FileInput | Sequence[FileInput]:
        """Split a file into pieces that fit the constraints.

        PDFs are split by page count and text files by size; other file types
        cannot be chunked and are returned unchanged after a warning.

        Args:
            file: The file to chunk.

        Returns:
            The original file when chunking is unnecessary, unsupported or
            fails; otherwise the sequence of chunk files.
        """
        if self.constraints is None:
            return file

        if isinstance(file, PDFFile) and self.constraints.pdf is not None:
            page_limit = self.constraints.pdf.max_pages
            if page_limit is None:
                return file
            page_count = get_pdf_page_count(file)
            if page_count is None or page_count <= page_limit:
                return file
            try:
                return chunk_pdf(file, page_limit)
            except Exception as e:
                logger.warning(f"Failed to chunk PDF: {e}")
                return file

        if isinstance(file, TextFile):
            # The general byte limit doubles as an approximate character limit.
            size_limit = self.constraints.general_max_size_bytes
            if size_limit is None:
                return file
            if len(file.source.read()) <= size_limit:
                return file
            try:
                return chunk_text(file, size_limit)
            except Exception as e:
                logger.warning(f"Failed to chunk text file: {e}")
                return file

        if isinstance(file, (ImageFile, AudioFile, VideoFile)):
            logger.warning(
                f"Chunking not supported for {type(file).__name__}. "
                "Consider using AUTO mode for images."
            )
        return file

View File

@@ -0,0 +1,349 @@
"""File transformation functions for resizing, optimizing, and chunking."""
from collections.abc import Sequence
import io
import logging
from crewai.utilities.files.content_types import ImageFile, PDFFile, TextFile
from crewai.utilities.files.file import FileBytes
from crewai.utilities.files.processing.exceptions import ProcessingDependencyError
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def resize_image(
    file: ImageFile,
    max_width: int,
    max_height: int,
    *,
    preserve_aspect_ratio: bool = True,
) -> ImageFile:
    """Resize an image so that it fits within the given pixel bounds.

    The image is re-encoded in its original format (PNG when the format is
    unknown). Images that already fit are returned untouched.

    Args:
        file: The image file to resize.
        max_width: Maximum width in pixels.
        max_height: Maximum height in pixels.
        preserve_aspect_ratio: If True, scale both sides by the same factor so
            the image fits inside the bounds. If False, clamp each side
            independently, which may distort the image.

    Returns:
        The original ``file`` when no resize is needed, otherwise a new
        ImageFile with the resized image data.

    Raises:
        ProcessingDependencyError: If Pillow is not installed.
    """
    try:
        from PIL import Image
    except ImportError as e:
        raise ProcessingDependencyError(
            "Pillow is required for image resizing",
            dependency="Pillow",
            install_command="pip install Pillow",
        ) from e

    content = file.source.read()
    with Image.open(io.BytesIO(content)) as img:
        original_width, original_height = img.size

        if original_width <= max_width and original_height <= max_height:
            return file

        if preserve_aspect_ratio:
            # The smaller ratio is the one that makes both sides fit.
            scale_factor = min(
                max_width / original_width, max_height / original_height
            )
            new_width = int(original_width * scale_factor)
            new_height = int(original_height * scale_factor)
        else:
            new_width = min(original_width, max_width)
            new_height = min(original_height, max_height)

        # Truncation can take a side to 0 px on extreme aspect ratios (for
        # example 10000x1 scaled to fit 100x100); Pillow rejects zero-sized
        # targets, so keep at least one pixel per side.
        new_width = max(1, new_width)
        new_height = max(1, new_height)

        resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)

        output_format = img.format or "PNG"
        # JPEG has no alpha channel or palette support.
        if output_format.upper() == "JPEG" and resized_img.mode in ("RGBA", "LA", "P"):
            resized_img = resized_img.convert("RGB")

        output_buffer = io.BytesIO()
        resized_img.save(output_buffer, format=output_format)
        output_bytes = output_buffer.getvalue()

    logger.info(
        f"Resized image '{file.filename}' from {original_width}x{original_height} "
        f"to {new_width}x{new_height}"
    )
    return ImageFile(source=FileBytes(data=output_bytes, filename=file.filename))
def optimize_image(
    file: ImageFile,
    target_size_bytes: int,
    *,
    min_quality: int = 20,
    initial_quality: int = 85,
) -> ImageFile:
    """Re-encode an image as JPEG at decreasing quality until it fits a budget.

    Quality starts at ``initial_quality`` and is lowered in steps of 5 until
    the encoded payload is no larger than ``target_size_bytes`` or the next
    step would fall below ``min_quality``. This is best-effort: if every
    attempt is still too large, the last attempt is returned and a warning is
    logged. Images with an alpha channel or palette are converted to RGB.

    Args:
        file: The image file to optimize.
        target_size_bytes: Target maximum file size in bytes.
        min_quality: Lowest quality that may be tried (prevents excessive
            degradation).
        initial_quality: Quality used for the first attempt.

    Returns:
        The original ``file`` when it already fits, or when no encode could be
        attempted because ``initial_quality`` is below ``min_quality``.
        Otherwise a new ImageFile with the re-encoded data, whose filename
        extension is switched to ``.jpg`` when needed.

    Raises:
        ProcessingDependencyError: If Pillow is not installed.
    """
    try:
        from PIL import Image
    except ImportError as e:
        raise ProcessingDependencyError(
            "Pillow is required for image optimization",
            dependency="Pillow",
            install_command="pip install Pillow",
        ) from e

    content = file.source.read()
    current_size = len(content)

    if current_size <= target_size_bytes:
        return file

    with Image.open(io.BytesIO(content)) as img:
        if img.mode in ("RGBA", "LA", "P"):
            # JPEG cannot store alpha or palettes.
            working = img.convert("RGB")
            output_format = "JPEG"
        else:
            working = img
            output_format = img.format or "JPEG"
            if output_format.upper() not in ("JPEG", "JPG"):
                output_format = "JPEG"

        quality = initial_quality
        used_quality: int | None = None  # quality of the bytes in output_bytes
        output_bytes = content

        # Linear step-down: retry at lower quality until the payload fits.
        while len(output_bytes) > target_size_bytes and quality >= min_quality:
            output_buffer = io.BytesIO()
            working.save(
                output_buffer, format=output_format, quality=quality, optimize=True
            )
            output_bytes = output_buffer.getvalue()
            used_quality = quality
            quality -= 5

    if used_quality is None:
        # Nothing was re-encoded, so the data is still the original payload;
        # returning it under a ".jpg" name would mislabel it.
        return file

    if len(output_bytes) > target_size_bytes:
        logger.warning(
            f"Image '{file.filename}' is still {len(output_bytes)} bytes at "
            f"quality={used_quality}, above the target of {target_size_bytes} bytes"
        )

    logger.info(
        f"Optimized image '{file.filename}' from {current_size} bytes to "
        f"{len(output_bytes)} bytes (quality={used_quality})"
    )

    filename = file.filename
    if (
        filename
        and output_format.upper() == "JPEG"
        and not filename.lower().endswith((".jpg", ".jpeg"))
    ):
        # Keep the extension in sync with the new encoding.
        filename = filename.rsplit(".", 1)[0] + ".jpg"

    return ImageFile(source=FileBytes(data=output_bytes, filename=filename))
def chunk_pdf(
    file: PDFFile,
    max_pages: int,
    *,
    overlap_pages: int = 0,
) -> Sequence[PDFFile]:
    """Split a PDF into consecutive chunks of at most ``max_pages`` pages.

    Chunking stops as soon as a chunk reaches the last page, so overlapping
    chunks never produce a trailing chunk made only of pages the previous
    chunk already contains.

    Args:
        file: The PDF file to chunk.
        max_pages: Maximum pages per chunk.
        overlap_pages: Number of pages repeated at the start of each following
            chunk (for context). Must be smaller than ``max_pages``.

    Returns:
        ``[file]`` when the PDF already fits in one chunk, otherwise a list of
        PDFFile objects, one per chunk.

    Raises:
        ProcessingDependencyError: If pypdf is not installed.
        ValueError: If chunking is needed and ``max_pages`` is less than 1 or
            ``overlap_pages`` is negative or not smaller than ``max_pages``.
    """
    try:
        from pypdf import PdfReader, PdfWriter  # type: ignore[import-not-found]
    except ImportError as e:
        raise ProcessingDependencyError(
            "pypdf is required for PDF chunking",
            dependency="pypdf",
            install_command="pip install pypdf",
        ) from e

    content = file.source.read()
    reader = PdfReader(io.BytesIO(content))
    total_pages = len(reader.pages)

    if total_pages <= max_pages:
        return [file]

    # A non-positive step would never advance (endless loop), and a negative
    # overlap would silently skip pages, so reject both up front.
    if max_pages < 1:
        raise ValueError(f"max_pages must be at least 1, got {max_pages}")
    if not 0 <= overlap_pages < max_pages:
        raise ValueError(
            f"overlap_pages must be between 0 and max_pages - 1, got {overlap_pages}"
        )

    chunks: list[PDFFile] = []
    filename = file.filename or "document.pdf"
    base_filename = filename.rsplit(".", 1)[0]
    step = max_pages - overlap_pages

    for chunk_num, start_page in enumerate(range(0, total_pages, step)):
        end_page = min(start_page + max_pages, total_pages)

        writer = PdfWriter()
        for page_num in range(start_page, end_page):
            writer.add_page(reader.pages[page_num])

        output_buffer = io.BytesIO()
        writer.write(output_buffer)
        output_bytes = output_buffer.getvalue()

        chunk_filename = f"{base_filename}_chunk_{chunk_num}.pdf"
        chunks.append(
            PDFFile(source=FileBytes(data=output_bytes, filename=chunk_filename))
        )
        logger.info(
            f"Created PDF chunk '{chunk_filename}' with pages {start_page + 1}-{end_page}"
        )

        if end_page >= total_pages:
            # Every page is covered; a further chunk would only repeat pages.
            break

    return chunks
def chunk_text(
    file: TextFile,
    max_chars: int,
    *,
    overlap_chars: int = 200,
    split_on_newlines: bool = True,
) -> Sequence[TextFile]:
    """Split a text file into chunks of at most ``max_chars`` characters.

    The content is decoded as UTF-8 (undecodable bytes are replaced) and
    limits are counted in decoded characters, not encoded bytes.

    Args:
        file: The text file to chunk.
        max_chars: Maximum characters per chunk.
        overlap_chars: Number of characters repeated at the start of each
            following chunk. Negative values are treated as 0. When the
            overlap is at least as long as the chunk just produced, it is
            dropped for that boundary so that chunking always advances.
        split_on_newlines: If True, prefer ending a chunk right after a
            newline, provided that newline lies in the second half of the
            chunk.

    Returns:
        ``[file]`` when the text already fits in one chunk, otherwise a list
        of TextFile objects, one per chunk.

    Raises:
        ValueError: If chunking is needed and ``max_chars`` is less than 1.
    """
    content = file.source.read()
    text = content.decode("utf-8", errors="replace")
    total_chars = len(text)

    if total_chars <= max_chars:
        return [file]

    if max_chars < 1:
        # A non-positive chunk size could never make progress.
        raise ValueError(f"max_chars must be at least 1, got {max_chars}")

    overlap = max(overlap_chars, 0)
    chunks: list[TextFile] = []
    filename = file.filename or "text.txt"
    base_filename = filename.rsplit(".", 1)[0]
    extension = filename.rsplit(".", 1)[-1] if "." in filename else "txt"

    chunk_num = 0
    start_pos = 0
    while start_pos < total_chars:
        end_pos = min(start_pos + max_chars, total_chars)

        if end_pos < total_chars and split_on_newlines:
            last_newline = text.rfind("\n", start_pos, end_pos)
            # Ignore newlines in the first half so chunks do not get too short.
            if last_newline > start_pos + max_chars // 2:
                end_pos = last_newline + 1

        piece = text[start_pos:end_pos]
        chunk_filename = f"{base_filename}_chunk_{chunk_num}.{extension}"
        chunks.append(
            TextFile(
                source=FileBytes(data=piece.encode("utf-8"), filename=chunk_filename)
            )
        )
        logger.info(
            f"Created text chunk '{chunk_filename}' with {len(piece)} characters"
        )

        if end_pos >= total_chars:
            break

        # Step back by the overlap, but never to or before the current start:
        # that would repeat the same chunk forever.
        next_start = end_pos - overlap
        start_pos = next_start if next_start > start_pos else end_pos
        chunk_num += 1

    return chunks
def get_image_dimensions(file: ImageFile) -> tuple[int, int] | None:
    """Measure an image's pixel dimensions.

    Args:
        file: The image file to measure.

    Returns:
        ``(width, height)`` in pixels, or None when Pillow is missing or the
        image cannot be opened.
    """
    try:
        from PIL import Image
    except ImportError:
        logger.warning("Pillow not installed - cannot get image dimensions")
        return None

    raw = file.source.read()
    try:
        with Image.open(io.BytesIO(raw)) as image:
            width, height = image.size
    except Exception as e:
        logger.warning(f"Failed to get image dimensions: {e}")
        return None
    return (width, height)
def get_pdf_page_count(file: PDFFile) -> int | None:
    """Count the pages of a PDF.

    Args:
        file: The PDF file to measure.

    Returns:
        The number of pages, or None when pypdf is missing or the PDF cannot
        be read.
    """
    try:
        from pypdf import PdfReader
    except ImportError:
        logger.warning("pypdf not installed - cannot get PDF page count")
        return None

    raw = file.source.read()
    try:
        return len(PdfReader(io.BytesIO(raw)).pages)
    except Exception as e:
        logger.warning(f"Failed to get PDF page count: {e}")
        return None

View File

@@ -0,0 +1,426 @@
"""File validation functions for checking against provider constraints."""
from collections.abc import Sequence
import logging
from crewai.utilities.files.content_types import (
AudioFile,
ImageFile,
PDFFile,
TextFile,
VideoFile,
)
from crewai.utilities.files.processing.constraints import (
AudioConstraints,
ImageConstraints,
PDFConstraints,
ProviderConstraints,
VideoConstraints,
)
from crewai.utilities.files.processing.exceptions import (
FileTooLargeError,
FileValidationError,
UnsupportedFileTypeError,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Union of every file wrapper type the validators accept.
FileInput = AudioFile | ImageFile | PDFFile | TextFile | VideoFile
def _format_size(size_bytes: int) -> str:
"""Format byte size to human-readable string."""
if size_bytes >= 1024 * 1024 * 1024:
return f"{size_bytes / (1024 * 1024 * 1024):.1f}GB"
if size_bytes >= 1024 * 1024:
return f"{size_bytes / (1024 * 1024):.1f}MB"
if size_bytes >= 1024:
return f"{size_bytes / 1024:.1f}KB"
return f"{size_bytes}B"
def validate_image(
    file: ImageFile,
    constraints: ImageConstraints,
    *,
    raise_on_error: bool = True,
) -> Sequence[str]:
    """Check an image's size, MIME type and pixel dimensions against limits.

    Checks run in that order. With ``raise_on_error`` the first violation
    raises; otherwise every violation is collected. Dimension checks need
    Pillow and are skipped with a warning when it is not installed.

    Args:
        file: The image file to validate.
        constraints: Image constraints to validate against.
        raise_on_error: If True, raise on the first validation failure.

    Returns:
        Validation error messages (empty if valid).

    Raises:
        FileTooLargeError: If the file exceeds size limits.
        FileValidationError: If the file exceeds dimension limits.
        UnsupportedFileTypeError: If the format is not supported.
    """
    errors: list[str] = []

    def report(msg: str, error: Exception) -> None:
        # Record the message, then abort immediately in raising mode.
        errors.append(msg)
        if raise_on_error:
            raise error

    content = file.source.read()
    file_size = len(content)
    filename = file.filename

    # Payload size.
    if file_size > constraints.max_size_bytes:
        msg = (
            f"Image '{filename}' size ({_format_size(file_size)}) exceeds "
            f"maximum ({_format_size(constraints.max_size_bytes)})"
        )
        report(
            msg,
            FileTooLargeError(
                msg,
                file_name=filename,
                actual_size=file_size,
                max_size=constraints.max_size_bytes,
            ),
        )

    # MIME type.
    content_type = file.content_type
    if content_type not in constraints.supported_formats:
        msg = (
            f"Image format '{content_type}' is not supported. "
            f"Supported: {', '.join(constraints.supported_formats)}"
        )
        report(
            msg,
            UnsupportedFileTypeError(
                msg, file_name=filename, content_type=content_type
            ),
        )

    # Pixel dimensions, only when at least one limit is configured.
    if constraints.max_width is None and constraints.max_height is None:
        return errors

    try:
        import io

        from PIL import Image

        with Image.open(io.BytesIO(content)) as img:
            width, height = img.size

            if constraints.max_width and width > constraints.max_width:
                msg = (
                    f"Image '{filename}' width ({width}px) exceeds "
                    f"maximum ({constraints.max_width}px)"
                )
                report(msg, FileValidationError(msg, file_name=filename))

            if constraints.max_height and height > constraints.max_height:
                msg = (
                    f"Image '{filename}' height ({height}px) exceeds "
                    f"maximum ({constraints.max_height}px)"
                )
                report(msg, FileValidationError(msg, file_name=filename))
    except ImportError:
        logger.warning(
            "Pillow not installed - cannot validate image dimensions. "
            "Install with: pip install Pillow"
        )

    return errors
def validate_pdf(
    file: PDFFile,
    constraints: PDFConstraints,
    *,
    raise_on_error: bool = True,
) -> Sequence[str]:
    """Check a PDF's size and page count against limits.

    With ``raise_on_error`` the first violation raises; otherwise every
    violation is collected. The page-count check needs pypdf and is skipped
    with a warning when it is not installed.

    Args:
        file: The PDF file to validate.
        constraints: PDF constraints to validate against.
        raise_on_error: If True, raise on the first validation failure.

    Returns:
        Validation error messages (empty if valid).

    Raises:
        FileTooLargeError: If the file exceeds size limits.
        FileValidationError: If the file exceeds page limits.
    """
    errors: list[str] = []

    def report(msg: str, error: Exception) -> None:
        # Record the message, then abort immediately in raising mode.
        errors.append(msg)
        if raise_on_error:
            raise error

    content = file.source.read()
    file_size = len(content)
    filename = file.filename

    # Payload size.
    if file_size > constraints.max_size_bytes:
        msg = (
            f"PDF '{filename}' size ({_format_size(file_size)}) exceeds "
            f"maximum ({_format_size(constraints.max_size_bytes)})"
        )
        report(
            msg,
            FileTooLargeError(
                msg,
                file_name=filename,
                actual_size=file_size,
                max_size=constraints.max_size_bytes,
            ),
        )

    # Page count, only when a limit is configured.
    if constraints.max_pages is None:
        return errors

    try:
        import io

        from pypdf import PdfReader  # type: ignore[import-not-found]

        reader = PdfReader(io.BytesIO(content))
        page_count = len(reader.pages)
        if page_count > constraints.max_pages:
            msg = (
                f"PDF '{filename}' page count ({page_count}) exceeds "
                f"maximum ({constraints.max_pages})"
            )
            report(msg, FileValidationError(msg, file_name=filename))
    except ImportError:
        logger.warning(
            "pypdf not installed - cannot validate PDF page count. "
            "Install with: pip install pypdf"
        )

    return errors
def validate_audio(
    file: AudioFile,
    constraints: AudioConstraints,
    *,
    raise_on_error: bool = True,
) -> Sequence[str]:
    """Check an audio file's size and MIME type against limits.

    With ``raise_on_error`` the first violation raises; otherwise every
    violation is collected. The duration limit is not checked here.

    Args:
        file: The audio file to validate.
        constraints: Audio constraints to validate against.
        raise_on_error: If True, raise on the first validation failure.

    Returns:
        Validation error messages (empty if valid).

    Raises:
        FileTooLargeError: If the file exceeds size limits.
        UnsupportedFileTypeError: If the format is not supported.
    """
    errors: list[str] = []

    def report(msg: str, error: Exception) -> None:
        # Record the message, then abort immediately in raising mode.
        errors.append(msg)
        if raise_on_error:
            raise error

    file_size = len(file.source.read())
    filename = file.filename

    # Payload size.
    if file_size > constraints.max_size_bytes:
        msg = (
            f"Audio '{filename}' size ({_format_size(file_size)}) exceeds "
            f"maximum ({_format_size(constraints.max_size_bytes)})"
        )
        report(
            msg,
            FileTooLargeError(
                msg,
                file_name=filename,
                actual_size=file_size,
                max_size=constraints.max_size_bytes,
            ),
        )

    # MIME type.
    content_type = file.content_type
    if content_type not in constraints.supported_formats:
        msg = (
            f"Audio format '{content_type}' is not supported. "
            f"Supported: {', '.join(constraints.supported_formats)}"
        )
        report(
            msg,
            UnsupportedFileTypeError(
                msg, file_name=filename, content_type=content_type
            ),
        )

    return errors
def validate_video(
    file: VideoFile,
    constraints: VideoConstraints,
    *,
    raise_on_error: bool = True,
) -> Sequence[str]:
    """Check a video file's size and MIME type against limits.

    With ``raise_on_error`` the first violation raises; otherwise every
    violation is collected. The duration limit is not checked here.

    Args:
        file: The video file to validate.
        constraints: Video constraints to validate against.
        raise_on_error: If True, raise on the first validation failure.

    Returns:
        Validation error messages (empty if valid).

    Raises:
        FileTooLargeError: If the file exceeds size limits.
        UnsupportedFileTypeError: If the format is not supported.
    """
    errors: list[str] = []

    def report(msg: str, error: Exception) -> None:
        # Record the message, then abort immediately in raising mode.
        errors.append(msg)
        if raise_on_error:
            raise error

    file_size = len(file.source.read())
    filename = file.filename

    # Payload size.
    if file_size > constraints.max_size_bytes:
        msg = (
            f"Video '{filename}' size ({_format_size(file_size)}) exceeds "
            f"maximum ({_format_size(constraints.max_size_bytes)})"
        )
        report(
            msg,
            FileTooLargeError(
                msg,
                file_name=filename,
                actual_size=file_size,
                max_size=constraints.max_size_bytes,
            ),
        )

    # MIME type.
    content_type = file.content_type
    if content_type not in constraints.supported_formats:
        msg = (
            f"Video format '{content_type}' is not supported. "
            f"Supported: {', '.join(constraints.supported_formats)}"
        )
        report(
            msg,
            UnsupportedFileTypeError(
                msg, file_name=filename, content_type=content_type
            ),
        )

    return errors
def validate_text(
    file: TextFile,
    constraints: ProviderConstraints,
    *,
    raise_on_error: bool = True,
) -> Sequence[str]:
    """Check a text file's size against the provider's general size limit.

    When the provider defines no general limit, the file is not read and is
    always considered valid.

    Args:
        file: The text file to validate.
        constraints: Provider constraints to validate against.
        raise_on_error: If True, raise on validation failure.

    Returns:
        Validation error messages (empty if valid).

    Raises:
        FileTooLargeError: If the file exceeds size limits.
    """
    if constraints.general_max_size_bytes is None:
        return []

    file_size = len(file.source.read())
    filename = file.filename
    if file_size <= constraints.general_max_size_bytes:
        return []

    msg = (
        f"Text file '{filename}' size ({_format_size(file_size)}) exceeds "
        f"maximum ({_format_size(constraints.general_max_size_bytes)})"
    )
    if raise_on_error:
        raise FileTooLargeError(
            msg,
            file_name=filename,
            actual_size=file_size,
            max_size=constraints.general_max_size_bytes,
        )
    return [msg]
def validate_file(
    file: FileInput,
    constraints: ProviderConstraints,
    *,
    raise_on_error: bool = True,
) -> Sequence[str]:
    """Validate a file with the validator that matches its type.

    A media type whose constraints are None on the provider is treated as
    unsupported. Files of an unrecognized type are not validated.

    Args:
        file: The file to validate.
        constraints: Provider constraints to validate against.
        raise_on_error: If True, raise on validation failure.

    Returns:
        Validation error messages (empty if valid).

    Raises:
        FileTooLargeError: If the file exceeds size limits.
        FileValidationError: If the file fails other validation checks.
        UnsupportedFileTypeError: If the file type is not supported.
    """

    def unsupported(msg: str) -> Sequence[str]:
        # Shared handling for media types the provider does not accept.
        if raise_on_error:
            raise UnsupportedFileTypeError(
                msg, file_name=file.filename, content_type=file.content_type
            )
        return [msg]

    if isinstance(file, ImageFile):
        if constraints.image is None:
            return unsupported(f"Provider '{constraints.name}' does not support images")
        return validate_image(file, constraints.image, raise_on_error=raise_on_error)

    if isinstance(file, PDFFile):
        if constraints.pdf is None:
            return unsupported(f"Provider '{constraints.name}' does not support PDFs")
        return validate_pdf(file, constraints.pdf, raise_on_error=raise_on_error)

    if isinstance(file, AudioFile):
        if constraints.audio is None:
            return unsupported(f"Provider '{constraints.name}' does not support audio")
        return validate_audio(file, constraints.audio, raise_on_error=raise_on_error)

    if isinstance(file, VideoFile):
        if constraints.video is None:
            return unsupported(f"Provider '{constraints.name}' does not support video")
        return validate_video(file, constraints.video, raise_on_error=raise_on_error)

    if isinstance(file, TextFile):
        return validate_text(file, constraints, raise_on_error=raise_on_error)

    # Unknown file type: nothing to validate against.
    return []