diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index 47902013d..8c414cd67 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -1646,7 +1646,7 @@ class Agent(BaseAgent): self, messages: str | list[LLMMessage], response_format: type[Any] | None = None, - files: dict[str, FileInput] | None = None, + input_files: dict[str, FileInput] | None = None, ) -> tuple[AgentExecutor, dict[str, Any], dict[str, Any], list[CrewStructuredTool]]: """Prepare common setup for kickoff execution. @@ -1657,7 +1657,7 @@ class Agent(BaseAgent): Args: messages: Either a string query or a list of message dictionaries. response_format: Optional Pydantic model for structured output. - files: Optional dict of named files to attach to the message. + input_files: Optional dict of named files to attach to the message. Returns: Tuple of (executor, inputs, agent_info, parsed_tools) ready for execution. @@ -1745,8 +1745,8 @@ class Agent(BaseAgent): if msg.get("files"): all_files.update(msg["files"]) - if files: - all_files.update(files) + if input_files: + all_files.update(input_files) # Build the input dict for the executor inputs: dict[str, Any] = { @@ -1763,10 +1763,9 @@ class Agent(BaseAgent): self, messages: str | list[LLMMessage], response_format: type[Any] | None = None, - files: dict[str, FileInput] | None = None, + input_files: dict[str, FileInput] | None = None, ) -> LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput]: - """ - Execute the agent with the given messages using the AgentExecutor. + """Execute the agent with the given messages using the AgentExecutor. This method provides standalone agent execution without requiring a Crew. It supports tools, response formatting, guardrails, and file inputs. @@ -1781,7 +1780,7 @@ class Agent(BaseAgent): If a list is provided, each dict should have 'role' and 'content' keys. Messages can include a 'files' field with file inputs. response_format: Optional Pydantic model for structured output. - files: Optional dict of named files to attach to the message. + input_files: Optional dict of named files to attach to the message. Files can be paths, bytes, or File objects from crewai_files. Returns: @@ -1794,10 +1793,10 @@ class Agent(BaseAgent): # Magic auto-async: if inside event loop (e.g., inside a Flow), # return coroutine for Flow to await if is_inside_event_loop(): - return self.kickoff_async(messages, response_format, files) + return self.kickoff_async(messages, response_format, input_files) executor, inputs, agent_info, parsed_tools = self._prepare_kickoff( - messages, response_format, files + messages, response_format, input_files ) try: @@ -2043,10 +2042,9 @@ class Agent(BaseAgent): self, messages: str | list[LLMMessage], response_format: type[Any] | None = None, - files: dict[str, FileInput] | None = None, + input_files: dict[str, FileInput] | None = None, ) -> LiteAgentOutput: - """ - Execute the agent asynchronously with the given messages. + """Execute the agent asynchronously with the given messages. This is the async version of the kickoff method that uses native async execution. It is designed for use within async contexts, such as when @@ -2058,14 +2056,14 @@ class Agent(BaseAgent): If a list is provided, each dict should have 'role' and 'content' keys. Messages can include a 'files' field with file inputs. response_format: Optional Pydantic model for structured output. - files: Optional dict of named files to attach to the message. + input_files: Optional dict of named files to attach to the message. Files can be paths, bytes, or File objects from crewai_files. Returns: LiteAgentOutput: The result of the agent execution. """ executor, inputs, agent_info, parsed_tools = self._prepare_kickoff( - messages, response_format, files + messages, response_format, input_files ) try: @@ -2114,10 +2112,19 @@ class Agent(BaseAgent): self, messages: str | list[LLMMessage], response_format: type[Any] | None = None, - files: dict[str, FileInput] | None = None, + input_files: dict[str, FileInput] | None = None, ) -> LiteAgentOutput: - """Async version of kickoff. Alias for kickoff_async.""" - return await self.kickoff_async(messages, response_format, files) + """Async version of kickoff. Alias for kickoff_async. + + Args: + messages: Either a string query or a list of message dictionaries. + response_format: Optional Pydantic model for structured output. + input_files: Optional dict of named files to attach to the message. + + Returns: + LiteAgentOutput: The result of the agent execution. + """ + return await self.kickoff_async(messages, response_format, input_files) # Rebuild Agent model to resolve A2A type forward references diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index 960b8c10e..e7de9c0d3 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -109,7 +109,6 @@ from crewai.utilities.streaming import ( from crewai.utilities.string_utils import sanitize_tool_name from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler from crewai.utilities.training_handler import CrewTrainingHandler -from crewai.utilities.types import KickoffInputs warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd") @@ -679,8 +678,18 @@ class Crew(FlowTrackable, BaseModel): def kickoff( self, - inputs: KickoffInputs | dict[str, Any] | None = None, + inputs: dict[str, Any] | None = None, + input_files: dict[str, Any] | None = None, ) -> CrewOutput | CrewStreamingOutput: + """Execute the crew's workflow. + + Args: + inputs: Optional input dictionary for task interpolation. + input_files: Optional dict of named file inputs for the crew. + + Returns: + CrewOutput or CrewStreamingOutput if streaming is enabled. + """ if self.stream: enable_agent_streaming(self.agents) ctx = StreamingContext() @@ -689,7 +698,7 @@ class Crew(FlowTrackable, BaseModel): """Execute the crew and capture the result.""" try: self.stream = False - crew_result = self.kickoff(inputs=inputs) + crew_result = self.kickoff(inputs=inputs, input_files=input_files) if isinstance(crew_result, CrewOutput): ctx.result_holder.append(crew_result) except Exception as exc: @@ -712,7 +721,7 @@ class Crew(FlowTrackable, BaseModel): token = attach(baggage_ctx) try: - inputs = prepare_kickoff(self, inputs) + inputs = prepare_kickoff(self, inputs, input_files) if self.process == Process.sequential: result = self._run_sequential_process() @@ -740,10 +749,19 @@ class Crew(FlowTrackable, BaseModel): detach(token) def kickoff_for_each( - self, inputs: list[dict[str, Any]] + self, + inputs: list[dict[str, Any]], + input_files: dict[str, Any] | None = None, ) -> list[CrewOutput | CrewStreamingOutput]: """Executes the Crew's workflow for each input and aggregates results. + Args: + inputs: List of input dictionaries, one per execution. + input_files: Optional dict of named file inputs shared across all executions. + + Returns: + List of CrewOutput or CrewStreamingOutput objects. + If stream=True, returns a list of CrewStreamingOutput objects that must each be iterated to get stream chunks and access results. """ @@ -754,7 +772,7 @@ class Crew(FlowTrackable, BaseModel): for input_data in inputs: crew = self.copy() - output = crew.kickoff(inputs=input_data) + output = crew.kickoff(inputs=input_data, input_files=input_files) if not self.stream and crew.usage_metrics: total_usage_metrics.add_usage_metrics(crew.usage_metrics) @@ -767,10 +785,19 @@ class Crew(FlowTrackable, BaseModel): return results async def kickoff_async( - self, inputs: KickoffInputs | dict[str, Any] | None = None + self, + inputs: dict[str, Any] | None = None, + input_files: dict[str, Any] | None = None, ) -> CrewOutput | CrewStreamingOutput: """Asynchronous kickoff method to start the crew execution. + Args: + inputs: Optional input dictionary for task interpolation. + input_files: Optional dict of named file inputs for the crew. + + Returns: + CrewOutput or CrewStreamingOutput if streaming is enabled. + If stream=True, returns a CrewStreamingOutput that can be async-iterated to get stream chunks. After iteration completes, access the final result via .result. @@ -784,7 +811,7 @@ class Crew(FlowTrackable, BaseModel): async def run_crew() -> None: try: self.stream = False - result = await asyncio.to_thread(self.kickoff, inputs) + result = await asyncio.to_thread(self.kickoff, inputs, input_files) if isinstance(result, CrewOutput): ctx.result_holder.append(result) except Exception as e: @@ -802,13 +829,22 @@ class Crew(FlowTrackable, BaseModel): return streaming_output - return await asyncio.to_thread(self.kickoff, inputs) + return await asyncio.to_thread(self.kickoff, inputs, input_files) async def kickoff_for_each_async( - self, inputs: list[dict[str, Any]] + self, + inputs: list[dict[str, Any]], + input_files: dict[str, Any] | None = None, ) -> list[CrewOutput | CrewStreamingOutput] | CrewStreamingOutput: """Executes the Crew's workflow for each input asynchronously. + Args: + inputs: List of input dictionaries, one per execution. + input_files: Optional dict of named file inputs shared across all executions. + + Returns: + List of CrewOutput or CrewStreamingOutput objects. + If stream=True, returns a single CrewStreamingOutput that yields chunks from all crews as they arrive. After iteration, access results via .results (list of CrewOutput). @@ -817,18 +853,27 @@ class Crew(FlowTrackable, BaseModel): async def kickoff_fn( crew: Crew, input_data: dict[str, Any] ) -> CrewOutput | CrewStreamingOutput: - return await crew.kickoff_async(inputs=input_data) + return await crew.kickoff_async(inputs=input_data, input_files=input_files) return await run_for_each_async(self, inputs, kickoff_fn) async def akickoff( - self, inputs: KickoffInputs | dict[str, Any] | None = None + self, + inputs: dict[str, Any] | None = None, + input_files: dict[str, Any] | None = None, ) -> CrewOutput | CrewStreamingOutput: """Native async kickoff method using async task execution throughout. Unlike kickoff_async which wraps sync kickoff in a thread, this method uses native async/await for all operations including task execution, memory operations, and knowledge queries. + + Args: + inputs: Optional input dictionary for task interpolation. + input_files: Optional dict of named file inputs for the crew. + + Returns: + CrewOutput or CrewStreamingOutput if streaming is enabled. """ if self.stream: enable_agent_streaming(self.agents) @@ -837,7 +882,7 @@ class Crew(FlowTrackable, BaseModel): async def run_crew() -> None: try: self.stream = False - inner_result = await self.akickoff(inputs) + inner_result = await self.akickoff(inputs, input_files) if isinstance(inner_result, CrewOutput): ctx.result_holder.append(inner_result) except Exception as exc: @@ -861,7 +906,7 @@ class Crew(FlowTrackable, BaseModel): token = attach(baggage_ctx) try: - inputs = prepare_kickoff(self, inputs) + inputs = prepare_kickoff(self, inputs, input_files) if self.process == Process.sequential: result = await self._arun_sequential_process() @@ -889,11 +934,21 @@ class Crew(FlowTrackable, BaseModel): detach(token) async def akickoff_for_each( - self, inputs: list[dict[str, Any]] + self, + inputs: list[dict[str, Any]], + input_files: dict[str, Any] | None = None, ) -> list[CrewOutput | CrewStreamingOutput] | CrewStreamingOutput: """Native async execution of the Crew's workflow for each input. Uses native async throughout rather than thread-based async. + + Args: + inputs: List of input dictionaries, one per execution. + input_files: Optional dict of named file inputs shared across all executions. + + Returns: + List of CrewOutput or CrewStreamingOutput objects. + If stream=True, returns a single CrewStreamingOutput that yields chunks from all crews as they arrive. """ @@ -901,7 +956,7 @@ class Crew(FlowTrackable, BaseModel): async def kickoff_fn( crew: Crew, input_data: dict[str, Any] ) -> CrewOutput | CrewStreamingOutput: - return await crew.akickoff(inputs=input_data) + return await crew.akickoff(inputs=input_data, input_files=input_files) return await run_for_each_async(self, inputs, kickoff_fn) diff --git a/lib/crewai/src/crewai/crews/utils.py b/lib/crewai/src/crewai/crews/utils.py index 51ef316c5..e534d9dca 100644 --- a/lib/crewai/src/crewai/crews/utils.py +++ b/lib/crewai/src/crewai/crews/utils.py @@ -6,6 +6,8 @@ import asyncio from collections.abc import Callable, Coroutine, Iterable, Mapping from typing import TYPE_CHECKING, Any +from opentelemetry import baggage + from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.crews.crew_output import CrewOutput from crewai.rag.embeddings.types import EmbedderConfig @@ -16,7 +18,6 @@ from crewai.utilities.streaming import ( TaskInfo, create_streaming_state, ) -from crewai.utilities.types import KickoffInputs try: @@ -222,7 +223,9 @@ def _extract_files_from_inputs(inputs: dict[str, Any]) -> dict[str, Any]: def prepare_kickoff( - crew: Crew, inputs: KickoffInputs | dict[str, Any] | None + crew: Crew, + inputs: dict[str, Any] | None, + input_files: dict[str, Any] | None = None, ) -> dict[str, Any] | None: """Prepare crew for kickoff execution. @@ -232,6 +235,7 @@ def prepare_kickoff( Args: crew: The crew instance to prepare. inputs: Optional input dictionary to pass to the crew. + input_files: Optional dict of named file inputs for the crew. Returns: The potentially modified inputs dictionary after before callbacks. @@ -272,20 +276,26 @@ def prepare_kickoff( crew._task_output_handler.reset() crew._logging_color = "bold_purple" - if normalized is not None: - # Extract files from dedicated "files" key - files = normalized.pop("files", None) or {} + # Check for flow input files in baggage context (inherited from parent Flow) + _flow_files = baggage.get_baggage("flow_input_files") + flow_files: dict[str, Any] = _flow_files if isinstance(_flow_files, dict) else {} + if normalized is not None: # Extract file objects unpacked directly into inputs unpacked_files = _extract_files_from_inputs(normalized) - # Merge files (unpacked files take precedence over explicit files dict) - all_files = {**files, **unpacked_files} + # Merge files: flow_files < input_files < unpacked_files (later takes precedence) + all_files = {**flow_files, **(input_files or {}), **unpacked_files} if all_files: store_files(crew.id, all_files) crew._inputs = normalized crew._interpolate_inputs(normalized) + else: + # No inputs dict provided + all_files = {**flow_files, **(input_files or {})} + if all_files: + store_files(crew.id, all_files) crew._set_tasks_callbacks() crew._set_allow_crewai_trigger_context_for_first_task() diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index f1ad160cf..b9b7c5b54 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -1412,13 +1412,21 @@ class Flow(Generic[T], metaclass=FlowMeta): object.__setattr__(self._state, key, value) def kickoff( - self, inputs: dict[str, Any] | None = None + self, + inputs: dict[str, Any] | None = None, + input_files: dict[str, Any] | None = None, ) -> Any | FlowStreamingOutput: - """ - Start the flow execution in a synchronous context. + """Start the flow execution in a synchronous context. This method wraps kickoff_async so that all state initialization and event emission is handled in the asynchronous method. + + Args: + inputs: Optional dictionary containing input values and/or a state ID. + input_files: Optional dict of named file inputs for the flow. + + Returns: + The final output from the flow or FlowStreamingOutput if streaming. """ if self.stream: result_holder: list[Any] = [] @@ -1438,7 +1446,7 @@ class Flow(Generic[T], metaclass=FlowMeta): def run_flow() -> None: try: self.stream = False - result = self.kickoff(inputs=inputs) + result = self.kickoff(inputs=inputs, input_files=input_files) result_holder.append(result) except Exception as e: # HumanFeedbackPending is expected control flow, not an error @@ -1460,15 +1468,16 @@ class Flow(Generic[T], metaclass=FlowMeta): return streaming_output async def _run_flow() -> Any: - return await self.kickoff_async(inputs) + return await self.kickoff_async(inputs, input_files) return asyncio.run(_run_flow()) async def kickoff_async( - self, inputs: dict[str, Any] | None = None + self, + inputs: dict[str, Any] | None = None, + input_files: dict[str, Any] | None = None, ) -> Any | FlowStreamingOutput: - """ - Start the flow execution asynchronously. + """Start the flow execution asynchronously. This method performs state restoration (if an 'id' is provided and persistence is available) and updates the flow state with any additional inputs. It then emits the FlowStartedEvent, @@ -1477,6 +1486,7 @@ class Flow(Generic[T], metaclass=FlowMeta): Args: inputs: Optional dictionary containing input values and/or a state ID for restoration. + input_files: Optional dict of named file inputs for the flow. Returns: The final output from the flow, which is the result of the last executed method. @@ -1499,7 +1509,9 @@ class Flow(Generic[T], metaclass=FlowMeta): async def run_flow() -> None: try: self.stream = False - result = await self.kickoff_async(inputs=inputs) + result = await self.kickoff_async( + inputs=inputs, input_files=input_files + ) result_holder.append(result) except Exception as e: # HumanFeedbackPending is expected control flow, not an error @@ -1523,6 +1535,7 @@ class Flow(Generic[T], metaclass=FlowMeta): return streaming_output ctx = baggage.set_baggage("flow_inputs", inputs or {}) + ctx = baggage.set_baggage("flow_input_files", input_files or {}, context=ctx) flow_token = attach(ctx) try: @@ -1705,18 +1718,20 @@ class Flow(Generic[T], metaclass=FlowMeta): detach(flow_token) async def akickoff( - self, inputs: dict[str, Any] | None = None + self, + inputs: dict[str, Any] | None = None, + input_files: dict[str, Any] | None = None, ) -> Any | FlowStreamingOutput: """Native async method to start the flow execution. Alias for kickoff_async. - Args: inputs: Optional dictionary containing input values and/or a state ID for restoration. + input_files: Optional dict of named file inputs for the flow. Returns: The final output from the flow, which is the result of the last executed method. """ - return await self.kickoff_async(inputs) + return await self.kickoff_async(inputs, input_files) async def _execute_start_method(self, start_method_name: FlowMethodName) -> None: """Executes a flow's start method and its triggered listeners. diff --git a/lib/crewai/src/crewai/lite_agent.py b/lib/crewai/src/crewai/lite_agent.py index b59f230a7..0cff9b6fe 100644 --- a/lib/crewai/src/crewai/lite_agent.py +++ b/lib/crewai/src/crewai/lite_agent.py @@ -296,9 +296,9 @@ class LiteAgent(FlowTrackable, BaseModel): self, messages: str | list[LLMMessage], response_format: type[BaseModel] | None = None, + input_files: dict[str, Any] | None = None, ) -> LiteAgentOutput: - """ - Execute the agent with the given messages. + """Execute the agent with the given messages. Args: messages: Either a string query or a list of message dictionaries. @@ -306,6 +306,8 @@ class LiteAgent(FlowTrackable, BaseModel): If a list is provided, each dict should have 'role' and 'content' keys. response_format: Optional Pydantic model for structured output. If provided, overrides self.response_format for this execution. + input_files: Optional dict of named files to attach to the message. + Files can be paths, bytes, or File objects from crewai_files. Returns: LiteAgentOutput: The result of the agent execution. @@ -327,7 +329,7 @@ class LiteAgent(FlowTrackable, BaseModel): # Format messages for the LLM self._messages = self._format_messages( - messages, response_format=response_format + messages, response_format=response_format, input_files=input_files ) return self._execute_core( @@ -464,19 +466,45 @@ class LiteAgent(FlowTrackable, BaseModel): return output - async def kickoff_async(self, messages: str | list[LLMMessage]) -> LiteAgentOutput: - """ - Execute the agent asynchronously with the given messages. + async def kickoff_async( + self, + messages: str | list[LLMMessage], + response_format: type[BaseModel] | None = None, + input_files: dict[str, Any] | None = None, + ) -> LiteAgentOutput: + """Execute the agent asynchronously with the given messages. Args: messages: Either a string query or a list of message dictionaries. If a string is provided, it will be converted to a user message. If a list is provided, each dict should have 'role' and 'content' keys. + response_format: Optional Pydantic model for structured output. + input_files: Optional dict of named files to attach to the message. Returns: LiteAgentOutput: The result of the agent execution. """ - return await asyncio.to_thread(self.kickoff, messages) + return await asyncio.to_thread( + self.kickoff, messages, response_format, input_files + ) + + async def akickoff( + self, + messages: str | list[LLMMessage], + response_format: type[BaseModel] | None = None, + input_files: dict[str, Any] | None = None, + ) -> LiteAgentOutput: + """Async version of kickoff. Alias for kickoff_async. + + Args: + messages: Either a string query or a list of message dictionaries. + response_format: Optional Pydantic model for structured output. + input_files: Optional dict of named files to attach to the message. + + Returns: + LiteAgentOutput: The result of the agent execution. + """ + return await self.kickoff_async(messages, response_format, input_files) def _get_default_system_prompt( self, response_format: type[BaseModel] | None = None @@ -520,12 +548,14 @@ class LiteAgent(FlowTrackable, BaseModel): self, messages: str | list[LLMMessage], response_format: type[BaseModel] | None = None, + input_files: dict[str, Any] | None = None, ) -> list[LLMMessage]: """Format messages for the LLM. Args: - messages: Input messages to format - response_format: Optional response format to use instead of self.response_format + messages: Input messages to format. + response_format: Optional response format to use instead of self.response_format. + input_files: Optional dict of named files to include with the messages. """ if isinstance(messages, str): messages = [{"role": "user", "content": messages}] @@ -540,6 +570,13 @@ class LiteAgent(FlowTrackable, BaseModel): # Add the rest of the messages formatted_messages.extend(messages) + # Attach files to the last user message if provided + if input_files: + for msg in reversed(formatted_messages): + if msg.get("role") == "user": + msg["files"] = input_files + break + return formatted_messages def _invoke_loop(self) -> AgentFinish: diff --git a/lib/crewai/src/crewai/utilities/types.py b/lib/crewai/src/crewai/utilities/types.py index 84a774df8..98ff0877b 100644 --- a/lib/crewai/src/crewai/utilities/types.py +++ b/lib/crewai/src/crewai/utilities/types.py @@ -27,13 +27,3 @@ class LLMMessage(TypedDict): name: NotRequired[str] tool_calls: NotRequired[list[dict[str, Any]]] files: NotRequired[dict[str, FileInput]] - - -class KickoffInputs(TypedDict, total=False): - """Type for crew kickoff inputs. - - Attributes: - files: Named file inputs accessible to tasks during execution. - """ - - files: dict[str, FileInput]