fix: pass push notification config in initial request

This commit is contained in:
Greyson LaLonde
2026-01-06 17:20:33 -05:00
parent d6c7f33744
commit 5629a707ae
4 changed files with 51 additions and 34 deletions

View File

@@ -27,3 +27,14 @@ UNAVAILABLE_AGENTS_NOTICE_TEMPLATE: Final[Template] = Template(
" $unavailable_agents"
"\n</A2A_AGENTS_STATUS>\n"
)
REMOTE_AGENT_COMPLETED_NOTICE: Final[str] = """
<REMOTE_AGENT_STATUS>
STATUS: COMPLETED
The remote agent has finished processing your request. Their response is in the conversation history above.
You MUST now:
1. Extract the answer from the conversation history
2. Set is_a2a=false
3. Return the answer as your final message
DO NOT send another request - the task is already done.
</REMOTE_AGENT_STATUS>
"""

View File

@@ -9,8 +9,6 @@ from a2a.client import Client
from a2a.types import (
AgentCard,
Message,
PushNotificationConfig as A2APushNotificationConfig,
TaskPushNotificationConfig,
TaskState,
)
from typing_extensions import Unpack
@@ -34,29 +32,10 @@ from crewai.events.types.a2a_events import (
if TYPE_CHECKING:
from a2a.types import Task as A2ATask
from crewai.a2a.updates.push_notifications.config import PushNotificationConfig
logger = logging.getLogger(__name__)
def _build_a2a_push_config(config: PushNotificationConfig) -> A2APushNotificationConfig:
"""Convert our config to A2A SDK's PushNotificationConfig.
Args:
config: Our PushNotificationConfig.
Returns:
A2A SDK PushNotificationConfig.
"""
return A2APushNotificationConfig(
url=str(config.url),
id=config.id,
token=config.token,
authentication=config.authentication,
)
async def _wait_for_push_result(
task_id: str,
result_store: PushNotificationResultStore,
@@ -143,6 +122,10 @@ class PushNotificationHandler:
history=new_messages,
)
# Note: Push notification config is now included in the initial send_message
# request via ClientConfig.push_notification_configs, so no separate
# set_task_callback call is needed. This avoids race conditions where
# the task completes before the callback is registered.
result_or_task_id = await send_message_and_get_task_id(
event_stream=client.send_message(message),
new_messages=new_messages,
@@ -157,14 +140,6 @@ class PushNotificationHandler:
task_id = result_or_task_id
a2a_push_config = _build_a2a_push_config(config)
await client.set_task_callback(
TaskPushNotificationConfig(
task_id=task_id,
push_notification_config=a2a_push_config,
)
)
crewai_event_bus.emit(
agent_branch,
A2APushNotificationRegisteredEvent(
@@ -174,7 +149,7 @@ class PushNotificationHandler:
)
logger.debug(
"Registered push notification callback for task %s at %s",
"Push notification callback for task %s configured at %s (via initial request)",
task_id,
config.url,
)

View File

@@ -15,6 +15,7 @@ from a2a.types import (
AgentCard,
Message,
Part,
PushNotificationConfig as A2APushNotificationConfig,
Role,
TextPart,
TransportProtocol,
@@ -511,14 +512,21 @@ async def _execute_a2a_delegation_async(
}
)
push_config_for_client = (
updates if isinstance(updates, PushNotificationConfig) else None
)
use_streaming = not use_polling and push_config_for_client is None
async with _create_a2a_client(
agent_card=agent_card,
transport_protocol=transport_protocol,
timeout=timeout,
headers=headers,
streaming=not use_polling,
streaming=use_streaming,
auth=auth,
use_polling=use_polling,
push_notification_config=push_config_for_client,
) as client:
return await handler.execute(
client=client,
@@ -538,6 +546,7 @@ async def _create_a2a_client(
streaming: bool,
auth: AuthScheme | None = None,
use_polling: bool = False,
push_notification_config: PushNotificationConfig | None = None,
) -> AsyncIterator[Client]:
"""Create and configure an A2A client.
@@ -549,6 +558,7 @@ async def _create_a2a_client(
streaming: Enable streaming responses
auth: Optional AuthScheme for client configuration
use_polling: Enable polling mode
push_notification_config: Optional push notification config to include in requests
Yields:
Configured A2A client instance
@@ -561,12 +571,24 @@ async def _create_a2a_client(
if auth and isinstance(auth, (HTTPDigestAuth, APIKeyAuth)):
configure_auth_client(auth, httpx_client)
push_configs: list[A2APushNotificationConfig] = []
if push_notification_config is not None:
push_configs.append(
A2APushNotificationConfig(
url=str(push_notification_config.url),
id=push_notification_config.id,
token=push_notification_config.token,
authentication=push_notification_config.authentication,
)
)
config = ClientConfig(
httpx_client=httpx_client,
supported_transports=[str(transport_protocol.value)],
streaming=streaming and not use_polling,
polling=use_polling,
accepted_output_modes=["application/json"],
push_notification_configs=push_configs,
)
factory = ClientFactory(config)
@@ -605,7 +627,7 @@ def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel]:
is_a2a=(
bool,
Field(
description="Set to true to continue the conversation by sending this message to the A2A agent and awaiting their response. Set to false ONLY when you are completely done and providing your final answer (not when asking questions)."
description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately."
),
),
__base__=BaseModel,

View File

@@ -21,6 +21,7 @@ from crewai.a2a.templates import (
AVAILABLE_AGENTS_TEMPLATE,
CONVERSATION_TURN_INFO_TEMPLATE,
PREVIOUS_A2A_CONVERSATION_TEMPLATE,
REMOTE_AGENT_COMPLETED_NOTICE,
UNAVAILABLE_AGENTS_NOTICE_TEMPLATE,
)
from crewai.a2a.types import AgentResponseProtocol
@@ -256,6 +257,7 @@ def _augment_prompt_with_a2a(
max_turns: int | None = None,
failed_agents: dict[str, str] | None = None,
extension_registry: ExtensionRegistry | None = None,
remote_task_completed: bool = False,
) -> tuple[str, bool]:
"""Add A2A delegation instructions to prompt.
@@ -328,12 +330,15 @@ def _augment_prompt_with_a2a(
warning=warning,
)
completion_notice = ""
if remote_task_completed and conversation_history:
completion_notice = REMOTE_AGENT_COMPLETED_NOTICE
augmented_prompt = f"""{task_description}
IMPORTANT: You have the ability to delegate this task to remote A2A agents.
{agents_text}
{history_text}{turn_info}
{history_text}{turn_info}{completion_notice}
"""
@@ -383,6 +388,7 @@ def _handle_agent_response_and_continue(
context: str | None,
tools: list[BaseTool] | None,
agent_response_model: type[BaseModel],
remote_task_completed: bool = False,
) -> tuple[str | None, str | None]:
"""Handle A2A result and get CrewAI agent's response.
@@ -418,6 +424,7 @@ def _handle_agent_response_and_continue(
turn_num=turn_num,
max_turns=max_turns,
agent_cards=agent_cards_dict,
remote_task_completed=remote_task_completed,
)
original_response_model = task.response_model
@@ -625,6 +632,7 @@ def _delegate_to_a2a(
context=context,
tools=tools,
agent_response_model=agent_response_model,
remote_task_completed=(a2a_result["status"] == TaskState.completed),
)
if final_result is not None:
@@ -652,6 +660,7 @@ def _delegate_to_a2a(
context=context,
tools=tools,
agent_response_model=agent_response_model,
remote_task_completed=False,
)
if final_result is not None: