feat: add input_files support to Task and Crew

- Add input_files parameter to Task for file attachments
- Add file_handling mode to Crew for processing behavior
- Integrate file injection in CrewAgentExecutor
- Update prepare_kickoff to handle KickoffInputs type
This commit is contained in:
Greyson LaLonde
2026-01-21 20:11:05 -05:00
parent 771eccfcdf
commit 4ed5e4ca0e
4 changed files with 249 additions and 17 deletions

View File

@@ -43,6 +43,8 @@ from crewai.utilities.agent_utils import (
process_llm_response,
)
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.file_store import get_all_files
from crewai.utilities.files import FileProcessor
from crewai.utilities.i18n import I18N, get_i18n
from crewai.utilities.printer import Printer
from crewai.utilities.tool_utils import (
@@ -188,6 +190,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs)
self.messages.append(format_message_for_llm(user_prompt))
self._inject_multimodal_files()
self._show_start_logs()
self.ask_for_human_input = bool(inputs.get("ask_for_human_input", False))
@@ -212,6 +216,48 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._create_external_memory(formatted_answer)
return {"output": formatted_answer.output}
def _inject_multimodal_files(self) -> None:
"""Inject files as multimodal content into messages.
For crews with input files and LLMs that support multimodal,
processes files according to provider constraints and file handling mode,
then delegates to the LLM's format_multimodal_content method to
generate provider-specific content blocks.
"""
if not self.crew or not self.task:
return
if not self.llm.supports_multimodal():
return
files = get_all_files(self.crew.id, self.task.id)
if not files:
return
provider = getattr(self.llm, "provider", None) or getattr(self.llm, "model", "")
processor = FileProcessor(constraints=provider)
files = processor.process_files(files)
from crewai.utilities.files import get_upload_cache
upload_cache = get_upload_cache()
content_blocks = self.llm.format_multimodal_content(
files, upload_cache=upload_cache
)
if not content_blocks:
return
for i in range(len(self.messages) - 1, -1, -1):
msg = self.messages[i]
if msg.get("role") == "user":
existing_content = msg.get("content", "")
if isinstance(existing_content, str):
msg["content"] = [
self.llm.format_text_content(existing_content),
*content_blocks,
]
break
def _invoke_loop(self) -> AgentFinish:
"""Execute agent loop until completion.
@@ -355,6 +401,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs)
self.messages.append(format_message_for_llm(user_prompt))
self._inject_multimodal_files()
self._show_start_logs()
self.ask_for_human_input = bool(inputs.get("ask_for_human_input", False))

View File

@@ -80,6 +80,7 @@ from crewai.task import Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.agent_tools.read_file_tool import ReadFileTool
from crewai.tools.base_tool import BaseTool
from crewai.types.streaming import CrewStreamingOutput
from crewai.types.usage_metrics import UsageMetrics
@@ -88,6 +89,7 @@ from crewai.utilities.crew.models import CrewContext
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.file_handler import FileHandler
from crewai.utilities.file_store import clear_files, get_all_files
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks,
@@ -106,6 +108,7 @@ from crewai.utilities.streaming import (
)
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
from crewai.utilities.training_handler import CrewTrainingHandler
from crewai.utilities.types import KickoffInputs
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
@@ -675,7 +678,7 @@ class Crew(FlowTrackable, BaseModel):
def kickoff(
self,
inputs: dict[str, Any] | None = None,
inputs: KickoffInputs | dict[str, Any] | None = None,
) -> CrewOutput | CrewStreamingOutput:
if self.stream:
enable_agent_streaming(self.agents)
@@ -732,6 +735,7 @@ class Crew(FlowTrackable, BaseModel):
)
raise
finally:
clear_files(self.id)
detach(token)
def kickoff_for_each(
@@ -762,7 +766,7 @@ class Crew(FlowTrackable, BaseModel):
return results
async def kickoff_async(
self, inputs: dict[str, Any] | None = None
self, inputs: KickoffInputs | dict[str, Any] | None = None
) -> CrewOutput | CrewStreamingOutput:
"""Asynchronous kickoff method to start the crew execution.
@@ -817,7 +821,7 @@ class Crew(FlowTrackable, BaseModel):
return await run_for_each_async(self, inputs, kickoff_fn)
async def akickoff(
self, inputs: dict[str, Any] | None = None
self, inputs: KickoffInputs | dict[str, Any] | None = None
) -> CrewOutput | CrewStreamingOutput:
"""Native async kickoff method using async task execution throughout.
@@ -880,6 +884,7 @@ class Crew(FlowTrackable, BaseModel):
)
raise
finally:
clear_files(self.id)
detach(token)
async def akickoff_for_each(
@@ -1215,7 +1220,8 @@ class Crew(FlowTrackable, BaseModel):
and hasattr(agent, "multimodal")
and getattr(agent, "multimodal", False)
):
tools = self._add_multimodal_tools(agent, tools)
if not (agent.llm and agent.llm.supports_multimodal()):
tools = self._add_multimodal_tools(agent, tools)
if agent and (hasattr(agent, "apps") and getattr(agent, "apps", None)):
tools = self._add_platform_tools(task, tools)
@@ -1223,7 +1229,24 @@ class Crew(FlowTrackable, BaseModel):
if agent and (hasattr(agent, "mcps") and getattr(agent, "mcps", None)):
tools = self._add_mcp_tools(task, tools)
# Return a list[BaseTool] compatible with Task.execute_sync and execute_async
files = get_all_files(self.id, task.id)
if files:
supported_types: list[str] = []
if agent and agent.llm and agent.llm.supports_multimodal():
supported_types = agent.llm.supported_multimodal_content_types()
def is_auto_injected(content_type: str) -> bool:
return any(content_type.startswith(t) for t in supported_types)
# Only add read_file tool if there are files that need it
files_needing_tool = {
name: f
for name, f in files.items()
if not is_auto_injected(f.content_type)
}
if files_needing_tool:
tools = self._add_file_tools(tools, files_needing_tool)
return tools
def _get_agent_to_use(self, task: Task) -> BaseAgent | None:
@@ -1303,6 +1326,22 @@ class Crew(FlowTrackable, BaseModel):
return self._merge_tools(tools, cast(list[BaseTool], code_tools))
return tools
def _add_file_tools(
self, tools: list[BaseTool], files: dict[str, Any]
) -> list[BaseTool]:
"""Add file reading tool when input files are available.
Args:
tools: Current list of tools.
files: Dictionary of input files.
Returns:
Updated list with file tool added.
"""
read_file_tool = ReadFileTool()
read_file_tool.set_files(files)
return self._merge_tools(tools, [read_file_tool])
def _add_delegation_tools(
self, task: Task, tools: list[BaseTool]
) -> list[BaseTool]:

View File

@@ -10,11 +10,20 @@ from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.crews.crew_output import CrewOutput
from crewai.rag.embeddings.types import EmbedderConfig
from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput
from crewai.utilities.file_store import store_files
from crewai.utilities.files import (
AudioFile,
ImageFile,
PDFFile,
TextFile,
VideoFile,
)
from crewai.utilities.streaming import (
StreamingState,
TaskInfo,
create_streaming_state,
)
from crewai.utilities.types import KickoffInputs
if TYPE_CHECKING:
@@ -176,7 +185,36 @@ def check_conditional_skip(
return None
def prepare_kickoff(crew: Crew, inputs: dict[str, Any] | None) -> dict[str, Any] | None:
def _extract_files_from_inputs(inputs: dict[str, Any]) -> dict[str, Any]:
"""Extract file objects from inputs dict.
Scans inputs for FileInput objects (ImageFile, TextFile, etc.) and
extracts them into a separate dict.
Args:
inputs: The inputs dictionary to scan.
Returns:
Dictionary of extracted file objects.
"""
file_types = (AudioFile, ImageFile, PDFFile, TextFile, VideoFile)
files: dict[str, Any] = {}
keys_to_remove: list[str] = []
for key, value in inputs.items():
if isinstance(value, file_types):
files[key] = value
keys_to_remove.append(key)
for key in keys_to_remove:
del inputs[key]
return files
def prepare_kickoff(
crew: Crew, inputs: KickoffInputs | dict[str, Any] | None
) -> dict[str, Any] | None:
"""Prepare crew for kickoff execution.
Handles before callbacks, event emission, task handler reset, input
@@ -192,14 +230,17 @@ def prepare_kickoff(crew: Crew, inputs: dict[str, Any] | None) -> dict[str, Any]
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.crew_events import CrewKickoffStartedEvent
# Normalize inputs to dict[str, Any] for internal processing
normalized: dict[str, Any] | None = dict(inputs) if inputs is not None else None
for before_callback in crew.before_kickoff_callbacks:
if inputs is None:
inputs = {}
inputs = before_callback(inputs)
if normalized is None:
normalized = {}
normalized = before_callback(normalized)
future = crewai_event_bus.emit(
crew,
CrewKickoffStartedEvent(crew_name=crew.name, inputs=inputs),
CrewKickoffStartedEvent(crew_name=crew.name, inputs=normalized),
)
if future is not None:
try:
@@ -210,9 +251,20 @@ def prepare_kickoff(crew: Crew, inputs: dict[str, Any] | None) -> dict[str, Any]
crew._task_output_handler.reset()
crew._logging_color = "bold_purple"
if inputs is not None:
crew._inputs = inputs
crew._interpolate_inputs(inputs)
if normalized is not None:
# Extract files from dedicated "files" key
files = normalized.pop("files", None) or {}
# Extract file objects unpacked directly into inputs
unpacked_files = _extract_files_from_inputs(normalized)
# Merge files (unpacked files take precedence over explicit files dict)
all_files = {**files, **unpacked_files}
if all_files:
store_files(crew.id, all_files)
crew._inputs = normalized
crew._interpolate_inputs(normalized)
crew._set_tasks_callbacks()
crew._set_allow_crewai_trigger_context_for_first_task()
@@ -227,7 +279,7 @@ def prepare_kickoff(crew: Crew, inputs: dict[str, Any] | None) -> dict[str, Any]
if crew.planning:
crew._handle_crew_planning()
return inputs
return normalized
class StreamingContext:

