feat: add push notification protocol and config

This commit is contained in:
Greyson LaLonde
2026-01-05 22:05:44 -05:00
parent 8c089636e0
commit 174c61bd73
3 changed files with 80 additions and 1 deletions

View File

@@ -4,6 +4,7 @@ from crewai.a2a.updates.base import (
BaseHandlerKwargs,
PollingHandlerKwargs,
PushNotificationHandlerKwargs,
PushNotificationResultStore,
StreamingHandlerKwargs,
UpdateHandler,
)
@@ -25,6 +26,7 @@ __all__ = [
"PushNotificationConfig",
"PushNotificationHandler",
"PushNotificationHandlerKwargs",
"PushNotificationResultStore",
"StreamingConfig",
"StreamingHandler",
"StreamingHandlerKwargs",

View File

@@ -4,12 +4,16 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Any, Protocol, TypedDict
from pydantic import GetCoreSchemaHandler
from pydantic_core import CoreSchema, core_schema
if TYPE_CHECKING:
from a2a.client import Client
from a2a.types import AgentCard, Message
from a2a.types import AgentCard, Message, Task
from crewai.a2a.task_helpers import TaskStateResult
from crewai.a2a.updates.push_notifications.config import PushNotificationConfig
class BaseHandlerKwargs(TypedDict, total=False):
@@ -41,6 +45,66 @@ class StreamingHandlerKwargs(BaseHandlerKwargs, total=False):
class PushNotificationHandlerKwargs(BaseHandlerKwargs, total=False):
"""Kwargs for push notification handler."""
config: PushNotificationConfig
result_store: PushNotificationResultStore
polling_timeout: float
polling_interval: float
agent_branch: Any
class PushNotificationResultStore(Protocol):
"""Protocol for storing and retrieving push notification results.
This protocol defines the interface for a result store that the
PushNotificationHandler uses to wait for task completion. Enterprise
implementations can use Redis, in-memory storage, or other backends.
"""
@classmethod
def __get_pydantic_core_schema__(
cls,
source_type: Any,
handler: GetCoreSchemaHandler,
) -> CoreSchema:
return core_schema.any_schema()
async def wait_for_result(
self,
task_id: str,
timeout: float,
poll_interval: float = 1.0,
) -> Task | None:
"""Wait for a task result to be available.
Args:
task_id: The task ID to wait for.
timeout: Max seconds to wait before returning None.
poll_interval: Seconds between polling attempts.
Returns:
The completed Task object, or None if timeout.
"""
...
async def get_result(self, task_id: str) -> Task | None:
"""Get a task result if available.
Args:
task_id: The task ID to retrieve.
Returns:
The Task object if available, None otherwise.
"""
...
async def store_result(self, task: Task) -> None:
"""Store a task result.
Args:
task: The Task object to store.
"""
...
class UpdateHandler(Protocol):
"""Protocol for A2A update mechanism handlers."""

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
from pydantic import AnyHttpUrl, BaseModel, Field
from crewai.a2a.auth.schemas import AuthScheme
from crewai.a2a.updates.base import PushNotificationResultStore
class PushNotificationConfig(BaseModel):
@@ -15,6 +16,9 @@ class PushNotificationConfig(BaseModel):
id: Unique identifier for this config.
token: Token to validate incoming notifications.
authentication: Auth scheme for the callback endpoint.
timeout: Max seconds to wait for task completion.
interval: Seconds between result polling attempts.
result_store: Store for receiving push notification results.
"""
url: AnyHttpUrl = Field(description="Callback URL for push notifications")
@@ -23,3 +27,12 @@ class PushNotificationConfig(BaseModel):
authentication: AuthScheme | None = Field(
default=None, description="Authentication for callback endpoint"
)
timeout: float | None = Field(
default=300.0, gt=0, description="Max seconds to wait for task completion"
)
interval: float = Field(
default=2.0, gt=0, description="Seconds between result polling attempts"
)
result_store: PushNotificationResultStore | None = Field(
default=None, description="Result store for push notification handling"
)