From 8cf483712b69b28440e5a02fed65d537ec0bebf9 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Thu, 29 Jan 2026 15:58:12 -0500 Subject: [PATCH] feat: add file input support to a2a delegation and tasks Introduces handling of file inputs in A2A delegation flows by converting file dictionaries to protocol-compatible parts and propagating them through delegation and task execution functions. Updates include utility functions for file conversion, changes to message construction, and passing input_files through relevant APIs. --- lib/crewai/src/crewai/a2a/utils/delegation.py | 47 ++++++++++++++++++- lib/crewai/src/crewai/a2a/utils/task.py | 37 +++++++++++++++ lib/crewai/src/crewai/a2a/wrapper.py | 2 + lib/crewai/src/crewai/lite_agent.py | 1 + 4 files changed, 86 insertions(+), 1 deletion(-) diff --git a/lib/crewai/src/crewai/a2a/utils/delegation.py b/lib/crewai/src/crewai/a2a/utils/delegation.py index b2315c13f..cfcf51f36 100644 --- a/lib/crewai/src/crewai/a2a/utils/delegation.py +++ b/lib/crewai/src/crewai/a2a/utils/delegation.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +import base64 from collections.abc import AsyncIterator, Callable, MutableMapping from contextlib import asynccontextmanager import logging @@ -12,6 +13,8 @@ import uuid from a2a.client import Client, ClientConfig, ClientFactory from a2a.types import ( AgentCard, + FilePart, + FileWithBytes, Message, Part, PushNotificationConfig as A2APushNotificationConfig, @@ -81,6 +84,38 @@ if TYPE_CHECKING: _DEFAULT_TRANSPORT: Final[TransportType] = "JSONRPC" +def _create_file_parts(input_files: dict[str, Any] | None) -> list[Part]: + """Convert FileInput dictionary to FilePart objects. + + Args: + input_files: Dictionary mapping names to FileInput objects. + + Returns: + List of Part objects containing FilePart data. + """ + if not input_files: + return [] + + try: + import crewai_files # noqa: F401 + except ImportError: + logger.debug("crewai_files not installed, skipping file parts") + return [] + + parts: list[Part] = [] + for name, file_input in input_files.items(): + content_bytes = file_input.read() + content_base64 = base64.b64encode(content_bytes).decode() + file_with_bytes = FileWithBytes( + bytes=content_base64, + mimeType=file_input.content_type, + name=file_input.filename or name, + ) + parts.append(Part(root=FilePart(file=file_with_bytes))) + + return parts + + def get_handler(config: UpdateConfig | None) -> HandlerType: """Get the handler class for a given update config. @@ -119,6 +154,7 @@ def execute_a2a_delegation( client_extensions: list[str] | None = None, transport: ClientTransportConfig | None = None, accepted_output_modes: list[str] | None = None, + input_files: dict[str, Any] | None = None, ) -> TaskStateResult: """Execute a task delegation to a remote A2A agent synchronously. @@ -154,6 +190,7 @@ def execute_a2a_delegation( client_extensions: A2A protocol extension URIs the client supports. transport: Transport configuration (preferred, supported transports, gRPC settings). accepted_output_modes: MIME types the client can accept in responses. + input_files: Optional dictionary of files to send to remote agent. Returns: TaskStateResult with status, result/error, history, and agent_card. @@ -199,6 +236,7 @@ def execute_a2a_delegation( client_extensions=client_extensions, transport=transport, accepted_output_modes=accepted_output_modes, + input_files=input_files, ) ) finally: @@ -232,6 +270,7 @@ async def aexecute_a2a_delegation( client_extensions: list[str] | None = None, transport: ClientTransportConfig | None = None, accepted_output_modes: list[str] | None = None, + input_files: dict[str, Any] | None = None, ) -> TaskStateResult: """Execute a task delegation to a remote A2A agent asynchronously. @@ -262,6 +301,7 @@ async def aexecute_a2a_delegation( client_extensions: A2A protocol extension URIs the client supports. transport: Transport configuration (preferred, supported transports, gRPC settings). accepted_output_modes: MIME types the client can accept in responses. + input_files: Optional dictionary of files to send to remote agent. Returns: TaskStateResult with status, result/error, history, and agent_card. @@ -299,6 +339,7 @@ async def aexecute_a2a_delegation( client_extensions=client_extensions, transport=transport, accepted_output_modes=accepted_output_modes, + input_files=input_files, ) except Exception as e: crewai_event_bus.emit( @@ -366,6 +407,7 @@ async def _aexecute_a2a_delegation_impl( client_extensions: list[str] | None = None, transport: ClientTransportConfig | None = None, accepted_output_modes: list[str] | None = None, + input_files: dict[str, Any] | None = None, ) -> TaskStateResult: """Internal async implementation of A2A delegation.""" if transport is None: @@ -517,10 +559,13 @@ async def _aexecute_a2a_delegation_impl( if skill_id: message_metadata["skill_id"] = skill_id + parts_list: list[Part] = [Part(root=TextPart(**parts))] + parts_list.extend(_create_file_parts(input_files)) + message = Message( role=Role.user, message_id=str(uuid.uuid4()), - parts=[Part(root=TextPart(**parts))], + parts=parts_list, context_id=context_id, task_id=task_id, reference_task_ids=reference_task_ids, diff --git a/lib/crewai/src/crewai/a2a/utils/task.py b/lib/crewai/src/crewai/a2a/utils/task.py index 63868b841..d73556875 100644 --- a/lib/crewai/src/crewai/a2a/utils/task.py +++ b/lib/crewai/src/crewai/a2a/utils/task.py @@ -17,6 +17,8 @@ from a2a.server.agent_execution import RequestContext from a2a.server.events import EventQueue from a2a.types import ( Artifact, + FileWithBytes, + FileWithUri, InternalError, InvalidParamsError, Message, @@ -28,6 +30,7 @@ from a2a.types import ( ) from a2a.utils import ( get_data_parts, + get_file_parts, new_agent_text_message, new_data_artifact, new_text_artifact, @@ -191,6 +194,37 @@ def cancellable( return wrapper +def _convert_a2a_files_to_file_inputs( + a2a_files: list[FileWithBytes | FileWithUri], +) -> dict[str, Any]: + """Convert a2a file types to crewai FileInput dict. + + Args: + a2a_files: List of FileWithBytes or FileWithUri from a2a SDK. + + Returns: + Dictionary mapping file names to FileInput objects. + """ + try: + from crewai_files import File, FileBytes + except ImportError: + logger.debug("crewai_files not installed, returning empty file dict") + return {} + + file_dict: dict[str, Any] = {} + for idx, a2a_file in enumerate(a2a_files): + if isinstance(a2a_file, FileWithBytes): + file_bytes = base64.b64decode(a2a_file.bytes) + name = a2a_file.name or f"file_{idx}" + file_source = FileBytes(data=file_bytes, filename=a2a_file.name) + file_dict[name] = File(source=file_source) + elif isinstance(a2a_file, FileWithUri): + name = a2a_file.name or f"file_{idx}" + file_dict[name] = File(source=a2a_file.uri) + + return file_dict + + def _extract_response_schema(parts: list[Part]) -> dict[str, Any] | None: """Extract response schema from message parts metadata. @@ -295,6 +329,7 @@ async def _execute_impl( response_model: type[BaseModel] | None = None structured_inputs: list[dict[str, Any]] = [] + a2a_files: list[FileWithBytes | FileWithUri] = [] if context.message and context.message.parts: schema = _extract_response_schema(context.message.parts) @@ -308,6 +343,7 @@ async def _execute_impl( ) structured_inputs = get_data_parts(context.message.parts) + a2a_files = get_file_parts(context.message.parts) task_id = context.task_id context_id = context.context_id @@ -329,6 +365,7 @@ async def _execute_impl( expected_output="Response to the user's request", agent=agent, response_model=response_model, + input_files=_convert_a2a_files_to_file_inputs(a2a_files), ) crewai_event_bus.emit( diff --git a/lib/crewai/src/crewai/a2a/wrapper.py b/lib/crewai/src/crewai/a2a/wrapper.py index e77b9fb9e..0eb8c503b 100644 --- a/lib/crewai/src/crewai/a2a/wrapper.py +++ b/lib/crewai/src/crewai/a2a/wrapper.py @@ -977,6 +977,7 @@ def _delegate_to_a2a( from_agent=self, client_extensions=getattr(ctx.agent_config, "extensions", None), accepted_output_modes=accepted_output_modes, + input_files=task.input_files, ) conversation_history = a2a_result.get("history", []) @@ -1320,6 +1321,7 @@ async def _adelegate_to_a2a( from_agent=self, client_extensions=getattr(ctx.agent_config, "extensions", None), accepted_output_modes=accepted_output_modes, + input_files=task.input_files, ) conversation_history = a2a_result.get("history", []) diff --git a/lib/crewai/src/crewai/lite_agent.py b/lib/crewai/src/crewai/lite_agent.py index f91d6d779..ba66dded9 100644 --- a/lib/crewai/src/crewai/lite_agent.py +++ b/lib/crewai/src/crewai/lite_agent.py @@ -134,6 +134,7 @@ def _kickoff_with_a2a_support( description=description, agent=agent, expected_output="Result from A2A delegation", + input_files=input_files or {}, ) def task_to_kickoff_adapter(