View File

@@ -44,6 +44,17 @@ from crewai.tools.base_tool import BaseTool
from crewai.utilities.config import process_config
from crewai.utilities.constants import NOT_SPECIFIED, _NotSpecified
from crewai.utilities.converter import Converter, convert_to_model
from crewai.utilities.file_store import (
clear_task_files,
get_all_files,
store_task_files,
)
from crewai.utilities.files import (
FileInput,
FilePath,
FileSourceInput,
normalize_input_files,
)
from crewai.utilities.guardrail import (
process_guardrail,
)
@@ -142,6 +153,10 @@ class Task(BaseModel):
default_factory=list,
description="Tools the agent is limited to use for this task.",
)
input_files: list[FileSourceInput | FileInput] = Field(
default_factory=list,
description="List of input files for this task. Accepts paths, bytes, or File objects.",
)
security_config: SecurityConfig = Field(
default_factory=SecurityConfig,
description="Security configuration for the task.",
@@ -357,6 +372,21 @@ class Task(BaseModel):
"may_not_set_field", "This field is not to be set by the user.", {}
)
@field_validator("input_files", mode="before")
@classmethod
def _normalize_input_files(cls, v: list[Any]) -> list[Any]:
"""Convert string paths to FilePath objects."""
if not v:
return v
result = []
for item in v:
if isinstance(item, str):
result.append(FilePath(path=Path(item)))
else:
result.append(item)
return result
@field_validator("output_file")
@classmethod
def output_file_validation(cls, value: str | None) -> str | None:
@@ -495,10 +525,10 @@ class Task(BaseModel):
) -> None:
"""Execute the task asynchronously with context handling."""
try:
result = self._execute_core(agent, context, tools)
future.set_result(result)
result = self._execute_core(agent, context, tools)
future.set_result(result)
except Exception as e:
future.set_exception(e)
future.set_exception(e)
async def aexecute_sync(
self,
@@ -516,6 +546,7 @@ class Task(BaseModel):
tools: list[Any] | None,
) -> TaskOutput:
"""Run the core execution logic of the task asynchronously."""
self._store_input_files()
try:
agent = agent or self.agent
self.agent = agent
@@ -600,6 +631,8 @@ class Task(BaseModel):
self.end_time = datetime.datetime.now()
crewai_event_bus.emit(self, TaskFailedEvent(error=str(e), task=self)) # type: ignore[no-untyped-call]
raise e # Re-raise the exception after emitting the event
finally:
clear_task_files(self.id)
def _execute_core(
self,
@@ -608,6 +641,7 @@ class Task(BaseModel):
tools: list[Any] | None,
) -> TaskOutput:
"""Run the core execution logic of the task."""
self._store_input_files()
try:
agent = agent or self.agent
self.agent = agent
@@ -693,6 +727,8 @@ class Task(BaseModel):
self.end_time = datetime.datetime.now()
crewai_event_bus.emit(self, TaskFailedEvent(error=str(e), task=self)) # type: ignore[no-untyped-call]
raise e # Re-raise the exception after emitting the event
finally:
clear_task_files(self.id)
def prompt(self) -> str:
"""Generates the task prompt with optional markdown formatting.
@@ -715,6 +751,51 @@ class Task(BaseModel):
if trigger_payload is not None:
description += f"\n\nTrigger Payload: {trigger_payload}"
if self.agent and self.agent.crew:
files = get_all_files(self.agent.crew.id, self.id)
if files:
supported_types: list[str] = []
if self.agent.llm and self.agent.llm.supports_multimodal():
supported_types = (
self.agent.llm.supported_multimodal_content_types()
)
def is_auto_injected(content_type: str) -> bool:
return any(content_type.startswith(t) for t in supported_types)
auto_injected_files = {
name: f_input
for name, f_input in files.items()
if is_auto_injected(f_input.content_type)
}
tool_files = {
name: f_input
for name, f_input in files.items()
if not is_auto_injected(f_input.content_type)
}
file_lines: list[str] = []
if auto_injected_files:
file_lines.append(
"Input files (content already loaded in conversation):"
)
for name, file_input in auto_injected_files.items():
filename = file_input.filename or name
file_lines.append(f' - "{name}" ({filename})')
if tool_files:
file_lines.append(
"Available input files (use the name in quotes with read_file tool):"
)
for name, file_input in tool_files.items():
filename = file_input.filename or name
content_type = file_input.content_type
file_lines.append(f' - "{name}" ({filename}, {content_type})')
if file_lines:
description += "\n\n" + "\n".join(file_lines)
tasks_slices = [description]
output = self.i18n.slice("expected_output").format(
@@ -948,6 +1029,18 @@ Follow these guidelines:
) from e
return
def _store_input_files(self) -> None:
"""Store task input files in the file store.
Converts input_files list to a named dict and stores under task ID.
"""
if not self.input_files:
return
files_dict = normalize_input_files(self.input_files)
if files_dict:
store_task_files(self.id, files_dict)
def __repr__(self) -> str:
return f"Task(description={self.description}, expected_output={self.expected_output})"