crewAI/lib/crewai-files/src/crewai_files/resolution/resolver.py
João Moura bb477f8a91
JSON first crews (#6131)
* feat(cli): introduce JSON crew project support and TUI enhancements

- Added support for creating and running JSON-defined crew projects, allowing users to scaffold projects with a new `create_json_crew.py` file.
- Implemented a full-screen Textual TUI for crew execution in `crew_run_tui.py`, enhancing user interaction with a two-column layout.
- Updated `run_crew.py` to prioritize JSON crew projects and added daemon mode for running without TUI.
- Introduced interactive pickers in `tui_picker.py` for improved CLI prompts.
- Enhanced validation for JSON crew files in `validate.py` to ensure proper structure and agent definitions.
- Updated `.gitignore` to exclude demo and crewai directories.

* feat: update LLM model references to gpt-5.4-mini

- Changed default LLM model from gpt-4o-mini to gpt-5.4-mini across various files, including CLI options, JSON crew configurations, and agent definitions.
- Enhanced benchmark and human feedback functionalities to utilize the new model.
- Improved user interface elements in the TUI for better interaction and feedback during execution.
- Added support for new skills directory in JSON crew project creation.

* feat(benchmark): add crew-level benchmarking functionality

- Introduced a new `benchmark` command in the CLI for crew-level benchmarking, allowing users to specify agents, models, and timeout settings.
- Implemented `CrewBenchmarkCase` to handle crew-level benchmark cases with inputs and criteria.
- Enhanced the benchmark runner to support progress tracking and detailed reporting of results for multiple models.
- Added tests for loading crew benchmark cases and validating their structure.
- Updated existing benchmark functions to accommodate the new crew-level execution model.

* feat(cli): enhance JSON crew project functionality and TUI improvements

- Added optional agent-level guardrails and advanced options in JSON crew configurations to improve output validation and flexibility.
- Updated the TUI to better handle plan step statuses, including visual indicators for task completion and failure.
- Introduced methods for parsing and managing step observation events, ensuring accurate updates to task statuses during execution.
- Enhanced validation for JSON crew projects, ensuring proper structure and error handling for agent and task definitions.
- Added comprehensive tests for new features and validation logic, ensuring robustness in JSON crew project handling.

* refactor(cli): streamline JSON crew project handling and improve validation

- Refactored JSON crew project loading and validation logic to enhance clarity and maintainability.
- Introduced utility functions for finding JSON crew files, improving code reuse across modules.
- Removed deprecated benchmark functionality and associated tests to simplify the codebase.
- Updated CLI commands to utilize the new JSON project structure, ensuring compatibility with recent changes.
- Enhanced test coverage for JSON crew project features, ensuring robust validation and error handling.

* feat(cli): enhance activity log navigation and focus management

- Added functionality to focus on the activity log when navigating through log entries.
- Implemented refresh logic for the log panel to ensure updates are displayed correctly during navigation.
- Improved keyboard navigation for log entries, allowing users to expand and scroll through logs seamlessly.
- Added tests to verify the correct behavior of log navigation and focus management in the TUI.

* feat(cli): enhance JSON crew project interaction and input handling

- Introduced a new function to enable prompt line editing for better user experience during input prompts.
- Updated the JSON crew project wizards to show interpolation hints for dynamic values, improving user guidance.
- Enhanced the handling of missing input placeholders by prompting users for required values during crew setup.
- Refactored the crew run logic to ensure proper loading and preparation of JSON-defined crews, including runtime input management.
- Added tests to verify the correct behavior of new input handling features and JSON crew project interactions.

* feat(cli): improve crew project input prompts and event handling

- Enhanced the `_prompt_text` function to allow for configurable spacing before prompts, improving user experience during input collection.
- Updated the wizards for agent and task creation to utilize the new prompt configuration, ensuring a more compact and streamlined interaction.
- Introduced new plan step lifecycle events (`PlanStepStartedEvent`, `PlanStepCompletedEvent`) to better track the execution status of plan steps.
- Refactored the step executor to emit these events during the execution of tasks, improving observability and debugging capabilities.
- Added tests to verify the correct behavior of new prompt handling and event emissions during crew project execution.

* fix: refine json-first crew interactions

* fix: prioritize common json crew tools

* fix: make json crew more tools expandable

* fix: show json crew tools by category

* feat(memory): update default embedder to OpenAI text-embedding-3-large and enhance memory compatibility

- Changed the default embedding model for Memory to OpenAI text-embedding-3-large, which uses 3072-dimensional vectors.
- Added warnings regarding compatibility issues with existing local memory stores created with 1536-dimensional embeddings.
- Updated documentation to reflect the new default embedder and its configuration options.
- Enhanced the CLI and codebase to support the new embedding model across various components, ensuring a seamless transition for users.

* fix: address PR review feedback for JSON-first crews

Review blockers:
- Forward trained_agents_file to JSON crews: crewai run -f now exports
  CREWAI_TRAINED_AGENTS_FILE for the in-process JSON crew path
- Wizard agent picker: Esc/cancel now reprompts instead of silently
  assigning the first agent
- JSON tool resolution hard-fails: unknown tool names, missing custom
  tool files, and invalid custom tool modules raise JSONProjectError
  with actionable messages instead of warn-and-continue
- Embedding dimension mismatch: LanceDB and Qdrant Edge storages raise
  EmbeddingDimensionMismatchError with reset/pin guidance instead of
  silently zero-filling vectors or returning empty search results
- Custom tool code execution documented in loader docstring and the
  scaffolded project README

CI fixes:
- ruff format across lib/
- All 133 PR-introduced mypy errors fixed (llm.py lazy-litellm and
  cli.py lazy command shims now use TYPE_CHECKING imports; textual
  is_mounted misuse fixed; pick_many overloads; misc annotations)

Bot review comments:
- Empty except blocks now have explanatory comments or debug logging
- Removed unused _C_BG/_C_PANEL/_C_BORDER globals and redundant
  import re; tests use a single import style for create_json_crew

Tests: trained-agents propagation, wizard cancel, tool resolution
failures, and dimension mismatch guidance.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* fix: address second round of PR review comments

Cursor Bugbot:
- Wizard agent slugs: strip to [a-z0-9_] and fall back to agent_<n> so
  symbol-only roles can't produce an empty agents/.jsonc filename
- Wizard task names: dedupe against prior task names and fall back to
  task_<n> for symbol-only descriptions

CodeRabbit:
- Agent.message(): import Task explicitly at runtime instead of relying
  on the namespace injection done by crewai/__init__
- Async executor: move the native-tools-unsupported fallback from
  _ainvoke_loop_react (self-recursion) to _ainvoke_loop_native_tools,
  mirroring the sync implementation
- StepExecutor downgrade: keep the in-step conversation and append the
  text-tooling instructions instead of rebuilding messages, so completed
  native tool calls are not re-executed
- crewai-files: extension-based MIME lookup now runs before byte
  sniffing so csv/xml types are not degraded to text/plain
- Memory storages: validate every record in a save() batch against a
  consistent embedding dimension (LanceDB previously checked only the
  first record); added mixed-batch tests
- _print_post_tui_summary now typed against CrewRunApp
- Docs: Azure OpenAI default embedder change called out in the memory
  migration warning and provider table

Code quality bots:
- Removed unused _C_YELLOW/_C_CYAN (crew_run_tui) and _GREEN (tui_picker)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* feat(cli): accordion tool picker in JSON crew wizard

The flat tool list had grown to ~90 rows. The picker now shows:
- Common tools always visible at the top
- Every other category as a single expandable row with tool and
  selection counts (e.g. "Search & Research  (27 tools, 2 selected)")
- Expanding a category collapses the previously expanded one
- Selections persist across expand/collapse via new preselected
  support in pick_many; cursor follows the toggled category row

tui_picker gains preselected + initial_cursor options on pick_many,
and Esc in multi-select now confirms the current selection instead of
discarding it (required so collapsing can't silently drop choices).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* refactor(cli): remove --daemon flag from crewai run

The flag only affected JSON crew projects — classic and flow projects
ignored it entirely, which made the behavior inconsistent. Removed the
option, the daemon code path (_run_json_crew_daemon), and its helper
(_load_json_crew_with_inputs).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* test: update run command tests after --daemon removal

lib/crewai/tests/cli/test_run_crew.py still asserted the old
run_crew(trained_agents_file=..., daemon=False) call signature.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* fix(cli): exit codes, mid-run quit, async statuses, hyphen placeholders

Addresses the latest Bugbot review round:

- Failed JSON crew runs now exit non-zero (SystemExit(1)) so scripts
  and CI don't treat failures as success, mirroring the classic path
- Quitting the TUI mid-run now ends the process (os._exit(130));
  kickoff runs in a thread worker that cannot be force-cancelled, so
  letting the CLI return would leave LLM/tool work burning tokens in
  the background
- Sidebar task statuses are now async-safe: completion/failure events
  resolve the task's own row via identity instead of assuming the most
  recently started task, and starting a task no longer blanket-marks
  earlier active rows as done
- The runtime-input prompt regex now accepts hyphenated placeholder
  names ({my-topic}), matching kickoff's interpolation pattern

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* fix: validation safety, custom tool sandboxing, TUI log integrity, memory error surfacing

- Deploy validation no longer executes project code: validation mode
  checks tool declarations structurally (well-formed entries, custom
  tool file exists) without importing or instantiating anything.
  custom:<name> resolution only happens on the actual run path.
- custom:<name> is constrained to [A-Za-z_][A-Za-z0-9_]* and the
  resolved path must stay inside the project's tools/ directory, so
  custom:../foo or absolute-path names cannot execute code outside it.
  Tool paths resolve relative to the crew project root, not cwd.
- TUI task logs are built from per-task state captured at task start
  (idx, description, agent, start time); an out-of-order completion
  takes its output from the event and no longer steals or resets the
  current task's streamed steps/output.
- EmbeddingDimensionMismatchError now inherits ValueError instead of
  RuntimeError so background saves surface it through
  MemorySaveFailedEvent instead of silently dropping the save; the
  shutdown catch in _background_encode_batch is narrowed to the
  "cannot schedule new futures" case.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* fix(cli): declared project type wins over crew.json presence

A flow project that also contains a crew.json(c) file now runs and
validates as the flow it declares in pyproject.toml instead of being
hijacked by the JSON crew path. Both crewai run (_has_json_crew) and
deploy validation (_is_json_crew) check tool.crewai.type; a missing or
unreadable pyproject still means a bare JSON crew project.

Also documents why StepObservationFailedEvent intentionally marks the
plan step "done": the event signals an observer failure, not a step
failure, and the executor continues past it.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* fix(cli): type the declared_type locals so mypy stays clean

Comparing an Any-typed .get() chain returns Any, which tripped
no-any-return on the previous commit.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
2026-06-14 04:19:48 -03:00

684 lines
22 KiB
Python

"""FileResolver for deciding file delivery method and managing uploads."""

import asyncio
import base64
from dataclasses import dataclass, field
import hashlib
import logging

from crewai_files.cache.metrics import measure_operation
from crewai_files.cache.upload_cache import CachedUpload, UploadCache
from crewai_files.core.constants import UPLOAD_MAX_RETRIES, UPLOAD_RETRY_DELAY_BASE
from crewai_files.core.resolved import (
    FileReference,
    InlineBase64,
    InlineBytes,
    ResolvedFile,
    UrlReference,
)
from crewai_files.core.sources import FileUrl
from crewai_files.core.types import FileInput
from crewai_files.processing.constraints import (
    AudioConstraints,
    ImageConstraints,
    PDFConstraints,
    ProviderConstraints,
    VideoConstraints,
    get_constraints_for_provider,
)
from crewai_files.uploaders import UploadResult, get_uploader
from crewai_files.uploaders.base import FileUploader
from crewai_files.uploaders.factory import ProviderType


logger = logging.getLogger(__name__)

@dataclass
class FileContext:
    """Cached file metadata to avoid redundant reads.

    Attributes:
        content: Raw file bytes.
        size: Size of the file in bytes.
        content_hash: SHA-256 hash of the file content.
        content_type: MIME type of the file.
    """

    content: bytes
    size: int
    content_hash: str
    content_type: str


@dataclass
class FileResolverConfig:
    """Configuration for FileResolver.

    Attributes:
        prefer_upload: If True, prefer uploading over inline for supported providers.
        upload_threshold_bytes: Size threshold above which to use upload.
            If None, uses provider-specific threshold.
        use_bytes_for_bedrock: If True, use raw bytes instead of base64 for Bedrock.
    """

    prefer_upload: bool = False
    upload_threshold_bytes: int | None = None
    use_bytes_for_bedrock: bool = True

@dataclass
class FileResolver:
    """Resolves files to their delivery format based on provider capabilities.

    Decides whether to use inline base64, raw bytes, or file upload based on:
    - Provider constraints and capabilities
    - File size
    - Configuration preferences

    Caches uploaded files to avoid redundant uploads.

    Attributes:
        config: Resolver configuration.
        upload_cache: Cache for tracking uploaded files.
    """

    config: FileResolverConfig = field(default_factory=FileResolverConfig)
    upload_cache: UploadCache | None = None
    _uploaders: dict[str, FileUploader] = field(default_factory=dict)

    @staticmethod
    def _build_file_context(file: FileInput) -> FileContext:
        """Build context by reading file once.

        Args:
            file: The file to build context for.

        Returns:
            FileContext with cached metadata.
        """
        content = file.read()
        return FileContext(
            content=content,
            size=len(content),
            content_hash=hashlib.sha256(content).hexdigest(),
            content_type=file.content_type,
        )

    @staticmethod
    def _is_url_source(file: FileInput) -> bool:
        """Check if file source is a URL.

        Args:
            file: The file to check.

        Returns:
            True if the file source is a FileUrl, False otherwise.
        """
        return isinstance(file._file_source, FileUrl)

    @staticmethod
    def _supports_url(constraints: ProviderConstraints | None) -> bool:
        """Check if provider supports URL references.

        Args:
            constraints: Provider constraints.

        Returns:
            True if the provider supports URL references, False otherwise.
        """
        return constraints is not None and constraints.supports_url_references

    @classmethod
    def _should_resolve_as_url_reference(
        cls,
        file: FileInput,
        provider: ProviderType,
        constraints: ProviderConstraints | None,
    ) -> bool:
        """Check if the provider can accept the current URL source directly."""
        if not cls._is_url_source(file) or not cls._supports_url(constraints):
            return False

        provider_lower = provider.lower()
        return "bedrock" not in provider_lower and "aws" not in provider_lower

    @staticmethod
    def _resolve_as_url(file: FileInput) -> UrlReference:
        """Resolve a URL source as UrlReference.

        Args:
            file: The file with URL source.

        Returns:
            UrlReference with the URL and content type.
        """
        source = file._file_source
        if not isinstance(source, FileUrl):
            raise TypeError(f"Expected FileUrl source, got {type(source).__name__}")
        return UrlReference(
            content_type=file.content_type,
            url=source.url,
        )

    def resolve(self, file: FileInput, provider: ProviderType) -> ResolvedFile:
        """Resolve a file to its delivery format for a provider.

        Args:
            file: The file to resolve.
            provider: Provider name (e.g., "gemini", "anthropic", "openai").

        Returns:
            ResolvedFile representing the appropriate delivery format.
        """
        constraints = get_constraints_for_provider(provider)

        if self._should_resolve_as_url_reference(file, provider, constraints):
            return self._resolve_as_url(file)

        context = self._build_file_context(file)

        should_upload = self._should_upload(file, provider, constraints, context.size)

        if should_upload:
            resolved = self._resolve_via_upload(file, provider, context)
            if resolved is not None:
                return resolved

        return self._resolve_inline(file, provider, context)

    def resolve_files(
        self,
        files: dict[str, FileInput],
        provider: ProviderType,
    ) -> dict[str, ResolvedFile]:
        """Resolve multiple files for a provider.

        Args:
            files: Dictionary mapping names to file inputs.
            provider: Provider name.

        Returns:
            Dictionary mapping names to resolved files.
        """
        return {name: self.resolve(file, provider) for name, file in files.items()}

    @staticmethod
    def _get_type_constraint(
        content_type: str,
        constraints: ProviderConstraints,
    ) -> ImageConstraints | PDFConstraints | AudioConstraints | VideoConstraints | None:
        """Get type-specific constraint based on content type.

        Args:
            content_type: MIME type of the file.
            constraints: Provider constraints.

        Returns:
            Type-specific constraint or None if not found.
        """
        if content_type.startswith("image/"):
            return constraints.image
        if content_type == "application/pdf":
            return constraints.pdf
        if content_type.startswith("audio/"):
            return constraints.audio
        if content_type.startswith("video/"):
            return constraints.video
        return None

    def _should_upload(
        self,
        file: FileInput,
        provider: str,
        constraints: ProviderConstraints | None,
        file_size: int,
    ) -> bool:
        """Determine if a file should be uploaded rather than inlined.

        Uses type-specific constraints to make smarter decisions:
        - Checks if file exceeds type-specific inline size limits
        - Falls back to general threshold if no type-specific constraint

        Args:
            file: The file to check.
            provider: Provider name.
            constraints: Provider constraints.
            file_size: Size of the file in bytes.

        Returns:
            True if the file should be uploaded, False otherwise.
        """
        if constraints is None or not constraints.supports_file_upload:
            return False

        if self.config.prefer_upload:
            return True

        content_type = file.content_type
        type_constraint = self._get_type_constraint(content_type, constraints)

        if type_constraint is not None:
            if file_size > type_constraint.max_size_bytes:
                logger.debug(
                    f"File {file.filename} ({file_size}B) exceeds {content_type} "
                    f"inline limit ({type_constraint.max_size_bytes}B) for {provider}"
                )
                return True

        # Fall back to general threshold
        threshold = self.config.upload_threshold_bytes
        if threshold is None:
            threshold = constraints.file_upload_threshold_bytes

        if threshold is not None and file_size > threshold:
            return True

        return False

    def _resolve_via_upload(
        self,
        file: FileInput,
        provider: ProviderType,
        context: FileContext,
    ) -> ResolvedFile | None:
        """Resolve a file by uploading it.

        Args:
            file: The file to upload.
            provider: Provider name.
            context: Pre-computed file context.

        Returns:
            FileReference if upload succeeds, None otherwise.
        """
        if self.upload_cache is not None:
            cached = self.upload_cache.get_by_hash(context.content_hash, provider)
            if cached is not None:
                logger.debug(
                    f"Using cached upload for {file.filename}: {cached.file_id}"
                )
                return FileReference(
                    content_type=cached.content_type,
                    file_id=cached.file_id,
                    provider=cached.provider,
                    expires_at=cached.expires_at,
                    file_uri=cached.file_uri,
                )

        uploader = self._get_uploader(provider)
        if uploader is None:
            logger.debug(f"No uploader available for {provider}")
            return None

        result = self._upload_with_retry(uploader, file, provider, context.size)
        if result is None:
            return None

        if self.upload_cache is not None:
            self.upload_cache.set_by_hash(
                file_hash=context.content_hash,
                content_type=context.content_type,
                provider=provider,
                file_id=result.file_id,
                file_uri=result.file_uri,
                expires_at=result.expires_at,
            )

        return FileReference(
            content_type=result.content_type,
            file_id=result.file_id,
            provider=result.provider,
            expires_at=result.expires_at,
            file_uri=result.file_uri,
        )

    @staticmethod
    def _upload_with_retry(
        uploader: FileUploader,
        file: FileInput,
        provider: str,
        file_size: int,
    ) -> UploadResult | None:
        """Upload with exponential backoff retry.

        Args:
            uploader: The uploader to use.
            file: The file to upload.
            provider: Provider name for logging.
            file_size: Size of the file in bytes.

        Returns:
            UploadResult if successful, None otherwise.
        """
        import time

        from crewai_files.processing.exceptions import (
            PermanentUploadError,
            TransientUploadError,
        )

        last_error: Exception | None = None

        for attempt in range(UPLOAD_MAX_RETRIES):
            with measure_operation(
                "upload",
                filename=file.filename,
                provider=provider,
                size_bytes=file_size,
                attempt=attempt + 1,
            ) as metrics:
                try:
                    result = uploader.upload(file)
                    metrics.metadata["file_id"] = result.file_id
                    return result
                except PermanentUploadError as e:
                    metrics.metadata["error_type"] = "permanent"
                    logger.warning(
                        f"Non-retryable upload error for {file.filename}: {e}"
                    )
                    return None
                except TransientUploadError as e:
                    metrics.metadata["error_type"] = "transient"
                    last_error = e
                except Exception as e:
                    metrics.metadata["error_type"] = "unknown"
                    last_error = e

            if attempt < UPLOAD_MAX_RETRIES - 1:
                delay = UPLOAD_RETRY_DELAY_BASE**attempt
                logger.debug(
                    f"Retrying upload for {file.filename} in {delay}s (attempt {attempt + 1})"
                )
                time.sleep(delay)

        logger.warning(
            f"Upload failed for {file.filename} to {provider} after {UPLOAD_MAX_RETRIES} attempts: {last_error}"
        )
        return None

    def _resolve_inline(
        self,
        file: FileInput,
        provider: str,
        context: FileContext,
    ) -> ResolvedFile:
        """Resolve a file as inline content.

        Args:
            file: The file to resolve (used for logging).
            provider: Provider name.
            context: Pre-computed file context.

        Returns:
            InlineBase64 or InlineBytes depending on provider.
        """
        logger.debug(f"Resolving {file.filename} as inline for {provider}")
        if self.config.use_bytes_for_bedrock and "bedrock" in provider:
            return InlineBytes(
                content_type=context.content_type,
                data=context.content,
            )

        encoded = base64.b64encode(context.content).decode("ascii")
        return InlineBase64(
            content_type=context.content_type,
            data=encoded,
        )

    async def aresolve(self, file: FileInput, provider: ProviderType) -> ResolvedFile:
        """Async resolve a file to its delivery format for a provider.

        Args:
            file: The file to resolve.
            provider: Provider name (e.g., "gemini", "anthropic", "openai").

        Returns:
            ResolvedFile representing the appropriate delivery format.
        """
        constraints = get_constraints_for_provider(provider)

        if self._should_resolve_as_url_reference(file, provider, constraints):
            return self._resolve_as_url(file)

        context = self._build_file_context(file)

        should_upload = self._should_upload(file, provider, constraints, context.size)

        if should_upload:
            resolved = await self._aresolve_via_upload(file, provider, context)
            if resolved is not None:
                return resolved

        return self._resolve_inline(file, provider, context)

    async def aresolve_files(
        self,
        files: dict[str, FileInput],
        provider: ProviderType,
        max_concurrency: int = 10,
    ) -> dict[str, ResolvedFile]:
        """Async resolve multiple files in parallel.

        Args:
            files: Dictionary mapping names to file inputs.
            provider: Provider name.
            max_concurrency: Maximum number of concurrent resolutions.

        Returns:
            Dictionary mapping names to resolved files.
        """
        semaphore = asyncio.Semaphore(max_concurrency)

        async def resolve_single(
            entry_key: str, input_file: FileInput
        ) -> tuple[str, ResolvedFile]:
            """Resolve a single file with semaphore limiting."""
            async with semaphore:
                entry_resolved = await self.aresolve(input_file, provider)
                return entry_key, entry_resolved

        tasks = [resolve_single(n, f) for n, f in files.items()]
        gather_results = await asyncio.gather(*tasks, return_exceptions=True)

        output: dict[str, ResolvedFile] = {}
        for item in gather_results:
            if isinstance(item, BaseException):
                logger.error(f"Resolution failed: {item}")
                continue
            key, resolved = item
            output[key] = resolved

        return output

    async def _aresolve_via_upload(
        self,
        file: FileInput,
        provider: ProviderType,
        context: FileContext,
    ) -> ResolvedFile | None:
        """Async resolve a file by uploading it.

        Args:
            file: The file to upload.
            provider: Provider name.
            context: Pre-computed file context.

        Returns:
            FileReference if upload succeeds, None otherwise.
        """
        if self.upload_cache is not None:
            cached = await self.upload_cache.aget_by_hash(
                context.content_hash, provider
            )
            if cached is not None:
                logger.debug(
                    f"Using cached upload for {file.filename}: {cached.file_id}"
                )
                return FileReference(
                    content_type=cached.content_type,
                    file_id=cached.file_id,
                    provider=cached.provider,
                    expires_at=cached.expires_at,
                    file_uri=cached.file_uri,
                )

        uploader = self._get_uploader(provider)
        if uploader is None:
            logger.debug(f"No uploader available for {provider}")
            return None

        result = await self._aupload_with_retry(uploader, file, provider, context.size)
        if result is None:
            return None

        if self.upload_cache is not None:
            await self.upload_cache.aset_by_hash(
                file_hash=context.content_hash,
                content_type=context.content_type,
                provider=provider,
                file_id=result.file_id,
                file_uri=result.file_uri,
                expires_at=result.expires_at,
            )

        return FileReference(
            content_type=result.content_type,
            file_id=result.file_id,
            provider=result.provider,
            expires_at=result.expires_at,
            file_uri=result.file_uri,
        )

    @staticmethod
    async def _aupload_with_retry(
        uploader: FileUploader,
        file: FileInput,
        provider: str,
        file_size: int,
    ) -> UploadResult | None:
        """Async upload with exponential backoff retry.

        Args:
            uploader: The uploader to use.
            file: The file to upload.
            provider: Provider name for logging.
            file_size: Size of the file in bytes.

        Returns:
            UploadResult if successful, None otherwise.
        """
        from crewai_files.processing.exceptions import (
            PermanentUploadError,
            TransientUploadError,
        )

        last_error: Exception | None = None

        for attempt in range(UPLOAD_MAX_RETRIES):
            with measure_operation(
                "upload",
                filename=file.filename,
                provider=provider,
                size_bytes=file_size,
                attempt=attempt + 1,
            ) as metrics:
                try:
                    result = await uploader.aupload(file)
                    metrics.metadata["file_id"] = result.file_id
                    return result
                except PermanentUploadError as e:
                    metrics.metadata["error_type"] = "permanent"
                    logger.warning(
                        f"Non-retryable upload error for {file.filename}: {e}"
                    )
                    return None
                except TransientUploadError as e:
                    metrics.metadata["error_type"] = "transient"
                    last_error = e
                except Exception as e:
                    metrics.metadata["error_type"] = "unknown"
                    last_error = e

            if attempt < UPLOAD_MAX_RETRIES - 1:
                delay = UPLOAD_RETRY_DELAY_BASE**attempt
                logger.debug(
                    f"Retrying upload for {file.filename} in {delay}s (attempt {attempt + 1})"
                )
                await asyncio.sleep(delay)

        logger.warning(
            f"Upload failed for {file.filename} to {provider} after {UPLOAD_MAX_RETRIES} attempts: {last_error}"
        )
        return None

    def _get_uploader(self, provider: ProviderType) -> FileUploader | None:
        """Get or create an uploader for a provider.

        Args:
            provider: Provider name.

        Returns:
            FileUploader instance or None if not available.
        """
        if provider not in self._uploaders:
            uploader = get_uploader(provider)
            if uploader is not None:
                self._uploaders[provider] = uploader
            else:
                return None
        return self._uploaders.get(provider)

    def get_cached_uploads(self, provider: ProviderType) -> list[CachedUpload]:
        """Get all cached uploads for a provider.

        Args:
            provider: Provider name.

        Returns:
            List of cached uploads.
        """
        if self.upload_cache is None:
            return []
        return self.upload_cache.get_all_for_provider(provider)

    def clear_cache(self) -> None:
        """Clear the upload cache."""
        if self.upload_cache is not None:
            self.upload_cache.clear()


def create_resolver(
    provider: str | None = None,
    prefer_upload: bool = False,
    upload_threshold_bytes: int | None = None,
    enable_cache: bool = True,
) -> FileResolver:
    """Create a configured FileResolver.

    Args:
        provider: Optional provider name to load default threshold from constraints.
        prefer_upload: Whether to prefer upload over inline.
        upload_threshold_bytes: Size threshold for using upload. If None and
            provider is specified, uses provider's default threshold.
        enable_cache: Whether to enable upload caching.

    Returns:
        Configured FileResolver instance.
    """
    threshold = upload_threshold_bytes
    if threshold is None and provider is not None:
        constraints = get_constraints_for_provider(provider)
        if constraints is not None:
            threshold = constraints.file_upload_threshold_bytes

    config = FileResolverConfig(
        prefer_upload=prefer_upload,
        upload_threshold_bytes=threshold,
    )

    cache = UploadCache() if enable_cache else None

    return FileResolver(config=config, upload_cache=cache)
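
Illustrative sketch (not part of resolver.py): the order of checks in `_should_upload` and the retry delays used by `_upload_with_retry` can be reproduced without the crewai_files package. Everything below is a stand-in written for illustration: `TypeLimit`, `Limits`, `should_upload`, and `backoff_delays` are hypothetical names, only the image branch of the type lookup is modelled, and the base and retry count passed to `backoff_delays` are example values rather than the real `UPLOAD_RETRY_DELAY_BASE` and `UPLOAD_MAX_RETRIES` constants.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TypeLimit:
    """Hypothetical stand-in for a per-type constraint such as ImageConstraints."""

    max_size_bytes: int


@dataclass
class Limits:
    """Hypothetical stand-in for ProviderConstraints (only the fields used here)."""

    supports_file_upload: bool = True
    file_upload_threshold_bytes: int | None = None
    image: TypeLimit | None = None


def should_upload(
    content_type: str,
    size: int,
    limits: Limits | None,
    prefer_upload: bool = False,
    override_threshold: int | None = None,
) -> bool:
    """Mirror the check order of FileResolver._should_upload."""
    # 1. A provider without upload support always gets inline content.
    if limits is None or not limits.supports_file_upload:
        return False
    # 2. An explicit preference for upload wins over any size check.
    if prefer_upload:
        return True
    # 3. Type-specific inline limit (only images are modelled in this sketch).
    type_limit = limits.image if content_type.startswith("image/") else None
    if type_limit is not None and size > type_limit.max_size_bytes:
        return True
    # 4. General threshold: the configured override first, then the provider default.
    threshold = override_threshold
    if threshold is None:
        threshold = limits.file_upload_threshold_bytes
    return threshold is not None and size > threshold


def backoff_delays(base: float, max_retries: int) -> list[float]:
    """Delays slept between attempts: base**attempt, with no sleep after the last try."""
    return [base**attempt for attempt in range(max_retries - 1)]


if __name__ == "__main__":
    limits = Limits(file_upload_threshold_bytes=1000, image=TypeLimit(500))
    print(should_upload("image/png", 600, limits))
    print(should_upload("text/plain", 600, limits))
    print(backoff_delays(2, 3))
```

Note that a file under its type-specific limit still falls through to the general threshold, so a small image can be uploaded if the general threshold is lower than the image limit.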