feat(files): standardize input_files parameter across all kickoff methods

This commit is contained in:
Greyson LaLonde
2026-01-23 09:08:19 -05:00
parent f21751ffb8
commit cd0a2c3900
6 changed files with 186 additions and 72 deletions

View File

@@ -1646,7 +1646,7 @@ class Agent(BaseAgent):
self,
messages: str | list[LLMMessage],
response_format: type[Any] | None = None,
files: dict[str, FileInput] | None = None,
input_files: dict[str, FileInput] | None = None,
) -> tuple[AgentExecutor, dict[str, Any], dict[str, Any], list[CrewStructuredTool]]:
"""Prepare common setup for kickoff execution.
@@ -1657,7 +1657,7 @@ class Agent(BaseAgent):
Args:
messages: Either a string query or a list of message dictionaries.
response_format: Optional Pydantic model for structured output.
files: Optional dict of named files to attach to the message.
input_files: Optional dict of named files to attach to the message.
Returns:
Tuple of (executor, inputs, agent_info, parsed_tools) ready for execution.
@@ -1745,8 +1745,8 @@ class Agent(BaseAgent):
if msg.get("files"):
all_files.update(msg["files"])
if files:
all_files.update(files)
if input_files:
all_files.update(input_files)
# Build the input dict for the executor
inputs: dict[str, Any] = {
@@ -1763,10 +1763,9 @@ class Agent(BaseAgent):
self,
messages: str | list[LLMMessage],
response_format: type[Any] | None = None,
files: dict[str, FileInput] | None = None,
input_files: dict[str, FileInput] | None = None,
) -> LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput]:
"""
Execute the agent with the given messages using the AgentExecutor.
"""Execute the agent with the given messages using the AgentExecutor.
This method provides standalone agent execution without requiring a Crew.
It supports tools, response formatting, guardrails, and file inputs.
@@ -1781,7 +1780,7 @@ class Agent(BaseAgent):
If a list is provided, each dict should have 'role' and 'content' keys.
Messages can include a 'files' field with file inputs.
response_format: Optional Pydantic model for structured output.
files: Optional dict of named files to attach to the message.
input_files: Optional dict of named files to attach to the message.
Files can be paths, bytes, or File objects from crewai_files.
Returns:
@@ -1794,10 +1793,10 @@ class Agent(BaseAgent):
# Magic auto-async: if inside event loop (e.g., inside a Flow),
# return coroutine for Flow to await
if is_inside_event_loop():
return self.kickoff_async(messages, response_format, files)
return self.kickoff_async(messages, response_format, input_files)
executor, inputs, agent_info, parsed_tools = self._prepare_kickoff(
messages, response_format, files
messages, response_format, input_files
)
try:
@@ -2043,10 +2042,9 @@ class Agent(BaseAgent):
self,
messages: str | list[LLMMessage],
response_format: type[Any] | None = None,
files: dict[str, FileInput] | None = None,
input_files: dict[str, FileInput] | None = None,
) -> LiteAgentOutput:
"""
Execute the agent asynchronously with the given messages.
"""Execute the agent asynchronously with the given messages.
This is the async version of the kickoff method that uses native async
execution. It is designed for use within async contexts, such as when
@@ -2058,14 +2056,14 @@ class Agent(BaseAgent):
If a list is provided, each dict should have 'role' and 'content' keys.
Messages can include a 'files' field with file inputs.
response_format: Optional Pydantic model for structured output.
files: Optional dict of named files to attach to the message.
input_files: Optional dict of named files to attach to the message.
Files can be paths, bytes, or File objects from crewai_files.
Returns:
LiteAgentOutput: The result of the agent execution.
"""
executor, inputs, agent_info, parsed_tools = self._prepare_kickoff(
messages, response_format, files
messages, response_format, input_files
)
try:
@@ -2114,10 +2112,19 @@ class Agent(BaseAgent):
self,
messages: str | list[LLMMessage],
response_format: type[Any] | None = None,
files: dict[str, FileInput] | None = None,
input_files: dict[str, FileInput] | None = None,
) -> LiteAgentOutput:
"""Async version of kickoff. Alias for kickoff_async."""
return await self.kickoff_async(messages, response_format, files)
"""Async version of kickoff. Alias for kickoff_async.
Args:
messages: Either a string query or a list of message dictionaries.
response_format: Optional Pydantic model for structured output.
input_files: Optional dict of named files to attach to the message.
Returns:
LiteAgentOutput: The result of the agent execution.
"""
return await self.kickoff_async(messages, response_format, input_files)
# Rebuild Agent model to resolve A2A type forward references

