diff --git a/lib/crewai/src/crewai/agents/crew_agent_executor.py b/lib/crewai/src/crewai/agents/crew_agent_executor.py index de19934d6..f1eeef646 100644 --- a/lib/crewai/src/crewai/agents/crew_agent_executor.py +++ b/lib/crewai/src/crewai/agents/crew_agent_executor.py @@ -43,6 +43,8 @@ from crewai.utilities.agent_utils import ( process_llm_response, ) from crewai.utilities.constants import TRAINING_DATA_FILE +from crewai.utilities.file_store import get_all_files +from crewai.utilities.files import FileProcessor from crewai.utilities.i18n import I18N, get_i18n from crewai.utilities.printer import Printer from crewai.utilities.tool_utils import ( @@ -188,6 +190,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs) self.messages.append(format_message_for_llm(user_prompt)) + self._inject_multimodal_files() + self._show_start_logs() self.ask_for_human_input = bool(inputs.get("ask_for_human_input", False)) @@ -212,6 +216,48 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): self._create_external_memory(formatted_answer) return {"output": formatted_answer.output} + def _inject_multimodal_files(self) -> None: + """Inject files as multimodal content into messages. + + For crews with input files and LLMs that support multimodal, + processes files according to provider constraints and file handling mode, + then delegates to the LLM's format_multimodal_content method to + generate provider-specific content blocks. + """ + if not self.crew or not self.task: + return + + if not self.llm.supports_multimodal(): + return + + files = get_all_files(self.crew.id, self.task.id) + if not files: + return + + provider = getattr(self.llm, "provider", None) or getattr(self.llm, "model", "") + processor = FileProcessor(constraints=provider) + files = processor.process_files(files) + + from crewai.utilities.files import get_upload_cache + + upload_cache = get_upload_cache() + content_blocks = self.llm.format_multimodal_content( + files, upload_cache=upload_cache + ) + if not content_blocks: + return + + for i in range(len(self.messages) - 1, -1, -1): + msg = self.messages[i] + if msg.get("role") == "user": + existing_content = msg.get("content", "") + if isinstance(existing_content, str): + msg["content"] = [ + self.llm.format_text_content(existing_content), + *content_blocks, + ] + break + def _invoke_loop(self) -> AgentFinish: """Execute agent loop until completion. @@ -355,6 +401,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs) self.messages.append(format_message_for_llm(user_prompt)) + self._inject_multimodal_files() + self._show_start_logs() self.ask_for_human_input = bool(inputs.get("ask_for_human_input", False)) diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index 2c7f583b9..4f8a46a44 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -80,6 +80,7 @@ from crewai.task import Task from crewai.tasks.conditional_task import ConditionalTask from crewai.tasks.task_output import TaskOutput from crewai.tools.agent_tools.agent_tools import AgentTools +from crewai.tools.agent_tools.read_file_tool import ReadFileTool from crewai.tools.base_tool import BaseTool from crewai.types.streaming import CrewStreamingOutput from crewai.types.usage_metrics import UsageMetrics @@ -88,6 +89,7 @@ from crewai.utilities.crew.models import CrewContext from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator from crewai.utilities.evaluators.task_evaluator import TaskEvaluator from crewai.utilities.file_handler import FileHandler +from crewai.utilities.file_store import clear_files, get_all_files from crewai.utilities.formatter import ( aggregate_raw_outputs_from_task_outputs, aggregate_raw_outputs_from_tasks, @@ -106,6 +108,7 @@ from crewai.utilities.streaming import ( ) from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler from crewai.utilities.training_handler import CrewTrainingHandler +from crewai.utilities.types import KickoffInputs warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd") @@ -675,7 +678,7 @@ class Crew(FlowTrackable, BaseModel): def kickoff( self, - inputs: dict[str, Any] | None = None, + inputs: KickoffInputs | dict[str, Any] | None = None, ) -> CrewOutput | CrewStreamingOutput: if self.stream: enable_agent_streaming(self.agents) @@ -732,6 +735,7 @@ class Crew(FlowTrackable, BaseModel): ) raise finally: + clear_files(self.id) detach(token) def kickoff_for_each( @@ -762,7 +766,7 @@ class Crew(FlowTrackable, BaseModel): return results async def kickoff_async( - self, inputs: dict[str, Any] | None = None + self, inputs: KickoffInputs | dict[str, Any] | None = None ) -> CrewOutput | CrewStreamingOutput: """Asynchronous kickoff method to start the crew execution. @@ -817,7 +821,7 @@ class Crew(FlowTrackable, BaseModel): return await run_for_each_async(self, inputs, kickoff_fn) async def akickoff( - self, inputs: dict[str, Any] | None = None + self, inputs: KickoffInputs | dict[str, Any] | None = None ) -> CrewOutput | CrewStreamingOutput: """Native async kickoff method using async task execution throughout. @@ -880,6 +884,7 @@ class Crew(FlowTrackable, BaseModel): ) raise finally: + clear_files(self.id) detach(token) async def akickoff_for_each( @@ -1215,7 +1220,8 @@ class Crew(FlowTrackable, BaseModel): and hasattr(agent, "multimodal") and getattr(agent, "multimodal", False) ): - tools = self._add_multimodal_tools(agent, tools) + if not (agent.llm and agent.llm.supports_multimodal()): + tools = self._add_multimodal_tools(agent, tools) if agent and (hasattr(agent, "apps") and getattr(agent, "apps", None)): tools = self._add_platform_tools(task, tools) @@ -1223,7 +1229,24 @@ class Crew(FlowTrackable, BaseModel): if agent and (hasattr(agent, "mcps") and getattr(agent, "mcps", None)): tools = self._add_mcp_tools(task, tools) - # Return a list[BaseTool] compatible with Task.execute_sync and execute_async + files = get_all_files(self.id, task.id) + if files: + supported_types: list[str] = [] + if agent and agent.llm and agent.llm.supports_multimodal(): + supported_types = agent.llm.supported_multimodal_content_types() + + def is_auto_injected(content_type: str) -> bool: + return any(content_type.startswith(t) for t in supported_types) + + # Only add read_file tool if there are files that need it + files_needing_tool = { + name: f + for name, f in files.items() + if not is_auto_injected(f.content_type) + } + if files_needing_tool: + tools = self._add_file_tools(tools, files_needing_tool) + return tools def _get_agent_to_use(self, task: Task) -> BaseAgent | None: @@ -1303,6 +1326,22 @@ class Crew(FlowTrackable, BaseModel): return self._merge_tools(tools, cast(list[BaseTool], code_tools)) return tools + def _add_file_tools( + self, tools: list[BaseTool], files: dict[str, Any] + ) -> list[BaseTool]: + """Add file reading tool when input files are available. + + Args: + tools: Current list of tools. + files: Dictionary of input files. + + Returns: + Updated list with file tool added. + """ + read_file_tool = ReadFileTool() + read_file_tool.set_files(files) + return self._merge_tools(tools, [read_file_tool]) + def _add_delegation_tools( self, task: Task, tools: list[BaseTool] ) -> list[BaseTool]: diff --git a/lib/crewai/src/crewai/crews/utils.py b/lib/crewai/src/crewai/crews/utils.py index 5694dcda1..ee1df151b 100644 --- a/lib/crewai/src/crewai/crews/utils.py +++ b/lib/crewai/src/crewai/crews/utils.py @@ -10,11 +10,20 @@ from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.crews.crew_output import CrewOutput from crewai.rag.embeddings.types import EmbedderConfig from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput +from crewai.utilities.file_store import store_files +from crewai.utilities.files import ( + AudioFile, + ImageFile, + PDFFile, + TextFile, + VideoFile, +) from crewai.utilities.streaming import ( StreamingState, TaskInfo, create_streaming_state, ) +from crewai.utilities.types import KickoffInputs if TYPE_CHECKING: @@ -176,7 +185,36 @@ def check_conditional_skip( return None -def prepare_kickoff(crew: Crew, inputs: dict[str, Any] | None) -> dict[str, Any] | None: +def _extract_files_from_inputs(inputs: dict[str, Any]) -> dict[str, Any]: + """Extract file objects from inputs dict. + + Scans inputs for FileInput objects (ImageFile, TextFile, etc.) and + extracts them into a separate dict. + + Args: + inputs: The inputs dictionary to scan. + + Returns: + Dictionary of extracted file objects. + """ + file_types = (AudioFile, ImageFile, PDFFile, TextFile, VideoFile) + files: dict[str, Any] = {} + keys_to_remove: list[str] = [] + + for key, value in inputs.items(): + if isinstance(value, file_types): + files[key] = value + keys_to_remove.append(key) + + for key in keys_to_remove: + del inputs[key] + + return files + + +def prepare_kickoff( + crew: Crew, inputs: KickoffInputs | dict[str, Any] | None +) -> dict[str, Any] | None: """Prepare crew for kickoff execution. Handles before callbacks, event emission, task handler reset, input @@ -192,14 +230,17 @@ def prepare_kickoff(crew: Crew, inputs: dict[str, Any] | None) -> dict[str, Any] from crewai.events.event_bus import crewai_event_bus from crewai.events.types.crew_events import CrewKickoffStartedEvent + # Normalize inputs to dict[str, Any] for internal processing + normalized: dict[str, Any] | None = dict(inputs) if inputs is not None else None + for before_callback in crew.before_kickoff_callbacks: - if inputs is None: - inputs = {} - inputs = before_callback(inputs) + if normalized is None: + normalized = {} + normalized = before_callback(normalized) future = crewai_event_bus.emit( crew, - CrewKickoffStartedEvent(crew_name=crew.name, inputs=inputs), + CrewKickoffStartedEvent(crew_name=crew.name, inputs=normalized), ) if future is not None: try: @@ -210,9 +251,20 @@ def prepare_kickoff(crew: Crew, inputs: dict[str, Any] | None) -> dict[str, Any] crew._task_output_handler.reset() crew._logging_color = "bold_purple" - if inputs is not None: - crew._inputs = inputs - crew._interpolate_inputs(inputs) + if normalized is not None: + # Extract files from dedicated "files" key + files = normalized.pop("files", None) or {} + + # Extract file objects unpacked directly into inputs + unpacked_files = _extract_files_from_inputs(normalized) + + # Merge files (unpacked files take precedence over explicit files dict) + all_files = {**files, **unpacked_files} + if all_files: + store_files(crew.id, all_files) + + crew._inputs = normalized + crew._interpolate_inputs(normalized) crew._set_tasks_callbacks() crew._set_allow_crewai_trigger_context_for_first_task() @@ -227,7 +279,7 @@ def prepare_kickoff(crew: Crew, inputs: dict[str, Any] | None) -> dict[str, Any] if crew.planning: crew._handle_crew_planning() - return inputs + return normalized class StreamingContext: diff --git a/lib/crewai/src/crewai/task.py b/lib/crewai/src/crewai/task.py index 13d30b564..7d8fc5ecb 100644 --- a/lib/crewai/src/crewai/task.py +++ b/lib/crewai/src/crewai/task.py @@ -44,6 +44,17 @@ from crewai.tools.base_tool import BaseTool from crewai.utilities.config import process_config from crewai.utilities.constants import NOT_SPECIFIED, _NotSpecified from crewai.utilities.converter import Converter, convert_to_model +from crewai.utilities.file_store import ( + clear_task_files, + get_all_files, + store_task_files, +) +from crewai.utilities.files import ( + FileInput, + FilePath, + FileSourceInput, + normalize_input_files, +) from crewai.utilities.guardrail import ( process_guardrail, ) @@ -142,6 +153,10 @@ class Task(BaseModel): default_factory=list, description="Tools the agent is limited to use for this task.", ) + input_files: list[FileSourceInput | FileInput] = Field( + default_factory=list, + description="List of input files for this task. Accepts paths, bytes, or File objects.", + ) security_config: SecurityConfig = Field( default_factory=SecurityConfig, description="Security configuration for the task.", @@ -357,6 +372,21 @@ class Task(BaseModel): "may_not_set_field", "This field is not to be set by the user.", {} ) + @field_validator("input_files", mode="before") + @classmethod + def _normalize_input_files(cls, v: list[Any]) -> list[Any]: + """Convert string paths to FilePath objects.""" + if not v: + return v + + result = [] + for item in v: + if isinstance(item, str): + result.append(FilePath(path=Path(item))) + else: + result.append(item) + return result + @field_validator("output_file") @classmethod def output_file_validation(cls, value: str | None) -> str | None: @@ -495,10 +525,10 @@ class Task(BaseModel): ) -> None: """Execute the task asynchronously with context handling.""" try: - result = self._execute_core(agent, context, tools) - future.set_result(result) + result = self._execute_core(agent, context, tools) + future.set_result(result) except Exception as e: - future.set_exception(e) + future.set_exception(e) async def aexecute_sync( self, @@ -516,6 +546,7 @@ class Task(BaseModel): tools: list[Any] | None, ) -> TaskOutput: """Run the core execution logic of the task asynchronously.""" + self._store_input_files() try: agent = agent or self.agent self.agent = agent @@ -600,6 +631,8 @@ class Task(BaseModel): self.end_time = datetime.datetime.now() crewai_event_bus.emit(self, TaskFailedEvent(error=str(e), task=self)) # type: ignore[no-untyped-call] raise e # Re-raise the exception after emitting the event + finally: + clear_task_files(self.id) def _execute_core( self, @@ -608,6 +641,7 @@ class Task(BaseModel): tools: list[Any] | None, ) -> TaskOutput: """Run the core execution logic of the task.""" + self._store_input_files() try: agent = agent or self.agent self.agent = agent @@ -693,6 +727,8 @@ class Task(BaseModel): self.end_time = datetime.datetime.now() crewai_event_bus.emit(self, TaskFailedEvent(error=str(e), task=self)) # type: ignore[no-untyped-call] raise e # Re-raise the exception after emitting the event + finally: + clear_task_files(self.id) def prompt(self) -> str: """Generates the task prompt with optional markdown formatting. @@ -715,6 +751,51 @@ class Task(BaseModel): if trigger_payload is not None: description += f"\n\nTrigger Payload: {trigger_payload}" + if self.agent and self.agent.crew: + files = get_all_files(self.agent.crew.id, self.id) + if files: + supported_types: list[str] = [] + if self.agent.llm and self.agent.llm.supports_multimodal(): + supported_types = ( + self.agent.llm.supported_multimodal_content_types() + ) + + def is_auto_injected(content_type: str) -> bool: + return any(content_type.startswith(t) for t in supported_types) + + auto_injected_files = { + name: f_input + for name, f_input in files.items() + if is_auto_injected(f_input.content_type) + } + tool_files = { + name: f_input + for name, f_input in files.items() + if not is_auto_injected(f_input.content_type) + } + + file_lines: list[str] = [] + + if auto_injected_files: + file_lines.append( + "Input files (content already loaded in conversation):" + ) + for name, file_input in auto_injected_files.items(): + filename = file_input.filename or name + file_lines.append(f' - "{name}" ({filename})') + + if tool_files: + file_lines.append( + "Available input files (use the name in quotes with read_file tool):" + ) + for name, file_input in tool_files.items(): + filename = file_input.filename or name + content_type = file_input.content_type + file_lines.append(f' - "{name}" ({filename}, {content_type})') + + if file_lines: + description += "\n\n" + "\n".join(file_lines) + tasks_slices = [description] output = self.i18n.slice("expected_output").format( @@ -948,6 +1029,18 @@ Follow these guidelines: ) from e return + def _store_input_files(self) -> None: + """Store task input files in the file store. + + Converts input_files list to a named dict and stores under task ID. + """ + if not self.input_files: + return + + files_dict = normalize_input_files(self.input_files) + if files_dict: + store_task_files(self.id, files_dict) + def __repr__(self) -> str: return f"Task(description={self.description}, expected_output={self.expected_output})"