feat: add file input support to a2a delegation and tasks

Introduces handling of file inputs in A2A delegation flows by converting file dictionaries to protocol-compatible parts and propagating them through delegation and task execution functions. Updates include utility functions for file conversion, changes to message construction, and passing input_files through relevant APIs.
This commit is contained in:
Greyson LaLonde
2026-01-29 15:58:12 -05:00
parent 6da4b13702
commit 8cf483712b
4 changed files with 86 additions and 1 deletions

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
import asyncio
import base64
from collections.abc import AsyncIterator, Callable, MutableMapping
from contextlib import asynccontextmanager
import logging
@@ -12,6 +13,8 @@ import uuid
from a2a.client import Client, ClientConfig, ClientFactory
from a2a.types import (
AgentCard,
FilePart,
FileWithBytes,
Message,
Part,
PushNotificationConfig as A2APushNotificationConfig,
@@ -81,6 +84,38 @@ if TYPE_CHECKING:
_DEFAULT_TRANSPORT: Final[TransportType] = "JSONRPC"
def _create_file_parts(input_files: dict[str, Any] | None) -> list[Part]:
"""Convert FileInput dictionary to FilePart objects.
Args:
input_files: Dictionary mapping names to FileInput objects.
Returns:
List of Part objects containing FilePart data.
"""
if not input_files:
return []
try:
import crewai_files # noqa: F401
except ImportError:
logger.debug("crewai_files not installed, skipping file parts")
return []
parts: list[Part] = []
for name, file_input in input_files.items():
content_bytes = file_input.read()
content_base64 = base64.b64encode(content_bytes).decode()
file_with_bytes = FileWithBytes(
bytes=content_base64,
mimeType=file_input.content_type,
name=file_input.filename or name,
)
parts.append(Part(root=FilePart(file=file_with_bytes)))
return parts
def get_handler(config: UpdateConfig | None) -> HandlerType:
"""Get the handler class for a given update config.
@@ -119,6 +154,7 @@ def execute_a2a_delegation(
client_extensions: list[str] | None = None,
transport: ClientTransportConfig | None = None,
accepted_output_modes: list[str] | None = None,
input_files: dict[str, Any] | None = None,
) -> TaskStateResult:
"""Execute a task delegation to a remote A2A agent synchronously.
@@ -154,6 +190,7 @@ def execute_a2a_delegation(
client_extensions: A2A protocol extension URIs the client supports.
transport: Transport configuration (preferred, supported transports, gRPC settings).
accepted_output_modes: MIME types the client can accept in responses.
input_files: Optional dictionary of files to send to remote agent.
Returns:
TaskStateResult with status, result/error, history, and agent_card.
@@ -199,6 +236,7 @@ def execute_a2a_delegation(
client_extensions=client_extensions,
transport=transport,
accepted_output_modes=accepted_output_modes,
input_files=input_files,
)
)
finally:
@@ -232,6 +270,7 @@ async def aexecute_a2a_delegation(
client_extensions: list[str] | None = None,
transport: ClientTransportConfig | None = None,
accepted_output_modes: list[str] | None = None,
input_files: dict[str, Any] | None = None,
) -> TaskStateResult:
"""Execute a task delegation to a remote A2A agent asynchronously.
@@ -262,6 +301,7 @@ async def aexecute_a2a_delegation(
client_extensions: A2A protocol extension URIs the client supports.
transport: Transport configuration (preferred, supported transports, gRPC settings).
accepted_output_modes: MIME types the client can accept in responses.
input_files: Optional dictionary of files to send to remote agent.
Returns:
TaskStateResult with status, result/error, history, and agent_card.
@@ -299,6 +339,7 @@ async def aexecute_a2a_delegation(
client_extensions=client_extensions,
transport=transport,
accepted_output_modes=accepted_output_modes,
input_files=input_files,
)
except Exception as e:
crewai_event_bus.emit(
@@ -366,6 +407,7 @@ async def _aexecute_a2a_delegation_impl(
client_extensions: list[str] | None = None,
transport: ClientTransportConfig | None = None,
accepted_output_modes: list[str] | None = None,
input_files: dict[str, Any] | None = None,
) -> TaskStateResult:
"""Internal async implementation of A2A delegation."""
if transport is None:
@@ -517,10 +559,13 @@ async def _aexecute_a2a_delegation_impl(
if skill_id:
message_metadata["skill_id"] = skill_id
parts_list: list[Part] = [Part(root=TextPart(**parts))]
parts_list.extend(_create_file_parts(input_files))
message = Message(
role=Role.user,
message_id=str(uuid.uuid4()),
parts=[Part(root=TextPart(**parts))],
parts=parts_list,
context_id=context_id,
task_id=task_id,
reference_task_ids=reference_task_ids,

View File

@@ -17,6 +17,8 @@ from a2a.server.agent_execution import RequestContext
from a2a.server.events import EventQueue
from a2a.types import (
Artifact,
FileWithBytes,
FileWithUri,
InternalError,
InvalidParamsError,
Message,
@@ -28,6 +30,7 @@ from a2a.types import (
)
from a2a.utils import (
get_data_parts,
get_file_parts,
new_agent_text_message,
new_data_artifact,
new_text_artifact,
@@ -191,6 +194,37 @@ def cancellable(
return wrapper
def _convert_a2a_files_to_file_inputs(
a2a_files: list[FileWithBytes | FileWithUri],
) -> dict[str, Any]:
"""Convert a2a file types to crewai FileInput dict.
Args:
a2a_files: List of FileWithBytes or FileWithUri from a2a SDK.
Returns:
Dictionary mapping file names to FileInput objects.
"""
try:
from crewai_files import File, FileBytes
except ImportError:
logger.debug("crewai_files not installed, returning empty file dict")
return {}
file_dict: dict[str, Any] = {}
for idx, a2a_file in enumerate(a2a_files):
if isinstance(a2a_file, FileWithBytes):
file_bytes = base64.b64decode(a2a_file.bytes)
name = a2a_file.name or f"file_{idx}"
file_source = FileBytes(data=file_bytes, filename=a2a_file.name)
file_dict[name] = File(source=file_source)
elif isinstance(a2a_file, FileWithUri):
name = a2a_file.name or f"file_{idx}"
file_dict[name] = File(source=a2a_file.uri)
return file_dict
def _extract_response_schema(parts: list[Part]) -> dict[str, Any] | None:
"""Extract response schema from message parts metadata.
@@ -295,6 +329,7 @@ async def _execute_impl(
response_model: type[BaseModel] | None = None
structured_inputs: list[dict[str, Any]] = []
a2a_files: list[FileWithBytes | FileWithUri] = []
if context.message and context.message.parts:
schema = _extract_response_schema(context.message.parts)
@@ -308,6 +343,7 @@ async def _execute_impl(
)
structured_inputs = get_data_parts(context.message.parts)
a2a_files = get_file_parts(context.message.parts)
task_id = context.task_id
context_id = context.context_id
@@ -329,6 +365,7 @@ async def _execute_impl(
expected_output="Response to the user's request",
agent=agent,
response_model=response_model,
input_files=_convert_a2a_files_to_file_inputs(a2a_files),
)
crewai_event_bus.emit(

View File

@@ -977,6 +977,7 @@ def _delegate_to_a2a(
from_agent=self,
client_extensions=getattr(ctx.agent_config, "extensions", None),
accepted_output_modes=accepted_output_modes,
input_files=task.input_files,
)
conversation_history = a2a_result.get("history", [])
@@ -1320,6 +1321,7 @@ async def _adelegate_to_a2a(
from_agent=self,
client_extensions=getattr(ctx.agent_config, "extensions", None),
accepted_output_modes=accepted_output_modes,
input_files=task.input_files,
)
conversation_history = a2a_result.get("history", [])

View File

@@ -134,6 +134,7 @@ def _kickoff_with_a2a_support(
description=description,
agent=agent,
expected_output="Result from A2A delegation",
input_files=input_files or {},
)
def task_to_kickoff_adapter(