From 174c61bd736b693a5785e597838bc9c94424e213 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Mon, 5 Jan 2026 22:05:44 -0500 Subject: [PATCH] feat: add push notification protocol and config --- lib/crewai/src/crewai/a2a/updates/__init__.py | 2 + lib/crewai/src/crewai/a2a/updates/base.py | 66 ++++++++++++++++++- .../a2a/updates/push_notifications/config.py | 13 ++++ 3 files changed, 80 insertions(+), 1 deletion(-) diff --git a/lib/crewai/src/crewai/a2a/updates/__init__.py b/lib/crewai/src/crewai/a2a/updates/__init__.py index 79b7f35b4..953eb48c3 100644 --- a/lib/crewai/src/crewai/a2a/updates/__init__.py +++ b/lib/crewai/src/crewai/a2a/updates/__init__.py @@ -4,6 +4,7 @@ from crewai.a2a.updates.base import ( BaseHandlerKwargs, PollingHandlerKwargs, PushNotificationHandlerKwargs, + PushNotificationResultStore, StreamingHandlerKwargs, UpdateHandler, ) @@ -25,6 +26,7 @@ __all__ = [ "PushNotificationConfig", "PushNotificationHandler", "PushNotificationHandlerKwargs", + "PushNotificationResultStore", "StreamingConfig", "StreamingHandler", "StreamingHandlerKwargs", diff --git a/lib/crewai/src/crewai/a2a/updates/base.py b/lib/crewai/src/crewai/a2a/updates/base.py index 34016f955..74060a1d3 100644 --- a/lib/crewai/src/crewai/a2a/updates/base.py +++ b/lib/crewai/src/crewai/a2a/updates/base.py @@ -4,12 +4,16 @@ from __future__ import annotations from typing import TYPE_CHECKING, Any, Protocol, TypedDict +from pydantic import GetCoreSchemaHandler +from pydantic_core import CoreSchema, core_schema + if TYPE_CHECKING: from a2a.client import Client - from a2a.types import AgentCard, Message + from a2a.types import AgentCard, Message, Task from crewai.a2a.task_helpers import TaskStateResult + from crewai.a2a.updates.push_notifications.config import PushNotificationConfig class BaseHandlerKwargs(TypedDict, total=False): @@ -41,6 +45,66 @@ class StreamingHandlerKwargs(BaseHandlerKwargs, total=False): class PushNotificationHandlerKwargs(BaseHandlerKwargs, total=False): """Kwargs for push notification handler.""" + config: PushNotificationConfig + result_store: PushNotificationResultStore + polling_timeout: float + polling_interval: float + agent_branch: Any + + +class PushNotificationResultStore(Protocol): + """Protocol for storing and retrieving push notification results. + + This protocol defines the interface for a result store that the + PushNotificationHandler uses to wait for task completion. Enterprise + implementations can use Redis, in-memory storage, or other backends. + """ + + @classmethod + def __get_pydantic_core_schema__( + cls, + source_type: Any, + handler: GetCoreSchemaHandler, + ) -> CoreSchema: + return core_schema.any_schema() + + async def wait_for_result( + self, + task_id: str, + timeout: float, + poll_interval: float = 1.0, + ) -> Task | None: + """Wait for a task result to be available. + + Args: + task_id: The task ID to wait for. + timeout: Max seconds to wait before returning None. + poll_interval: Seconds between polling attempts. + + Returns: + The completed Task object, or None if timeout. + """ + ... + + async def get_result(self, task_id: str) -> Task | None: + """Get a task result if available. + + Args: + task_id: The task ID to retrieve. + + Returns: + The Task object if available, None otherwise. + """ + ... + + async def store_result(self, task: Task) -> None: + """Store a task result. + + Args: + task: The Task object to store. + """ + ... + class UpdateHandler(Protocol): """Protocol for A2A update mechanism handlers.""" diff --git a/lib/crewai/src/crewai/a2a/updates/push_notifications/config.py b/lib/crewai/src/crewai/a2a/updates/push_notifications/config.py index 03b2c6856..b37ed67d3 100644 --- a/lib/crewai/src/crewai/a2a/updates/push_notifications/config.py +++ b/lib/crewai/src/crewai/a2a/updates/push_notifications/config.py @@ -5,6 +5,7 @@ from __future__ import annotations from pydantic import AnyHttpUrl, BaseModel, Field from crewai.a2a.auth.schemas import AuthScheme +from crewai.a2a.updates.base import PushNotificationResultStore class PushNotificationConfig(BaseModel): @@ -15,6 +16,9 @@ class PushNotificationConfig(BaseModel): id: Unique identifier for this config. token: Token to validate incoming notifications. authentication: Auth scheme for the callback endpoint. + timeout: Max seconds to wait for task completion. + interval: Seconds between result polling attempts. + result_store: Store for receiving push notification results. """ url: AnyHttpUrl = Field(description="Callback URL for push notifications") @@ -23,3 +27,12 @@ class PushNotificationConfig(BaseModel): authentication: AuthScheme | None = Field( default=None, description="Authentication for callback endpoint" ) + timeout: float | None = Field( + default=300.0, gt=0, description="Max seconds to wait for task completion" + ) + interval: float = Field( + default=2.0, gt=0, description="Seconds between result polling attempts" + ) + result_store: PushNotificationResultStore | None = Field( + default=None, description="Result store for push notification handling" + )