View File

@@ -109,7 +109,6 @@ from crewai.utilities.streaming import (
from crewai.utilities.string_utils import sanitize_tool_name
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
from crewai.utilities.training_handler import CrewTrainingHandler
from crewai.utilities.types import KickoffInputs
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
@@ -679,8 +678,18 @@ class Crew(FlowTrackable, BaseModel):
def kickoff(
self,
inputs: KickoffInputs | dict[str, Any] | None = None,
inputs: dict[str, Any] | None = None,
input_files: dict[str, Any] | None = None,
) -> CrewOutput | CrewStreamingOutput:
"""Execute the crew's workflow.
Args:
inputs: Optional input dictionary for task interpolation.
input_files: Optional dict of named file inputs for the crew.
Returns:
CrewOutput or CrewStreamingOutput if streaming is enabled.
"""
if self.stream:
enable_agent_streaming(self.agents)
ctx = StreamingContext()
@@ -689,7 +698,7 @@ class Crew(FlowTrackable, BaseModel):
"""Execute the crew and capture the result."""
try:
self.stream = False
crew_result = self.kickoff(inputs=inputs)
crew_result = self.kickoff(inputs=inputs, input_files=input_files)
if isinstance(crew_result, CrewOutput):
ctx.result_holder.append(crew_result)
except Exception as exc:
@@ -712,7 +721,7 @@ class Crew(FlowTrackable, BaseModel):
token = attach(baggage_ctx)
try:
inputs = prepare_kickoff(self, inputs)
inputs = prepare_kickoff(self, inputs, input_files)
if self.process == Process.sequential:
result = self._run_sequential_process()
@@ -740,10 +749,19 @@ class Crew(FlowTrackable, BaseModel):
detach(token)
def kickoff_for_each(
self, inputs: list[dict[str, Any]]
self,
inputs: list[dict[str, Any]],
input_files: dict[str, Any] | None = None,
) -> list[CrewOutput | CrewStreamingOutput]:
"""Executes the Crew's workflow for each input and aggregates results.
Args:
inputs: List of input dictionaries, one per execution.
input_files: Optional dict of named file inputs shared across all executions.
Returns:
List of CrewOutput or CrewStreamingOutput objects.
If stream=True, returns a list of CrewStreamingOutput objects that must
each be iterated to get stream chunks and access results.
"""
@@ -754,7 +772,7 @@ class Crew(FlowTrackable, BaseModel):
for input_data in inputs:
crew = self.copy()
output = crew.kickoff(inputs=input_data)
output = crew.kickoff(inputs=input_data, input_files=input_files)
if not self.stream and crew.usage_metrics:
total_usage_metrics.add_usage_metrics(crew.usage_metrics)
@@ -767,10 +785,19 @@ class Crew(FlowTrackable, BaseModel):
return results
async def kickoff_async(
self, inputs: KickoffInputs | dict[str, Any] | None = None
self,
inputs: dict[str, Any] | None = None,
input_files: dict[str, Any] | None = None,
) -> CrewOutput | CrewStreamingOutput:
"""Asynchronous kickoff method to start the crew execution.
Args:
inputs: Optional input dictionary for task interpolation.
input_files: Optional dict of named file inputs for the crew.
Returns:
CrewOutput or CrewStreamingOutput if streaming is enabled.
If stream=True, returns a CrewStreamingOutput that can be async-iterated
to get stream chunks. After iteration completes, access the final result
via .result.
@@ -784,7 +811,7 @@ class Crew(FlowTrackable, BaseModel):
async def run_crew() -> None:
try:
self.stream = False
result = await asyncio.to_thread(self.kickoff, inputs)
result = await asyncio.to_thread(self.kickoff, inputs, input_files)
if isinstance(result, CrewOutput):
ctx.result_holder.append(result)
except Exception as e:
@@ -802,13 +829,22 @@ class Crew(FlowTrackable, BaseModel):
return streaming_output
return await asyncio.to_thread(self.kickoff, inputs)
return await asyncio.to_thread(self.kickoff, inputs, input_files)
async def kickoff_for_each_async(
self, inputs: list[dict[str, Any]]
self,
inputs: list[dict[str, Any]],
input_files: dict[str, Any] | None = None,
) -> list[CrewOutput | CrewStreamingOutput] | CrewStreamingOutput:
"""Executes the Crew's workflow for each input asynchronously.
Args:
inputs: List of input dictionaries, one per execution.
input_files: Optional dict of named file inputs shared across all executions.
Returns:
List of CrewOutput or CrewStreamingOutput objects.
If stream=True, returns a single CrewStreamingOutput that yields chunks
from all crews as they arrive. After iteration, access results via .results
(list of CrewOutput).
@@ -817,18 +853,27 @@ class Crew(FlowTrackable, BaseModel):
async def kickoff_fn(
crew: Crew, input_data: dict[str, Any]
) -> CrewOutput | CrewStreamingOutput:
return await crew.kickoff_async(inputs=input_data)
return await crew.kickoff_async(inputs=input_data, input_files=input_files)
return await run_for_each_async(self, inputs, kickoff_fn)
async def akickoff(
self, inputs: KickoffInputs | dict[str, Any] | None = None
self,
inputs: dict[str, Any] | None = None,
input_files: dict[str, Any] | None = None,
) -> CrewOutput | CrewStreamingOutput:
"""Native async kickoff method using async task execution throughout.
Unlike kickoff_async which wraps sync kickoff in a thread, this method
uses native async/await for all operations including task execution,
memory operations, and knowledge queries.
Args:
inputs: Optional input dictionary for task interpolation.
input_files: Optional dict of named file inputs for the crew.
Returns:
CrewOutput or CrewStreamingOutput if streaming is enabled.
"""
if self.stream:
enable_agent_streaming(self.agents)
@@ -837,7 +882,7 @@ class Crew(FlowTrackable, BaseModel):
async def run_crew() -> None:
try:
self.stream = False
inner_result = await self.akickoff(inputs)
inner_result = await self.akickoff(inputs, input_files)
if isinstance(inner_result, CrewOutput):
ctx.result_holder.append(inner_result)
except Exception as exc:
@@ -861,7 +906,7 @@ class Crew(FlowTrackable, BaseModel):
token = attach(baggage_ctx)
try:
inputs = prepare_kickoff(self, inputs)
inputs = prepare_kickoff(self, inputs, input_files)
if self.process == Process.sequential:
result = await self._arun_sequential_process()
@@ -889,11 +934,21 @@ class Crew(FlowTrackable, BaseModel):
detach(token)
async def akickoff_for_each(
self, inputs: list[dict[str, Any]]
self,
inputs: list[dict[str, Any]],
input_files: dict[str, Any] | None = None,
) -> list[CrewOutput | CrewStreamingOutput] | CrewStreamingOutput:
"""Native async execution of the Crew's workflow for each input.
Uses native async throughout rather than thread-based async.
Args:
inputs: List of input dictionaries, one per execution.
input_files: Optional dict of named file inputs shared across all executions.
Returns:
List of CrewOutput or CrewStreamingOutput objects.
If stream=True, returns a single CrewStreamingOutput that yields chunks
from all crews as they arrive.
"""
@@ -901,7 +956,7 @@ class Crew(FlowTrackable, BaseModel):
async def kickoff_fn(
crew: Crew, input_data: dict[str, Any]
) -> CrewOutput | CrewStreamingOutput:
return await crew.akickoff(inputs=input_data)
return await crew.akickoff(inputs=input_data, input_files=input_files)
return await run_for_each_async(self, inputs, kickoff_fn)

View File

@@ -6,6 +6,8 @@ import asyncio
from collections.abc import Callable, Coroutine, Iterable, Mapping
from typing import TYPE_CHECKING, Any
from opentelemetry import baggage
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.crews.crew_output import CrewOutput
from crewai.rag.embeddings.types import EmbedderConfig
@@ -16,7 +18,6 @@ from crewai.utilities.streaming import (
TaskInfo,
create_streaming_state,
)
from crewai.utilities.types import KickoffInputs
try:
@@ -222,7 +223,9 @@ def _extract_files_from_inputs(inputs: dict[str, Any]) -> dict[str, Any]:
def prepare_kickoff(
crew: Crew, inputs: KickoffInputs | dict[str, Any] | None
crew: Crew,
inputs: dict[str, Any] | None,
input_files: dict[str, Any] | None = None,
) -> dict[str, Any] | None:
"""Prepare crew for kickoff execution.
@@ -232,6 +235,7 @@ def prepare_kickoff(
Args:
crew: The crew instance to prepare.
inputs: Optional input dictionary to pass to the crew.
input_files: Optional dict of named file inputs for the crew.
Returns:
The potentially modified inputs dictionary after before callbacks.
@@ -272,20 +276,26 @@ def prepare_kickoff(
crew._task_output_handler.reset()
crew._logging_color = "bold_purple"
if normalized is not None:
# Extract files from dedicated "files" key
files = normalized.pop("files", None) or {}
# Check for flow input files in baggage context (inherited from parent Flow)
_flow_files = baggage.get_baggage("flow_input_files")
flow_files: dict[str, Any] = _flow_files if isinstance(_flow_files, dict) else {}
if normalized is not None:
# Extract file objects unpacked directly into inputs
unpacked_files = _extract_files_from_inputs(normalized)
# Merge files (unpacked files take precedence over explicit files dict)
all_files = {**files, **unpacked_files}
# Merge files: flow_files < input_files < unpacked_files (later takes precedence)
all_files = {**flow_files, **(input_files or {}), **unpacked_files}
if all_files:
store_files(crew.id, all_files)
crew._inputs = normalized
crew._interpolate_inputs(normalized)
else:
# No inputs dict provided
all_files = {**flow_files, **(input_files or {})}
if all_files:
store_files(crew.id, all_files)
crew._set_tasks_callbacks()
crew._set_allow_crewai_trigger_context_for_first_task()

View File

@@ -1412,13 +1412,21 @@ class Flow(Generic[T], metaclass=FlowMeta):
object.__setattr__(self._state, key, value)
def kickoff(
self, inputs: dict[str, Any] | None = None
self,
inputs: dict[str, Any] | None = None,
input_files: dict[str, Any] | None = None,
) -> Any | FlowStreamingOutput:
"""
Start the flow execution in a synchronous context.
"""Start the flow execution in a synchronous context.
This method wraps kickoff_async so that all state initialization and event
emission is handled in the asynchronous method.
Args:
inputs: Optional dictionary containing input values and/or a state ID.
input_files: Optional dict of named file inputs for the flow.
Returns:
The final output from the flow or FlowStreamingOutput if streaming.
"""
if self.stream:
result_holder: list[Any] = []
@@ -1438,7 +1446,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
def run_flow() -> None:
try:
self.stream = False
result = self.kickoff(inputs=inputs)
result = self.kickoff(inputs=inputs, input_files=input_files)
result_holder.append(result)
except Exception as e:
# HumanFeedbackPending is expected control flow, not an error
@@ -1460,15 +1468,16 @@ class Flow(Generic[T], metaclass=FlowMeta):
return streaming_output
async def _run_flow() -> Any:
return await self.kickoff_async(inputs)
return await self.kickoff_async(inputs, input_files)
return asyncio.run(_run_flow())
async def kickoff_async(
self, inputs: dict[str, Any] | None = None
self,
inputs: dict[str, Any] | None = None,
input_files: dict[str, Any] | None = None,
) -> Any | FlowStreamingOutput:
"""
Start the flow execution asynchronously.
"""Start the flow execution asynchronously.
This method performs state restoration (if an 'id' is provided and persistence is available)
and updates the flow state with any additional inputs. It then emits the FlowStartedEvent,
@@ -1477,6 +1486,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
Args:
inputs: Optional dictionary containing input values and/or a state ID for restoration.
input_files: Optional dict of named file inputs for the flow.
Returns:
The final output from the flow, which is the result of the last executed method.
@@ -1499,7 +1509,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
async def run_flow() -> None:
try:
self.stream = False
result = await self.kickoff_async(inputs=inputs)
result = await self.kickoff_async(
inputs=inputs, input_files=input_files
)
result_holder.append(result)
except Exception as e:
# HumanFeedbackPending is expected control flow, not an error
@@ -1523,6 +1535,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
return streaming_output
ctx = baggage.set_baggage("flow_inputs", inputs or {})
ctx = baggage.set_baggage("flow_input_files", input_files or {}, context=ctx)
flow_token = attach(ctx)
try:
@@ -1705,18 +1718,20 @@ class Flow(Generic[T], metaclass=FlowMeta):
detach(flow_token)
async def akickoff(
self, inputs: dict[str, Any] | None = None
self,
inputs: dict[str, Any] | None = None,
input_files: dict[str, Any] | None = None,
) -> Any | FlowStreamingOutput:
"""Native async method to start the flow execution. Alias for kickoff_async.
Args:
inputs: Optional dictionary containing input values and/or a state ID for restoration.
input_files: Optional dict of named file inputs for the flow.
Returns:
The final output from the flow, which is the result of the last executed method.
"""
return await self.kickoff_async(inputs)
return await self.kickoff_async(inputs, input_files)
async def _execute_start_method(self, start_method_name: FlowMethodName) -> None:
"""Executes a flow's start method and its triggered listeners.

View File

@@ -296,9 +296,9 @@ class LiteAgent(FlowTrackable, BaseModel):
self,
messages: str | list[LLMMessage],
response_format: type[BaseModel] | None = None,
input_files: dict[str, Any] | None = None,
) -> LiteAgentOutput:
"""
Execute the agent with the given messages.
"""Execute the agent with the given messages.
Args:
messages: Either a string query or a list of message dictionaries.
@@ -306,6 +306,8 @@ class LiteAgent(FlowTrackable, BaseModel):
If a list is provided, each dict should have 'role' and 'content' keys.
response_format: Optional Pydantic model for structured output. If provided,
overrides self.response_format for this execution.
input_files: Optional dict of named files to attach to the message.
Files can be paths, bytes, or File objects from crewai_files.
Returns:
LiteAgentOutput: The result of the agent execution.
@@ -327,7 +329,7 @@ class LiteAgent(FlowTrackable, BaseModel):
# Format messages for the LLM
self._messages = self._format_messages(
messages, response_format=response_format
messages, response_format=response_format, input_files=input_files
)
return self._execute_core(
@@ -464,19 +466,45 @@ class LiteAgent(FlowTrackable, BaseModel):
return output
async def kickoff_async(self, messages: str | list[LLMMessage]) -> LiteAgentOutput:
"""
Execute the agent asynchronously with the given messages.
async def kickoff_async(
self,
messages: str | list[LLMMessage],
response_format: type[BaseModel] | None = None,
input_files: dict[str, Any] | None = None,
) -> LiteAgentOutput:
"""Execute the agent asynchronously with the given messages.
Args:
messages: Either a string query or a list of message dictionaries.
If a string is provided, it will be converted to a user message.
If a list is provided, each dict should have 'role' and 'content' keys.
response_format: Optional Pydantic model for structured output.
input_files: Optional dict of named files to attach to the message.
Returns:
LiteAgentOutput: The result of the agent execution.
"""
return await asyncio.to_thread(self.kickoff, messages)
return await asyncio.to_thread(
self.kickoff, messages, response_format, input_files
)
async def akickoff(
self,
messages: str | list[LLMMessage],
response_format: type[BaseModel] | None = None,
input_files: dict[str, Any] | None = None,
) -> LiteAgentOutput:
"""Async version of kickoff. Alias for kickoff_async.
Args:
messages: Either a string query or a list of message dictionaries.
response_format: Optional Pydantic model for structured output.
input_files: Optional dict of named files to attach to the message.
Returns:
LiteAgentOutput: The result of the agent execution.
"""
return await self.kickoff_async(messages, response_format, input_files)
def _get_default_system_prompt(
self, response_format: type[BaseModel] | None = None
@@ -520,12 +548,14 @@ class LiteAgent(FlowTrackable, BaseModel):
self,
messages: str | list[LLMMessage],
response_format: type[BaseModel] | None = None,
input_files: dict[str, Any] | None = None,
) -> list[LLMMessage]:
"""Format messages for the LLM.
Args:
messages: Input messages to format
response_format: Optional response format to use instead of self.response_format
messages: Input messages to format.
response_format: Optional response format to use instead of self.response_format.
input_files: Optional dict of named files to include with the messages.
"""
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
@@ -540,6 +570,13 @@ class LiteAgent(FlowTrackable, BaseModel):
# Add the rest of the messages
formatted_messages.extend(messages)
# Attach files to the last user message if provided
if input_files:
for msg in reversed(formatted_messages):
if msg.get("role") == "user":
msg["files"] = input_files
break
return formatted_messages
def _invoke_loop(self) -> AgentFinish:

View File

@@ -27,13 +27,3 @@ class LLMMessage(TypedDict):
name: NotRequired[str]
tool_calls: NotRequired[list[dict[str, Any]]]
files: NotRequired[dict[str, FileInput]]
class KickoffInputs(TypedDict, total=False):
"""Type for crew kickoff inputs.
Attributes:
files: Named file inputs accessible to tasks during execution.
"""
files: dict[str, FileInput]