Merge branch 'main' into tm-account-for-thought-tokens-on-gemini

This commit is contained in:
Greyson LaLonde
2026-01-07 11:44:06 -05:00
committed by GitHub
29 changed files with 2289 additions and 311 deletions

View File

@@ -120,6 +120,8 @@ HEADERS_TO_FILTER = {
"accept-encoding": "ACCEPT-ENCODING-XXX",
"x-amzn-requestid": "X-AMZN-REQUESTID-XXX",
"x-amzn-RequestId": "X-AMZN-REQUESTID-XXX",
"x-a2a-notification-token": "X-A2A-NOTIFICATION-TOKEN-XXX",
"x-a2a-version": "X-A2A-VERSION-XXX",
}

View File

@@ -87,6 +87,10 @@ The `A2AConfig` class accepts the following parameters:
When `True`, returns the A2A agent's result directly when it signals completion. When `False`, allows the server agent to review the result and potentially continue the conversation.
</ParamField>
<ParamField path="updates" type="UpdateConfig" default="StreamingConfig()">
Update mechanism for receiving task status. Options: `StreamingConfig`, `PollingConfig`, or `PushNotificationConfig`.
</ParamField>
## Authentication
For A2A agents that require authentication, use one of the provided auth schemes:
@@ -253,6 +257,74 @@ When `fail_fast=False`:
- If all agents fail, the LLM receives a notice about unavailable agents and handles the task directly
- Connection errors are captured and included in the context for better decision-making
## Update Mechanisms
Control how your agent receives task status updates from remote A2A agents:
<Tabs>
<Tab title="Streaming (Default)">
```python Code
from crewai.a2a import A2AConfig
from crewai.a2a.updates import StreamingConfig
agent = Agent(
role="Research Coordinator",
goal="Coordinate research tasks",
backstory="Expert at delegation",
llm="gpt-4o",
a2a=A2AConfig(
endpoint="https://research.example.com/.well-known/agent-card.json",
updates=StreamingConfig()
)
)
```
</Tab>
<Tab title="Polling">
```python Code
from crewai.a2a import A2AConfig
from crewai.a2a.updates import PollingConfig
agent = Agent(
role="Research Coordinator",
goal="Coordinate research tasks",
backstory="Expert at delegation",
llm="gpt-4o",
a2a=A2AConfig(
endpoint="https://research.example.com/.well-known/agent-card.json",
updates=PollingConfig(
interval=2.0,
timeout=300.0,
max_polls=100
)
)
)
```
</Tab>
<Tab title="Push Notifications">
```python Code
from crewai.a2a import A2AConfig
from crewai.a2a.updates import PushNotificationConfig
agent = Agent(
role="Research Coordinator",
goal="Coordinate research tasks",
backstory="Expert at delegation",
llm="gpt-4o",
a2a=A2AConfig(
endpoint="https://research.example.com/.well-known/agent-card.json",
updates=PushNotificationConfig(
url={base_url}/a2a/callback",
token="your-validation-token",
timeout=300.0
)
)
)
```
</Tab>
</Tabs>
## Best Practices
<CardGroup cols={2}>

View File

@@ -1,6 +1,18 @@
"""Agent-to-Agent (A2A) protocol communication module for CrewAI."""
from crewai.a2a.config import A2AConfig
from crewai.a2a.errors import A2APollingTimeoutError
from crewai.a2a.updates import (
PollingConfig,
PushNotificationConfig,
StreamingConfig,
)
__all__ = ["A2AConfig"]
__all__ = [
"A2AConfig",
"A2APollingTimeoutError",
"PollingConfig",
"PushNotificationConfig",
"StreamingConfig",
]

View File

@@ -5,17 +5,19 @@ This module is separate from experimental.a2a to avoid circular imports.
from __future__ import annotations
from typing import Annotated
from typing import Annotated, ClassVar
from pydantic import (
BaseModel,
BeforeValidator,
ConfigDict,
Field,
HttpUrl,
TypeAdapter,
)
from crewai.a2a.auth.schemas import AuthScheme
from crewai.a2a.updates import StreamingConfig, UpdateConfig
http_url_adapter = TypeAdapter(HttpUrl)
@@ -33,18 +35,21 @@ class A2AConfig(BaseModel):
Attributes:
endpoint: A2A agent endpoint URL.
auth: Authentication scheme (Bearer, OAuth2, API Key, HTTP Basic/Digest).
timeout: Request timeout in seconds (default: 120).
max_turns: Maximum conversation turns with A2A agent (default: 10).
auth: Authentication scheme.
timeout: Request timeout in seconds.
max_turns: Maximum conversation turns with A2A agent.
response_model: Optional Pydantic model for structured A2A agent responses.
fail_fast: If True, raise error when agent unreachable; if False, skip and continue (default: True).
trust_remote_completion_status: If True, return A2A agent's result directly when status is "completed"; if False, always ask server agent to respond (default: False).
fail_fast: If True, raise error when agent unreachable; if False, skip and continue.
trust_remote_completion_status: If True, return A2A agent's result directly when completed.
updates: Update mechanism config.
"""
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
endpoint: Url = Field(description="A2A agent endpoint URL")
auth: AuthScheme | None = Field(
default=None,
description="Authentication scheme (Bearer, OAuth2, API Key, HTTP Basic/Digest)",
description="Authentication scheme",
)
timeout: int = Field(default=120, description="Request timeout in seconds")
max_turns: int = Field(
@@ -52,13 +57,17 @@ class A2AConfig(BaseModel):
)
response_model: type[BaseModel] | None = Field(
default=None,
description="Optional Pydantic model for structured A2A agent responses. When specified, the A2A agent is expected to return JSON matching this schema.",
description="Optional Pydantic model for structured A2A agent responses",
)
fail_fast: bool = Field(
default=True,
description="If True, raise an error immediately when the A2A agent is unreachable. If False, skip the A2A agent and continue execution.",
description="If True, raise error when agent unreachable; if False, skip",
)
trust_remote_completion_status: bool = Field(
default=False,
description='If True, return the A2A agent\'s result directly when status is "completed" without asking the server agent to respond. If False, always ask the server agent to respond, allowing it to potentially delegate again.',
description="If True, return A2A result directly when completed",
)
updates: UpdateConfig = Field(
default_factory=StreamingConfig,
description="Update mechanism config",
)

View File

@@ -0,0 +1,7 @@
"""A2A protocol error types."""
from a2a.client.errors import A2AClientTimeoutError
class A2APollingTimeoutError(A2AClientTimeoutError):
"""Raised when polling exceeds the configured timeout."""

View File

@@ -0,0 +1,322 @@
"""Helper functions for processing A2A task results."""
from __future__ import annotations
from collections.abc import AsyncIterator
from typing import TYPE_CHECKING, TypedDict
import uuid
from a2a.types import (
AgentCard,
Message,
Part,
Role,
Task,
TaskArtifactUpdateEvent,
TaskState,
TaskStatusUpdateEvent,
TextPart,
)
from typing_extensions import NotRequired
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import A2AResponseReceivedEvent
if TYPE_CHECKING:
from a2a.types import Task as A2ATask
SendMessageEvent = (
tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None] | Message
)
TERMINAL_STATES: frozenset[TaskState] = frozenset(
{
TaskState.completed,
TaskState.failed,
TaskState.rejected,
TaskState.canceled,
}
)
ACTIONABLE_STATES: frozenset[TaskState] = frozenset(
{
TaskState.input_required,
TaskState.auth_required,
}
)
class TaskStateResult(TypedDict):
"""Result dictionary from processing A2A task state."""
status: TaskState
history: list[Message]
result: NotRequired[str]
error: NotRequired[str]
agent_card: NotRequired[AgentCard]
def extract_task_result_parts(a2a_task: A2ATask) -> list[str]:
"""Extract result parts from A2A task status message, history, and artifacts.
Args:
a2a_task: A2A Task object with status, history, and artifacts
Returns:
List of result text parts
"""
result_parts: list[str] = []
if a2a_task.status and a2a_task.status.message:
msg = a2a_task.status.message
result_parts.extend(
part.root.text for part in msg.parts if part.root.kind == "text"
)
if not result_parts and a2a_task.history:
for history_msg in reversed(a2a_task.history):
if history_msg.role == Role.agent:
result_parts.extend(
part.root.text
for part in history_msg.parts
if part.root.kind == "text"
)
break
if a2a_task.artifacts:
result_parts.extend(
part.root.text
for artifact in a2a_task.artifacts
for part in artifact.parts
if part.root.kind == "text"
)
return result_parts
def extract_error_message(a2a_task: A2ATask, default: str) -> str:
"""Extract error message from A2A task.
Args:
a2a_task: A2A Task object
default: Default message if no error found
Returns:
Error message string
"""
if a2a_task.status and a2a_task.status.message:
msg = a2a_task.status.message
if msg:
for part in msg.parts:
if part.root.kind == "text":
return str(part.root.text)
return str(msg)
if a2a_task.history:
for history_msg in reversed(a2a_task.history):
for part in history_msg.parts:
if part.root.kind == "text":
return str(part.root.text)
return default
def process_task_state(
a2a_task: A2ATask,
new_messages: list[Message],
agent_card: AgentCard,
turn_number: int,
is_multiturn: bool,
agent_role: str | None,
result_parts: list[str] | None = None,
) -> TaskStateResult | None:
"""Process A2A task state and return result dictionary.
Shared logic for both polling and streaming handlers.
Args:
a2a_task: The A2A task to process
new_messages: List to collect messages (modified in place)
agent_card: The agent card
turn_number: Current turn number
is_multiturn: Whether multi-turn conversation
agent_role: Agent role for logging
result_parts: Accumulated result parts (streaming passes accumulated,
polling passes None to extract from task)
Returns:
Result dictionary if terminal/actionable state, None otherwise
"""
should_extract = result_parts is None
if result_parts is None:
result_parts = []
if a2a_task.status.state == TaskState.completed:
if should_extract:
extracted_parts = extract_task_result_parts(a2a_task)
result_parts.extend(extracted_parts)
if a2a_task.history:
new_messages.extend(a2a_task.history)
response_text = " ".join(result_parts) if result_parts else ""
crewai_event_bus.emit(
None,
A2AResponseReceivedEvent(
response=response_text,
turn_number=turn_number,
is_multiturn=is_multiturn,
status="completed",
agent_role=agent_role,
),
)
return TaskStateResult(
status=TaskState.completed,
agent_card=agent_card,
result=response_text,
history=new_messages,
)
if a2a_task.status.state == TaskState.input_required:
if a2a_task.history:
new_messages.extend(a2a_task.history)
response_text = extract_error_message(a2a_task, "Additional input required")
if response_text and not a2a_task.history:
agent_message = Message(
role=Role.agent,
message_id=str(uuid.uuid4()),
parts=[Part(root=TextPart(text=response_text))],
context_id=a2a_task.context_id,
task_id=a2a_task.id,
)
new_messages.append(agent_message)
crewai_event_bus.emit(
None,
A2AResponseReceivedEvent(
response=response_text,
turn_number=turn_number,
is_multiturn=is_multiturn,
status="input_required",
agent_role=agent_role,
),
)
return TaskStateResult(
status=TaskState.input_required,
error=response_text,
history=new_messages,
agent_card=agent_card,
)
if a2a_task.status.state in {TaskState.failed, TaskState.rejected}:
error_msg = extract_error_message(a2a_task, "Task failed without error message")
if a2a_task.history:
new_messages.extend(a2a_task.history)
return TaskStateResult(
status=TaskState.failed,
error=error_msg,
history=new_messages,
)
if a2a_task.status.state == TaskState.auth_required:
error_msg = extract_error_message(a2a_task, "Authentication required")
return TaskStateResult(
status=TaskState.auth_required,
error=error_msg,
history=new_messages,
)
if a2a_task.status.state == TaskState.canceled:
error_msg = extract_error_message(a2a_task, "Task was canceled")
return TaskStateResult(
status=TaskState.canceled,
error=error_msg,
history=new_messages,
)
return None
async def send_message_and_get_task_id(
event_stream: AsyncIterator[SendMessageEvent],
new_messages: list[Message],
agent_card: AgentCard,
turn_number: int,
is_multiturn: bool,
agent_role: str | None,
) -> str | TaskStateResult:
"""Send message and process initial response.
Handles the common pattern of sending a message and either:
- Getting an immediate Message response (task completed synchronously)
- Getting a Task that needs polling/waiting for completion
Args:
event_stream: Async iterator from client.send_message()
new_messages: List to collect messages (modified in place)
agent_card: The agent card
turn_number: Current turn number
is_multiturn: Whether multi-turn conversation
agent_role: Agent role for logging
Returns:
Task ID string if agent needs polling/waiting, or TaskStateResult if done.
"""
try:
async for event in event_stream:
if isinstance(event, Message):
new_messages.append(event)
result_parts = [
part.root.text for part in event.parts if part.root.kind == "text"
]
response_text = " ".join(result_parts) if result_parts else ""
crewai_event_bus.emit(
None,
A2AResponseReceivedEvent(
response=response_text,
turn_number=turn_number,
is_multiturn=is_multiturn,
status="completed",
agent_role=agent_role,
),
)
return TaskStateResult(
status=TaskState.completed,
result=response_text,
history=new_messages,
agent_card=agent_card,
)
if isinstance(event, tuple):
a2a_task, _ = event
if a2a_task.status.state in TERMINAL_STATES | ACTIONABLE_STATES:
result = process_task_state(
a2a_task=a2a_task,
new_messages=new_messages,
agent_card=agent_card,
turn_number=turn_number,
is_multiturn=is_multiturn,
agent_role=agent_role,
)
if result:
return result
return a2a_task.id
return TaskStateResult(
status=TaskState.failed,
error="No task ID received from initial message",
history=new_messages,
)
finally:
aclose = getattr(event_stream, "aclose", None)
if aclose:
await aclose()

View File

@@ -27,3 +27,14 @@ UNAVAILABLE_AGENTS_NOTICE_TEMPLATE: Final[Template] = Template(
" $unavailable_agents"
"\n</A2A_AGENTS_STATUS>\n"
)
REMOTE_AGENT_COMPLETED_NOTICE: Final[str] = """
<REMOTE_AGENT_STATUS>
STATUS: COMPLETED
The remote agent has finished processing your request. Their response is in the conversation history above.
You MUST now:
1. Extract the answer from the conversation history
2. Set is_a2a=false
3. Return the answer as your final message
DO NOT send another request - the task is already done.
</REMOTE_AGENT_STATUS>
"""

View File

@@ -0,0 +1,35 @@
"""A2A update mechanism configuration types."""
from crewai.a2a.updates.base import (
BaseHandlerKwargs,
PollingHandlerKwargs,
PushNotificationHandlerKwargs,
PushNotificationResultStore,
StreamingHandlerKwargs,
UpdateHandler,
)
from crewai.a2a.updates.polling.config import PollingConfig
from crewai.a2a.updates.polling.handler import PollingHandler
from crewai.a2a.updates.push_notifications.config import PushNotificationConfig
from crewai.a2a.updates.push_notifications.handler import PushNotificationHandler
from crewai.a2a.updates.streaming.config import StreamingConfig
from crewai.a2a.updates.streaming.handler import StreamingHandler
UpdateConfig = PollingConfig | StreamingConfig | PushNotificationConfig
__all__ = [
"BaseHandlerKwargs",
"PollingConfig",
"PollingHandler",
"PollingHandlerKwargs",
"PushNotificationConfig",
"PushNotificationHandler",
"PushNotificationHandlerKwargs",
"PushNotificationResultStore",
"StreamingConfig",
"StreamingHandler",
"StreamingHandlerKwargs",
"UpdateConfig",
"UpdateHandler",
]

View File

@@ -0,0 +1,131 @@
"""Base types for A2A update mechanism handlers."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Protocol, TypedDict
from pydantic import GetCoreSchemaHandler
from pydantic_core import CoreSchema, core_schema
if TYPE_CHECKING:
from a2a.client import Client
from a2a.types import AgentCard, Message, Task
from crewai.a2a.task_helpers import TaskStateResult
from crewai.a2a.updates.push_notifications.config import PushNotificationConfig
class BaseHandlerKwargs(TypedDict, total=False):
"""Base kwargs shared by all handlers."""
turn_number: int
is_multiturn: bool
agent_role: str | None
class PollingHandlerKwargs(BaseHandlerKwargs, total=False):
"""Kwargs for polling handler."""
polling_interval: float
polling_timeout: float
endpoint: str
agent_branch: Any
history_length: int
max_polls: int | None
class StreamingHandlerKwargs(BaseHandlerKwargs, total=False):
"""Kwargs for streaming handler."""
context_id: str | None
task_id: str | None
class PushNotificationHandlerKwargs(BaseHandlerKwargs, total=False):
"""Kwargs for push notification handler."""
config: PushNotificationConfig
result_store: PushNotificationResultStore
polling_timeout: float
polling_interval: float
agent_branch: Any
class PushNotificationResultStore(Protocol):
"""Protocol for storing and retrieving push notification results.
This protocol defines the interface for a result store that the
PushNotificationHandler uses to wait for task completion.
"""
@classmethod
def __get_pydantic_core_schema__(
cls,
source_type: Any,
handler: GetCoreSchemaHandler,
) -> CoreSchema:
return core_schema.any_schema()
async def wait_for_result(
self,
task_id: str,
timeout: float,
poll_interval: float = 1.0,
) -> Task | None:
"""Wait for a task result to be available.
Args:
task_id: The task ID to wait for.
timeout: Max seconds to wait before returning None.
poll_interval: Seconds between polling attempts.
Returns:
The completed Task object, or None if timeout.
"""
...
async def get_result(self, task_id: str) -> Task | None:
"""Get a task result if available.
Args:
task_id: The task ID to retrieve.
Returns:
The Task object if available, None otherwise.
"""
...
async def store_result(self, task: Task) -> None:
"""Store a task result.
Args:
task: The Task object to store.
"""
...
class UpdateHandler(Protocol):
"""Protocol for A2A update mechanism handlers."""
@staticmethod
async def execute(
client: Client,
message: Message,
new_messages: list[Message],
agent_card: AgentCard,
**kwargs: Any,
) -> TaskStateResult:
"""Execute the update mechanism and return result.
Args:
client: A2A client instance.
message: Message to send.
new_messages: List to collect messages (modified in place).
agent_card: The agent card.
**kwargs: Additional handler-specific parameters.
Returns:
Result dictionary with status, result/error, and history.
"""
...

View File

@@ -0,0 +1 @@
"""Polling update mechanism module."""

View File

@@ -0,0 +1,25 @@
"""Polling update mechanism configuration."""
from __future__ import annotations
from pydantic import BaseModel, Field
class PollingConfig(BaseModel):
"""Configuration for polling-based task updates.
Attributes:
interval: Seconds between poll attempts.
timeout: Max seconds to poll before raising timeout error.
max_polls: Max number of poll attempts.
history_length: Number of messages to retrieve per poll.
"""
interval: float = Field(
default=2.0, gt=0, description="Seconds between poll attempts"
)
timeout: float | None = Field(default=None, gt=0, description="Max seconds to poll")
max_polls: int | None = Field(default=None, gt=0, description="Max poll attempts")
history_length: int = Field(
default=100, gt=0, description="Messages to retrieve per poll"
)

View File

@@ -0,0 +1,246 @@
"""Polling update mechanism handler."""
from __future__ import annotations
import asyncio
import time
from typing import TYPE_CHECKING, Any
import uuid
from a2a.client import Client
from a2a.client.errors import A2AClientHTTPError
from a2a.types import (
AgentCard,
Message,
Part,
Role,
TaskQueryParams,
TaskState,
TextPart,
)
from typing_extensions import Unpack
from crewai.a2a.errors import A2APollingTimeoutError
from crewai.a2a.task_helpers import (
ACTIONABLE_STATES,
TERMINAL_STATES,
TaskStateResult,
process_task_state,
send_message_and_get_task_id,
)
from crewai.a2a.updates.base import PollingHandlerKwargs
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import (
A2APollingStartedEvent,
A2APollingStatusEvent,
A2AResponseReceivedEvent,
)
if TYPE_CHECKING:
from a2a.types import Task as A2ATask
async def _poll_task_until_complete(
client: Client,
task_id: str,
polling_interval: float,
polling_timeout: float,
agent_branch: Any | None = None,
history_length: int = 100,
max_polls: int | None = None,
) -> A2ATask:
"""Poll task status until terminal state reached.
Args:
client: A2A client instance
task_id: Task ID to poll
polling_interval: Seconds between poll attempts
polling_timeout: Max seconds before timeout
agent_branch: Agent tree branch for logging
history_length: Number of messages to retrieve per poll
max_polls: Max number of poll attempts (None = unlimited)
Returns:
Final task object in terminal state
Raises:
A2APollingTimeoutError: If polling exceeds timeout or max_polls
"""
start_time = time.monotonic()
poll_count = 0
while True:
poll_count += 1
task = await client.get_task(
TaskQueryParams(id=task_id, history_length=history_length)
)
elapsed = time.monotonic() - start_time
crewai_event_bus.emit(
agent_branch,
A2APollingStatusEvent(
task_id=task_id,
state=str(task.status.state.value) if task.status.state else "unknown",
elapsed_seconds=elapsed,
poll_count=poll_count,
),
)
if task.status.state in TERMINAL_STATES | ACTIONABLE_STATES:
return task
if elapsed > polling_timeout:
raise A2APollingTimeoutError(
f"Polling timeout after {polling_timeout}s ({poll_count} polls)"
)
if max_polls and poll_count >= max_polls:
raise A2APollingTimeoutError(
f"Max polls ({max_polls}) exceeded after {elapsed:.1f}s"
)
await asyncio.sleep(polling_interval)
class PollingHandler:
"""Polling-based update handler."""
@staticmethod
async def execute(
client: Client,
message: Message,
new_messages: list[Message],
agent_card: AgentCard,
**kwargs: Unpack[PollingHandlerKwargs],
) -> TaskStateResult:
"""Execute A2A delegation using polling for updates.
Args:
client: A2A client instance.
message: Message to send.
new_messages: List to collect messages.
agent_card: The agent card.
**kwargs: Polling-specific parameters.
Returns:
Dictionary with status, result/error, and history.
"""
polling_interval = kwargs.get("polling_interval", 2.0)
polling_timeout = kwargs.get("polling_timeout", 300.0)
endpoint = kwargs.get("endpoint", "")
agent_branch = kwargs.get("agent_branch")
turn_number = kwargs.get("turn_number", 0)
is_multiturn = kwargs.get("is_multiturn", False)
agent_role = kwargs.get("agent_role")
history_length = kwargs.get("history_length", 100)
max_polls = kwargs.get("max_polls")
context_id = kwargs.get("context_id")
task_id = kwargs.get("task_id")
try:
result_or_task_id = await send_message_and_get_task_id(
event_stream=client.send_message(message),
new_messages=new_messages,
agent_card=agent_card,
turn_number=turn_number,
is_multiturn=is_multiturn,
agent_role=agent_role,
)
if not isinstance(result_or_task_id, str):
return result_or_task_id
task_id = result_or_task_id
crewai_event_bus.emit(
agent_branch,
A2APollingStartedEvent(
task_id=task_id,
polling_interval=polling_interval,
endpoint=endpoint,
),
)
final_task = await _poll_task_until_complete(
client=client,
task_id=task_id,
polling_interval=polling_interval,
polling_timeout=polling_timeout,
agent_branch=agent_branch,
history_length=history_length,
max_polls=max_polls,
)
result = process_task_state(
a2a_task=final_task,
new_messages=new_messages,
agent_card=agent_card,
turn_number=turn_number,
is_multiturn=is_multiturn,
agent_role=agent_role,
)
if result:
return result
return TaskStateResult(
status=TaskState.failed,
error=f"Unexpected task state: {final_task.status.state}",
history=new_messages,
)
except A2APollingTimeoutError as e:
error_msg = str(e)
error_message = Message(
role=Role.agent,
message_id=str(uuid.uuid4()),
parts=[Part(root=TextPart(text=error_msg))],
context_id=context_id,
task_id=task_id,
)
new_messages.append(error_message)
crewai_event_bus.emit(
agent_branch,
A2AResponseReceivedEvent(
response=error_msg,
turn_number=turn_number,
is_multiturn=is_multiturn,
status="failed",
agent_role=agent_role,
),
)
return TaskStateResult(
status=TaskState.failed,
error=error_msg,
history=new_messages,
)
except A2AClientHTTPError as e:
error_msg = f"HTTP Error {e.status_code}: {e!s}"
error_message = Message(
role=Role.agent,
message_id=str(uuid.uuid4()),
parts=[Part(root=TextPart(text=error_msg))],
context_id=context_id,
task_id=task_id,
)
new_messages.append(error_message)
crewai_event_bus.emit(
agent_branch,
A2AResponseReceivedEvent(
response=error_msg,
turn_number=turn_number,
is_multiturn=is_multiturn,
status="failed",
agent_role=agent_role,
),
)
return TaskStateResult(
status=TaskState.failed,
error=error_msg,
history=new_messages,
)

View File

@@ -0,0 +1 @@
"""Push notification update mechanism module."""

View File

@@ -0,0 +1,38 @@
"""Push notification update mechanism configuration."""
from __future__ import annotations
from a2a.types import PushNotificationAuthenticationInfo
from pydantic import AnyHttpUrl, BaseModel, Field
from crewai.a2a.updates.base import PushNotificationResultStore
class PushNotificationConfig(BaseModel):
"""Configuration for webhook-based task updates.
Attributes:
url: Callback URL where agent sends push notifications.
id: Unique identifier for this config.
token: Token to validate incoming notifications.
authentication: Auth info for agent to use when calling webhook.
timeout: Max seconds to wait for task completion.
interval: Seconds between result polling attempts.
result_store: Store for receiving push notification results.
"""
url: AnyHttpUrl = Field(description="Callback URL for push notifications")
id: str | None = Field(default=None, description="Unique config identifier")
token: str | None = Field(default=None, description="Validation token")
authentication: PushNotificationAuthenticationInfo | None = Field(
default=None, description="Auth info for agent to use when calling webhook"
)
timeout: float | None = Field(
default=300.0, gt=0, description="Max seconds to wait for task completion"
)
interval: float = Field(
default=2.0, gt=0, description="Seconds between result polling attempts"
)
result_store: PushNotificationResultStore | None = Field(
default=None, description="Result store for push notification handling"
)

View File

@@ -0,0 +1,220 @@
"""Push notification (webhook) update mechanism handler."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
import uuid
from a2a.client import Client
from a2a.client.errors import A2AClientHTTPError
from a2a.types import (
AgentCard,
Message,
Part,
Role,
TaskState,
TextPart,
)
from typing_extensions import Unpack
from crewai.a2a.task_helpers import (
TaskStateResult,
process_task_state,
send_message_and_get_task_id,
)
from crewai.a2a.updates.base import (
PushNotificationHandlerKwargs,
PushNotificationResultStore,
)
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import (
A2APushNotificationRegisteredEvent,
A2APushNotificationTimeoutEvent,
A2AResponseReceivedEvent,
)
if TYPE_CHECKING:
from a2a.types import Task as A2ATask
logger = logging.getLogger(__name__)
async def _wait_for_push_result(
task_id: str,
result_store: PushNotificationResultStore,
timeout: float,
poll_interval: float,
agent_branch: Any | None = None,
) -> A2ATask | None:
"""Wait for push notification result.
Args:
task_id: Task ID to wait for.
result_store: Store to retrieve results from.
timeout: Max seconds to wait.
poll_interval: Seconds between polling attempts.
agent_branch: Agent tree branch for logging.
Returns:
Final task object, or None if timeout.
"""
task = await result_store.wait_for_result(
task_id=task_id,
timeout=timeout,
poll_interval=poll_interval,
)
if task is None:
crewai_event_bus.emit(
agent_branch,
A2APushNotificationTimeoutEvent(
task_id=task_id,
timeout_seconds=timeout,
),
)
return task
class PushNotificationHandler:
"""Push notification (webhook) based update handler."""
@staticmethod
async def execute(
client: Client,
message: Message,
new_messages: list[Message],
agent_card: AgentCard,
**kwargs: Unpack[PushNotificationHandlerKwargs],
) -> TaskStateResult:
"""Execute A2A delegation using push notifications for updates.
Args:
client: A2A client instance.
message: Message to send.
new_messages: List to collect messages.
agent_card: The agent card.
**kwargs: Push notification-specific parameters.
Returns:
Dictionary with status, result/error, and history.
Raises:
ValueError: If result_store or config not provided.
"""
config = kwargs.get("config")
result_store = kwargs.get("result_store")
polling_timeout = kwargs.get("polling_timeout", 300.0)
polling_interval = kwargs.get("polling_interval", 2.0)
agent_branch = kwargs.get("agent_branch")
turn_number = kwargs.get("turn_number", 0)
is_multiturn = kwargs.get("is_multiturn", False)
agent_role = kwargs.get("agent_role")
context_id = kwargs.get("context_id")
task_id = kwargs.get("task_id")
if config is None:
return TaskStateResult(
status=TaskState.failed,
error="PushNotificationConfig is required for push notification handler",
history=new_messages,
)
if result_store is None:
return TaskStateResult(
status=TaskState.failed,
error="PushNotificationResultStore is required for push notification handler",
history=new_messages,
)
try:
result_or_task_id = await send_message_and_get_task_id(
event_stream=client.send_message(message),
new_messages=new_messages,
agent_card=agent_card,
turn_number=turn_number,
is_multiturn=is_multiturn,
agent_role=agent_role,
)
if not isinstance(result_or_task_id, str):
return result_or_task_id
task_id = result_or_task_id
crewai_event_bus.emit(
agent_branch,
A2APushNotificationRegisteredEvent(
task_id=task_id,
callback_url=str(config.url),
),
)
logger.debug(
"Push notification callback for task %s configured at %s (via initial request)",
task_id,
config.url,
)
final_task = await _wait_for_push_result(
task_id=task_id,
result_store=result_store,
timeout=polling_timeout,
poll_interval=polling_interval,
agent_branch=agent_branch,
)
if final_task is None:
return TaskStateResult(
status=TaskState.failed,
error=f"Push notification timeout after {polling_timeout}s",
history=new_messages,
)
result = process_task_state(
a2a_task=final_task,
new_messages=new_messages,
agent_card=agent_card,
turn_number=turn_number,
is_multiturn=is_multiturn,
agent_role=agent_role,
)
if result:
return result
return TaskStateResult(
status=TaskState.failed,
error=f"Unexpected task state: {final_task.status.state}",
history=new_messages,
)
except A2AClientHTTPError as e:
error_msg = f"HTTP Error {e.status_code}: {e!s}"
error_message = Message(
role=Role.agent,
message_id=str(uuid.uuid4()),
parts=[Part(root=TextPart(text=error_msg))],
context_id=context_id,
task_id=task_id,
)
new_messages.append(error_message)
crewai_event_bus.emit(
agent_branch,
A2AResponseReceivedEvent(
response=error_msg,
turn_number=turn_number,
is_multiturn=is_multiturn,
status="failed",
agent_role=agent_role,
),
)
return TaskStateResult(
status=TaskState.failed,
error=error_msg,
history=new_messages,
)

View File

@@ -0,0 +1 @@
"""Streaming update mechanism module."""

View File

@@ -0,0 +1,9 @@
"""Streaming update mechanism configuration."""
from __future__ import annotations
from pydantic import BaseModel
class StreamingConfig(BaseModel):
"""Configuration for SSE-based task updates."""

View File

@@ -0,0 +1,149 @@
"""Streaming (SSE) update mechanism handler."""
from __future__ import annotations
import uuid
from a2a.client import Client
from a2a.client.errors import A2AClientHTTPError
from a2a.types import (
AgentCard,
Message,
Part,
Role,
TaskArtifactUpdateEvent,
TaskState,
TaskStatusUpdateEvent,
TextPart,
)
from typing_extensions import Unpack
from crewai.a2a.task_helpers import (
ACTIONABLE_STATES,
TERMINAL_STATES,
TaskStateResult,
process_task_state,
)
from crewai.a2a.updates.base import StreamingHandlerKwargs
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import A2AResponseReceivedEvent
class StreamingHandler:
"""SSE streaming-based update handler."""
@staticmethod
async def execute(
client: Client,
message: Message,
new_messages: list[Message],
agent_card: AgentCard,
**kwargs: Unpack[StreamingHandlerKwargs],
) -> TaskStateResult:
"""Execute A2A delegation using SSE streaming for updates.
Args:
client: A2A client instance.
message: Message to send.
new_messages: List to collect messages.
agent_card: The agent card.
**kwargs: Streaming-specific parameters.
Returns:
Dictionary with status, result/error, and history.
"""
context_id = kwargs.get("context_id")
task_id = kwargs.get("task_id")
turn_number = kwargs.get("turn_number", 0)
is_multiturn = kwargs.get("is_multiturn", False)
agent_role = kwargs.get("agent_role")
result_parts: list[str] = []
final_result: TaskStateResult | None = None
event_stream = client.send_message(message)
try:
async for event in event_stream:
if isinstance(event, Message):
new_messages.append(event)
for part in event.parts:
if part.root.kind == "text":
text = part.root.text
result_parts.append(text)
elif isinstance(event, tuple):
a2a_task, update = event
if isinstance(update, TaskArtifactUpdateEvent):
artifact = update.artifact
result_parts.extend(
part.root.text
for part in artifact.parts
if part.root.kind == "text"
)
is_final_update = False
if isinstance(update, TaskStatusUpdateEvent):
is_final_update = update.final
if (
not is_final_update
and a2a_task.status.state
not in TERMINAL_STATES | ACTIONABLE_STATES
):
continue
final_result = process_task_state(
a2a_task=a2a_task,
new_messages=new_messages,
agent_card=agent_card,
turn_number=turn_number,
is_multiturn=is_multiturn,
agent_role=agent_role,
result_parts=result_parts,
)
if final_result:
break
except A2AClientHTTPError as e:
error_msg = f"HTTP Error {e.status_code}: {e!s}"
error_message = Message(
role=Role.agent,
message_id=str(uuid.uuid4()),
parts=[Part(root=TextPart(text=error_msg))],
context_id=context_id,
task_id=task_id,
)
new_messages.append(error_message)
crewai_event_bus.emit(
None,
A2AResponseReceivedEvent(
response=error_msg,
turn_number=turn_number,
is_multiturn=is_multiturn,
status="failed",
agent_role=agent_role,
),
)
return TaskStateResult(
status=TaskState.failed,
error=error_msg,
history=new_messages,
)
finally:
aclose = getattr(event_stream, "aclose", None)
if aclose:
await aclose()
if final_result:
return final_result
return TaskStateResult(
status=TaskState.completed,
result=" ".join(result_parts) if result_parts else "",
history=new_messages,
agent_card=agent_card,
)

View File

@@ -10,16 +10,13 @@ import time
from typing import TYPE_CHECKING, Any
import uuid
from a2a.client import Client, ClientConfig, ClientFactory
from a2a.client.errors import A2AClientHTTPError
from a2a.client import A2AClientHTTPError, Client, ClientConfig, ClientFactory
from a2a.types import (
AgentCard,
Message,
Part,
PushNotificationConfig as A2APushNotificationConfig,
Role,
TaskArtifactUpdateEvent,
TaskState,
TaskStatusUpdateEvent,
TextPart,
TransportProtocol,
)
@@ -36,24 +33,58 @@ from crewai.a2a.auth.utils import (
validate_auth_against_agent_card,
)
from crewai.a2a.config import A2AConfig
from crewai.a2a.task_helpers import TaskStateResult
from crewai.a2a.types import PartsDict, PartsMetadataDict
from crewai.a2a.updates import (
PollingConfig,
PollingHandler,
PushNotificationConfig,
PushNotificationHandler,
StreamingConfig,
StreamingHandler,
UpdateConfig,
)
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import (
A2AConversationStartedEvent,
A2ADelegationCompletedEvent,
A2ADelegationStartedEvent,
A2AMessageSentEvent,
A2AResponseReceivedEvent,
)
from crewai.types.utils import create_literals_from_strings
if TYPE_CHECKING:
from a2a.types import Message, Task as A2ATask
from a2a.types import Message
from crewai.a2a.auth.schemas import AuthScheme
HandlerType = (
type[PollingHandler] | type[StreamingHandler] | type[PushNotificationHandler]
)
HANDLER_REGISTRY: dict[type[UpdateConfig], HandlerType] = {
PollingConfig: PollingHandler,
StreamingConfig: StreamingHandler,
PushNotificationConfig: PushNotificationHandler,
}
def get_handler(config: UpdateConfig | None) -> HandlerType:
"""Get the handler class for a given update config.
Args:
config: Update mechanism configuration.
Returns:
Handler class for the config type, defaults to StreamingHandler.
"""
if config is None:
return StreamingHandler
return HANDLER_REGISTRY.get(type(config), StreamingHandler)
@lru_cache()
def _fetch_agent_card_cached(
endpoint: str,
@@ -235,26 +266,20 @@ def execute_a2a_delegation(
agent_branch: Any | None = None,
response_model: type[BaseModel] | None = None,
turn_number: int | None = None,
) -> dict[str, Any]:
updates: UpdateConfig | None = None,
) -> TaskStateResult:
"""Execute a task delegation to a remote A2A agent with multi-turn support.
Handles:
- AgentCard discovery
- Authentication setup
- Message creation and sending
- Response parsing
- Multi-turn conversations
Args:
endpoint: A2A agent endpoint URL (AgentCard URL)
auth: Optional AuthScheme for authentication (Bearer, OAuth2, API Key, HTTP Basic/Digest)
endpoint: A2A agent endpoint URL
auth: Optional AuthScheme for authentication
timeout: Request timeout in seconds
task_description: The task to delegate
context: Optional context information
context_id: Context ID for correlating messages/tasks
task_id: Specific task identifier
reference_task_ids: List of related task IDs
metadata: Additional metadata (external_id, request_id, etc.)
metadata: Additional metadata
extensions: Protocol extensions for custom fields
conversation_history: Previous Message objects from conversation
agent_id: Agent identifier for logging
@@ -262,16 +287,10 @@ def execute_a2a_delegation(
agent_branch: Optional agent tree branch for logging
response_model: Optional Pydantic model for structured outputs
turn_number: Optional turn number for multi-turn conversations
updates: Update mechanism config from A2AConfig.updates
Returns:
Dictionary with:
- status: "completed", "input_required", "failed", etc.
- result: Result string (if completed)
- error: Error message (if failed)
- history: List of new Message objects from this exchange
Raises:
ImportError: If a2a-sdk is not installed
TaskStateResult with status, result/error, history, and agent_card
"""
is_multiturn = bool(conversation_history and len(conversation_history) > 0)
if turn_number is None:
@@ -311,6 +330,7 @@ def execute_a2a_delegation(
agent_id=agent_id,
agent_role=agent_role,
response_model=response_model,
updates=updates,
)
)
@@ -347,7 +367,8 @@ async def _execute_a2a_delegation_async(
agent_id: str | None = None,
agent_role: str | None = None,
response_model: type[BaseModel] | None = None,
) -> dict[str, Any]:
updates: UpdateConfig | None = None,
) -> TaskStateResult:
"""Async implementation of A2A delegation with multi-turn support.
Args:
@@ -368,9 +389,10 @@ async def _execute_a2a_delegation_async(
agent_id: Agent identifier for logging
agent_role: Agent role for logging
response_model: Optional Pydantic model for structured outputs
updates: Update mechanism config
Returns:
Dictionary with status, result/error, and new history
TaskStateResult with status, result/error, history, and agent_card
"""
if auth:
auth_data = auth.model_dump_json(
@@ -458,201 +480,61 @@ async def _execute_a2a_delegation_async(
),
)
handler = get_handler(updates)
use_polling = isinstance(updates, PollingConfig)
handler_kwargs: dict[str, Any] = {
"turn_number": turn_number,
"is_multiturn": is_multiturn,
"agent_role": agent_role,
"context_id": context_id,
"task_id": task_id,
"endpoint": endpoint,
"agent_branch": agent_branch,
}
if isinstance(updates, PollingConfig):
handler_kwargs.update(
{
"polling_interval": updates.interval,
"polling_timeout": updates.timeout or float(timeout),
"history_length": updates.history_length,
"max_polls": updates.max_polls,
}
)
elif isinstance(updates, PushNotificationConfig):
handler_kwargs.update(
{
"config": updates,
"result_store": updates.result_store,
"polling_timeout": updates.timeout or float(timeout),
"polling_interval": updates.interval,
}
)
push_config_for_client = (
updates if isinstance(updates, PushNotificationConfig) else None
)
use_streaming = not use_polling and push_config_for_client is None
async with _create_a2a_client(
agent_card=agent_card,
transport_protocol=transport_protocol,
timeout=timeout,
headers=headers,
streaming=True,
streaming=use_streaming,
auth=auth,
use_polling=use_polling,
push_notification_config=push_config_for_client,
) as client:
result_parts: list[str] = []
final_result: dict[str, Any] | None = None
event_stream = client.send_message(message)
try:
async for event in event_stream:
if isinstance(event, Message):
new_messages.append(event)
for part in event.parts:
if part.root.kind == "text":
text = part.root.text
result_parts.append(text)
elif isinstance(event, tuple):
a2a_task, update = event
if isinstance(update, TaskArtifactUpdateEvent):
artifact = update.artifact
result_parts.extend(
part.root.text
for part in artifact.parts
if part.root.kind == "text"
)
is_final_update = False
if isinstance(update, TaskStatusUpdateEvent):
is_final_update = update.final
if not is_final_update and a2a_task.status.state not in [
TaskState.completed,
TaskState.input_required,
TaskState.failed,
TaskState.rejected,
TaskState.auth_required,
TaskState.canceled,
]:
continue
if a2a_task.status.state == TaskState.completed:
extracted_parts = _extract_task_result_parts(a2a_task)
result_parts.extend(extracted_parts)
if a2a_task.history:
new_messages.extend(a2a_task.history)
response_text = " ".join(result_parts) if result_parts else ""
crewai_event_bus.emit(
None,
A2AResponseReceivedEvent(
response=response_text,
turn_number=turn_number,
is_multiturn=is_multiturn,
status="completed",
agent_role=agent_role,
),
)
final_result = {
"status": "completed",
"result": response_text,
"history": new_messages,
"agent_card": agent_card,
}
break
if a2a_task.status.state == TaskState.input_required:
if a2a_task.history:
new_messages.extend(a2a_task.history)
response_text = _extract_error_message(
a2a_task, "Additional input required"
)
if response_text and not a2a_task.history:
agent_message = Message(
role=Role.agent,
message_id=str(uuid.uuid4()),
parts=[Part(root=TextPart(text=response_text))],
context_id=a2a_task.context_id
if hasattr(a2a_task, "context_id")
else None,
task_id=a2a_task.task_id
if hasattr(a2a_task, "task_id")
else None,
)
new_messages.append(agent_message)
crewai_event_bus.emit(
None,
A2AResponseReceivedEvent(
response=response_text,
turn_number=turn_number,
is_multiturn=is_multiturn,
status="input_required",
agent_role=agent_role,
),
)
final_result = {
"status": "input_required",
"error": response_text,
"history": new_messages,
"agent_card": agent_card,
}
break
if a2a_task.status.state in [TaskState.failed, TaskState.rejected]:
error_msg = _extract_error_message(
a2a_task, "Task failed without error message"
)
if a2a_task.history:
new_messages.extend(a2a_task.history)
final_result = {
"status": "failed",
"error": error_msg,
"history": new_messages,
}
break
if a2a_task.status.state == TaskState.auth_required:
error_msg = _extract_error_message(
a2a_task, "Authentication required"
)
final_result = {
"status": "auth_required",
"error": error_msg,
"history": new_messages,
}
break
if a2a_task.status.state == TaskState.canceled:
error_msg = _extract_error_message(
a2a_task, "Task was canceled"
)
final_result = {
"status": "canceled",
"error": error_msg,
"history": new_messages,
}
break
except Exception as e:
if isinstance(e, A2AClientHTTPError):
error_msg = f"HTTP Error {e.status_code}: {e!s}"
error_message = Message(
role=Role.agent,
message_id=str(uuid.uuid4()),
parts=[Part(root=TextPart(text=error_msg))],
context_id=context_id,
task_id=task_id,
)
new_messages.append(error_message)
crewai_event_bus.emit(
None,
A2AResponseReceivedEvent(
response=error_msg,
turn_number=turn_number,
is_multiturn=is_multiturn,
status="failed",
agent_role=agent_role,
),
)
return {
"status": "failed",
"error": error_msg,
"history": new_messages,
}
current_exception: Exception | BaseException | None = e
while current_exception:
if hasattr(current_exception, "response"):
response = current_exception.response
if hasattr(response, "text"):
break
if current_exception and hasattr(current_exception, "__cause__"):
current_exception = current_exception.__cause__
raise
finally:
if hasattr(event_stream, "aclose"):
await event_stream.aclose()
if final_result:
return final_result
return {
"status": "completed",
"result": " ".join(result_parts) if result_parts else "",
"history": new_messages,
}
return await handler.execute(
client=client,
message=message,
new_messages=new_messages,
agent_card=agent_card,
**handler_kwargs,
)
@asynccontextmanager
@@ -663,6 +545,8 @@ async def _create_a2a_client(
headers: MutableMapping[str, str],
streaming: bool,
auth: AuthScheme | None = None,
use_polling: bool = False,
push_notification_config: PushNotificationConfig | None = None,
) -> AsyncIterator[Client]:
"""Create and configure an A2A client.
@@ -673,6 +557,8 @@ async def _create_a2a_client(
headers: HTTP headers (already with auth applied)
streaming: Enable streaming responses
auth: Optional AuthScheme for client configuration
use_polling: Enable polling mode
push_notification_config: Optional push notification config to include in requests
Yields:
Configured A2A client instance
@@ -685,11 +571,24 @@ async def _create_a2a_client(
if auth and isinstance(auth, (HTTPDigestAuth, APIKeyAuth)):
configure_auth_client(auth, httpx_client)
push_configs: list[A2APushNotificationConfig] = []
if push_notification_config is not None:
push_configs.append(
A2APushNotificationConfig(
url=str(push_notification_config.url),
id=push_notification_config.id,
token=push_notification_config.token,
authentication=push_notification_config.authentication,
)
)
config = ClientConfig(
httpx_client=httpx_client,
supported_transports=[str(transport_protocol.value)],
streaming=streaming,
streaming=streaming and not use_polling,
polling=use_polling,
accepted_output_modes=["application/json"],
push_notification_configs=push_configs,
)
factory = ClientFactory(config)
@@ -697,66 +596,6 @@ async def _create_a2a_client(
yield client
def _extract_task_result_parts(a2a_task: A2ATask) -> list[str]:
"""Extract result parts from A2A task history and artifacts.
Args:
a2a_task: A2A Task object with history and artifacts
Returns:
List of result text parts
"""
result_parts: list[str] = []
if a2a_task.history:
for history_msg in reversed(a2a_task.history):
if history_msg.role == Role.agent:
result_parts.extend(
part.root.text
for part in history_msg.parts
if part.root.kind == "text"
)
break
if a2a_task.artifacts:
result_parts.extend(
part.root.text
for artifact in a2a_task.artifacts
for part in artifact.parts
if part.root.kind == "text"
)
return result_parts
def _extract_error_message(a2a_task: A2ATask, default: str) -> str:
"""Extract error message from A2A task.
Args:
a2a_task: A2A Task object
default: Default message if no error found
Returns:
Error message string
"""
if a2a_task.status and a2a_task.status.message:
msg = a2a_task.status.message
if msg:
for part in msg.parts:
if part.root.kind == "text":
return str(part.root.text)
return str(msg)
if a2a_task.history:
for history_msg in reversed(a2a_task.history):
for part in history_msg.parts:
if part.root.kind == "text":
return str(part.root.text)
return default
def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel]:
"""Create a dynamic AgentResponse model with Literal types for agent IDs.
@@ -788,7 +627,7 @@ def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel]:
is_a2a=(
bool,
Field(
description="Set to true to continue the conversation by sending this message to the A2A agent and awaiting their response. Set to false ONLY when you are completely done and providing your final answer (not when asking questions)."
description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately."
),
),
__base__=BaseModel,

View File

@@ -9,17 +9,19 @@ from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import wraps
from types import MethodType
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, Any
from a2a.types import Role
from a2a.types import Role, TaskState
from pydantic import BaseModel, ValidationError
from crewai.a2a.config import A2AConfig
from crewai.a2a.extensions.base import ExtensionRegistry
from crewai.a2a.task_helpers import TaskStateResult
from crewai.a2a.templates import (
AVAILABLE_AGENTS_TEMPLATE,
CONVERSATION_TURN_INFO_TEMPLATE,
PREVIOUS_A2A_CONVERSATION_TEMPLATE,
REMOTE_AGENT_COMPLETED_NOTICE,
UNAVAILABLE_AGENTS_NOTICE_TEMPLATE,
)
from crewai.a2a.types import AgentResponseProtocol
@@ -255,6 +257,7 @@ def _augment_prompt_with_a2a(
max_turns: int | None = None,
failed_agents: dict[str, str] | None = None,
extension_registry: ExtensionRegistry | None = None,
remote_task_completed: bool = False,
) -> tuple[str, bool]:
"""Add A2A delegation instructions to prompt.
@@ -327,12 +330,15 @@ def _augment_prompt_with_a2a(
warning=warning,
)
completion_notice = ""
if remote_task_completed and conversation_history:
completion_notice = REMOTE_AGENT_COMPLETED_NOTICE
augmented_prompt = f"""{task_description}
IMPORTANT: You have the ability to delegate this task to remote A2A agents.
{agents_text}
{history_text}{turn_info}
{history_text}{turn_info}{completion_notice}
"""
@@ -346,7 +352,7 @@ IMPORTANT: You have the ability to delegate this task to remote A2A agents.
def _parse_agent_response(
raw_result: str | dict[str, Any], agent_response_model: type[BaseModel]
) -> BaseModel | str:
) -> BaseModel | str | dict[str, Any]:
"""Parse LLM output as AgentResponse or return raw agent response.
Args:
@@ -354,7 +360,7 @@ def _parse_agent_response(
agent_response_model: The agent response model
Returns:
Parsed AgentResponse or string
Parsed AgentResponse, or raw result if parsing fails
"""
if agent_response_model:
try:
@@ -363,13 +369,13 @@ def _parse_agent_response(
if isinstance(raw_result, dict):
return agent_response_model.model_validate(raw_result)
except ValidationError:
return cast(str, raw_result)
return cast(str, raw_result)
return raw_result
return raw_result
def _handle_agent_response_and_continue(
self: Agent,
a2a_result: dict[str, Any],
a2a_result: TaskStateResult,
agent_id: str,
agent_cards: dict[str, AgentCard] | None,
a2a_agents: list[A2AConfig],
@@ -382,6 +388,7 @@ def _handle_agent_response_and_continue(
context: str | None,
tools: list[BaseTool] | None,
agent_response_model: type[BaseModel],
remote_task_completed: bool = False,
) -> tuple[str | None, str | None]:
"""Handle A2A result and get CrewAI agent's response.
@@ -417,6 +424,7 @@ def _handle_agent_response_and_continue(
turn_num=turn_num,
max_turns=max_turns,
agent_cards=agent_cards_dict,
remote_task_completed=remote_task_completed,
)
original_response_model = task.response_model
@@ -568,6 +576,7 @@ def _delegate_to_a2a(
agent_branch=agent_branch,
response_model=agent_config.response_model,
turn_number=turn_num + 1,
updates=agent_config.updates,
)
conversation_history = a2a_result.get("history", [])
@@ -579,11 +588,8 @@ def _delegate_to_a2a(
if latest_message.context_id is not None:
context_id = latest_message.context_id
if a2a_result["status"] in ["completed", "input_required"]:
if (
a2a_result["status"] == "completed"
and agent_config.trust_remote_completion_status
):
if a2a_result["status"] in [TaskState.completed, TaskState.input_required]:
if a2a_result["status"] == TaskState.completed:
if (
task_id_config is not None
and task_id_config not in reference_task_ids
@@ -592,7 +598,12 @@ def _delegate_to_a2a(
if task.config is None:
task.config = {}
task.config["reference_task_ids"] = reference_task_ids
task_id_config = None
if (
a2a_result["status"] == TaskState.completed
and agent_config.trust_remote_completion_status
):
result_text = a2a_result.get("result", "")
final_turn_number = turn_num + 1
crewai_event_bus.emit(
@@ -604,7 +615,7 @@ def _delegate_to_a2a(
total_turns=final_turn_number,
),
)
return cast(str, result_text)
return str(result_text)
final_result, next_request = _handle_agent_response_and_continue(
self=self,
@@ -621,6 +632,7 @@ def _delegate_to_a2a(
context=context,
tools=tools,
agent_response_model=agent_response_model,
remote_task_completed=(a2a_result["status"] == TaskState.completed),
)
if final_result is not None:
@@ -648,6 +660,7 @@ def _delegate_to_a2a(
context=context,
tools=tools,
agent_response_model=agent_response_model,
remote_task_completed=False,
)
if final_result is not None:

View File

@@ -10,7 +10,7 @@ from collections.abc import Callable
import logging
from typing import TYPE_CHECKING, Any, Literal, cast
from pydantic import BaseModel, GetCoreSchemaHandler
from pydantic import BaseModel, GetCoreSchemaHandler, ValidationError
from pydantic_core import CoreSchema, core_schema
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
@@ -244,7 +244,20 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
response_model=self.response_model,
executor_context=self,
)
formatted_answer = process_llm_response(answer, self.use_stop_words) # type: ignore[assignment]
if self.response_model is not None:
try:
self.response_model.model_validate_json(answer)
formatted_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
except ValidationError:
formatted_answer = process_llm_response(
answer, self.use_stop_words
) # type: ignore[assignment]
else:
formatted_answer = process_llm_response(answer, self.use_stop_words) # type: ignore[assignment]
if isinstance(formatted_answer, AgentAction):
# Extract agent fingerprint if available
@@ -278,7 +291,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
self._invoke_step_callback(formatted_answer) # type: ignore[arg-type]
self._append_message(formatted_answer.text) # type: ignore[union-attr,attr-defined]
self._append_message(formatted_answer.text) # type: ignore[union-attr]
except OutputParserError as e:
formatted_answer = handle_output_parser_exception( # type: ignore[assignment]
@@ -398,7 +411,21 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
response_model=self.response_model,
executor_context=self,
)
formatted_answer = process_llm_response(answer, self.use_stop_words) # type: ignore[assignment]
if self.response_model is not None:
try:
self.response_model.model_validate_json(answer)
formatted_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
except ValidationError:
formatted_answer = process_llm_response(
answer, self.use_stop_words
) # type: ignore[assignment]
else:
formatted_answer = process_llm_response(answer, self.use_stop_words) # type: ignore[assignment]
if isinstance(formatted_answer, AgentAction):
fingerprint_context = {}
@@ -431,7 +458,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
self._invoke_step_callback(formatted_answer) # type: ignore[arg-type]
self._append_message(formatted_answer.text) # type: ignore[union-attr,attr-defined]
self._append_message(formatted_answer.text) # type: ignore[union-attr]
except OutputParserError as e:
formatted_answer = handle_output_parser_exception( # type: ignore[assignment]

View File

@@ -13,6 +13,8 @@ from crewai.events.types.a2a_events import (
A2ADelegationCompletedEvent,
A2ADelegationStartedEvent,
A2AMessageSentEvent,
A2APollingStartedEvent,
A2APollingStatusEvent,
A2AResponseReceivedEvent,
)
from crewai.events.types.agent_events import (
@@ -608,6 +610,23 @@ class EventListener(BaseEventListener):
event.total_turns,
)
@crewai_event_bus.on(A2APollingStartedEvent)
def on_a2a_polling_started(_: Any, event: A2APollingStartedEvent) -> None:
self.formatter.handle_a2a_polling_started(
event.task_id,
event.polling_interval,
event.endpoint,
)
@crewai_event_bus.on(A2APollingStatusEvent)
def on_a2a_polling_status(_: Any, event: A2APollingStatusEvent) -> None:
self.formatter.handle_a2a_polling_status(
event.task_id,
event.state,
event.elapsed_seconds,
event.poll_count,
)
# ----------- MCP EVENTS -----------
@crewai_event_bus.on(MCPConnectionStartedEvent)

View File

@@ -15,7 +15,7 @@ class A2AEventBase(BaseEvent):
from_task: Any | None = None
from_agent: Any | None = None
def __init__(self, **data):
def __init__(self, **data: Any) -> None:
"""Initialize A2A event, extracting task and agent metadata."""
if data.get("from_task"):
task = data["from_task"]
@@ -139,3 +139,74 @@ class A2AConversationCompletedEvent(A2AEventBase):
final_result: str | None = None
error: str | None = None
total_turns: int
class A2APollingStartedEvent(A2AEventBase):
"""Event emitted when polling mode begins for A2A delegation.
Attributes:
task_id: A2A task ID being polled
polling_interval: Seconds between poll attempts
endpoint: A2A agent endpoint URL
"""
type: str = "a2a_polling_started"
task_id: str
polling_interval: float
endpoint: str
class A2APollingStatusEvent(A2AEventBase):
"""Event emitted on each polling iteration.
Attributes:
task_id: A2A task ID being polled
state: Current task state from remote agent
elapsed_seconds: Time since polling started
poll_count: Number of polls completed
"""
type: str = "a2a_polling_status"
task_id: str
state: str
elapsed_seconds: float
poll_count: int
class A2APushNotificationRegisteredEvent(A2AEventBase):
"""Event emitted when push notification callback is registered.
Attributes:
task_id: A2A task ID for which callback is registered
callback_url: URL where agent will send push notifications
"""
type: str = "a2a_push_notification_registered"
task_id: str
callback_url: str
class A2APushNotificationReceivedEvent(A2AEventBase):
"""Event emitted when a push notification is received.
Attributes:
task_id: A2A task ID from the notification
state: Current task state from the notification
"""
type: str = "a2a_push_notification_received"
task_id: str
state: str
class A2APushNotificationTimeoutEvent(A2AEventBase):
"""Event emitted when push notification wait times out.
Attributes:
task_id: A2A task ID that timed out
timeout_seconds: Timeout duration in seconds
"""
type: str = "a2a_push_notification_timeout"
task_id: str
timeout_seconds: float

View File

@@ -114,7 +114,6 @@ To enable tracing, do any one of these:
New streaming sessions will be created on-demand when needed.
This method exists for API compatibility with HITL callers.
"""
pass
def print_panel(
self, content: Text, title: str, style: str = "blue", is_flow: bool = False
@@ -1417,3 +1416,49 @@ To enable tracing, do any one of these:
panel = self.create_panel(content, "❌ MCP Tool Failed", "red")
self.print(panel)
self.print()
def handle_a2a_polling_started(
self,
task_id: str,
polling_interval: float,
endpoint: str,
) -> None:
"""Handle A2A polling started event with panel display."""
content = Text()
content.append("A2A Polling Started\n", style="cyan bold")
content.append("Task ID: ", style="white")
content.append(f"{task_id[:8]}...\n", style="cyan")
content.append("Interval: ", style="white")
content.append(f"{polling_interval}s\n", style="cyan")
self.print_panel(content, "⏳ A2A Polling", "cyan")
def handle_a2a_polling_status(
self,
task_id: str,
state: str,
elapsed_seconds: float,
poll_count: int,
) -> None:
"""Handle A2A polling status event with panel display."""
if state == "completed":
style = "green"
status_indicator = ""
elif state == "failed":
style = "red"
status_indicator = ""
elif state == "working":
style = "yellow"
status_indicator = ""
else:
style = "cyan"
status_indicator = ""
content = Text()
content.append(f"Poll #{poll_count}\n", style=f"{style} bold")
content.append("Status: ", style="white")
content.append(f"{status_indicator} {state}\n", style=style)
content.append("Elapsed: ", style="white")
content.append(f"{elapsed_seconds:.1f}s\n", style=style)
self.print_panel(content, f"📊 A2A Poll #{poll_count}", style)

View File

@@ -0,0 +1,323 @@
from __future__ import annotations
import os
import uuid
import pytest
import pytest_asyncio
from a2a.client import ClientFactory
from a2a.types import AgentCard, Message, Part, Role, TaskState, TextPart
from crewai.a2a.updates.polling.handler import PollingHandler
from crewai.a2a.updates.streaming.handler import StreamingHandler
A2A_TEST_ENDPOINT = os.getenv("A2A_TEST_ENDPOINT", "http://localhost:9999")
@pytest_asyncio.fixture
async def a2a_client():
"""Create A2A client for test server."""
client = await ClientFactory.connect(A2A_TEST_ENDPOINT)
yield client
await client.close()
@pytest.fixture
def test_message() -> Message:
"""Create a simple test message."""
return Message(
role=Role.user,
parts=[Part(root=TextPart(text="What is 2 + 2?"))],
message_id=str(uuid.uuid4()),
)
@pytest_asyncio.fixture
async def agent_card(a2a_client) -> AgentCard:
"""Fetch the real agent card from the server."""
return await a2a_client.get_card()
class TestA2AAgentCardFetching:
"""Integration tests for agent card fetching."""
@pytest.mark.vcr()
@pytest.mark.asyncio
async def test_fetch_agent_card(self, a2a_client) -> None:
"""Test fetching an agent card from the server."""
card = await a2a_client.get_card()
assert card is not None
assert card.name == "GPT Assistant"
assert card.url is not None
assert card.capabilities is not None
assert card.capabilities.streaming is True
class TestA2APollingIntegration:
"""Integration tests for A2A polling handler."""
@pytest.mark.vcr()
@pytest.mark.asyncio
async def test_polling_completes_task(
self,
a2a_client,
test_message: Message,
agent_card: AgentCard,
) -> None:
"""Test that polling handler completes a task successfully."""
new_messages: list[Message] = []
result = await PollingHandler.execute(
client=a2a_client,
message=test_message,
new_messages=new_messages,
agent_card=agent_card,
polling_interval=0.5,
polling_timeout=30.0,
)
assert isinstance(result, dict)
assert result["status"] == TaskState.completed
assert result.get("result") is not None
assert "4" in result["result"]
class TestA2AStreamingIntegration:
"""Integration tests for A2A streaming handler."""
@pytest.mark.vcr()
@pytest.mark.asyncio
async def test_streaming_completes_task(
self,
a2a_client,
test_message: Message,
agent_card: AgentCard,
) -> None:
"""Test that streaming handler completes a task successfully."""
new_messages: list[Message] = []
result = await StreamingHandler.execute(
client=a2a_client,
message=test_message,
new_messages=new_messages,
agent_card=agent_card,
)
assert isinstance(result, dict)
assert result["status"] == TaskState.completed
assert result.get("result") is not None
class TestA2ATaskOperations:
"""Integration tests for task operations."""
@pytest.mark.vcr()
@pytest.mark.asyncio
async def test_send_message_and_get_response(
self,
a2a_client,
test_message: Message,
) -> None:
"""Test sending a message and getting a response."""
from a2a.types import Task
final_task: Task | None = None
async for event in a2a_client.send_message(test_message):
if isinstance(event, tuple) and len(event) >= 1:
task, _ = event
if isinstance(task, Task):
final_task = task
assert final_task is not None
assert final_task.id is not None
assert final_task.status is not None
assert final_task.status.state == TaskState.completed
class TestA2APushNotificationHandler:
"""Tests for push notification handler.
These tests use mocks for the result store since webhook callbacks
are incoming requests that can't be recorded with VCR.
"""
@pytest.fixture
def mock_agent_card(self) -> AgentCard:
"""Create a minimal valid agent card for testing."""
from a2a.types import AgentCapabilities
return AgentCard(
name="Test Agent",
description="Test agent for push notification tests",
url="http://localhost:9999",
version="1.0.0",
capabilities=AgentCapabilities(streaming=True, push_notifications=True),
default_input_modes=["text"],
default_output_modes=["text"],
skills=[],
)
@pytest.fixture
def mock_task(self) -> "Task":
"""Create a minimal valid task for testing."""
from a2a.types import Task, TaskStatus
return Task(
id="task-123",
context_id="ctx-123",
status=TaskStatus(state=TaskState.working),
)
@pytest.mark.asyncio
async def test_push_handler_waits_for_result(
self,
mock_agent_card: AgentCard,
mock_task,
) -> None:
"""Test that push handler waits for result from store."""
from unittest.mock import AsyncMock, MagicMock
from a2a.types import Task, TaskStatus
from pydantic import AnyHttpUrl
from crewai.a2a.updates.push_notifications.config import PushNotificationConfig
from crewai.a2a.updates.push_notifications.handler import PushNotificationHandler
completed_task = Task(
id="task-123",
context_id="ctx-123",
status=TaskStatus(state=TaskState.completed),
history=[],
)
mock_store = MagicMock()
mock_store.wait_for_result = AsyncMock(return_value=completed_task)
async def mock_send_message(*args, **kwargs):
yield (mock_task, None)
mock_client = MagicMock()
mock_client.send_message = mock_send_message
config = PushNotificationConfig(
url=AnyHttpUrl("http://localhost:8080/a2a/callback"),
token="secret-token",
result_store=mock_store,
)
test_msg = Message(
role=Role.user,
parts=[Part(root=TextPart(text="What is 2+2?"))],
message_id="msg-001",
)
new_messages: list[Message] = []
result = await PushNotificationHandler.execute(
client=mock_client,
message=test_msg,
new_messages=new_messages,
agent_card=mock_agent_card,
config=config,
result_store=mock_store,
polling_timeout=30.0,
polling_interval=1.0,
)
mock_store.wait_for_result.assert_called_once_with(
task_id="task-123",
timeout=30.0,
poll_interval=1.0,
)
assert result["status"] == TaskState.completed
@pytest.mark.asyncio
async def test_push_handler_returns_failure_on_timeout(
self,
mock_agent_card: AgentCard,
) -> None:
"""Test that push handler returns failure when result store times out."""
from unittest.mock import AsyncMock, MagicMock
from a2a.types import Task, TaskStatus
from pydantic import AnyHttpUrl
from crewai.a2a.updates.push_notifications.config import PushNotificationConfig
from crewai.a2a.updates.push_notifications.handler import PushNotificationHandler
mock_store = MagicMock()
mock_store.wait_for_result = AsyncMock(return_value=None)
working_task = Task(
id="task-456",
context_id="ctx-456",
status=TaskStatus(state=TaskState.working),
)
async def mock_send_message(*args, **kwargs):
yield (working_task, None)
mock_client = MagicMock()
mock_client.send_message = mock_send_message
config = PushNotificationConfig(
url=AnyHttpUrl("http://localhost:8080/a2a/callback"),
token="token",
result_store=mock_store,
)
test_msg = Message(
role=Role.user,
parts=[Part(root=TextPart(text="test"))],
message_id="msg-002",
)
new_messages: list[Message] = []
result = await PushNotificationHandler.execute(
client=mock_client,
message=test_msg,
new_messages=new_messages,
agent_card=mock_agent_card,
config=config,
result_store=mock_store,
polling_timeout=5.0,
polling_interval=0.5,
)
assert result["status"] == TaskState.failed
assert "timeout" in result.get("error", "").lower()
@pytest.mark.asyncio
async def test_push_handler_requires_config(
self,
mock_agent_card: AgentCard,
) -> None:
"""Test that push handler fails gracefully without config."""
from unittest.mock import MagicMock
from crewai.a2a.updates.push_notifications.handler import PushNotificationHandler
mock_client = MagicMock()
test_msg = Message(
role=Role.user,
parts=[Part(root=TextPart(text="test"))],
message_id="msg-003",
)
new_messages: list[Message] = []
result = await PushNotificationHandler.execute(
client=mock_client,
message=test_msg,
new_messages=new_messages,
agent_card=mock_agent_card,
)
assert result["status"] == TaskState.failed
assert "config" in result.get("error", "").lower()

View File

@@ -0,0 +1,44 @@
interactions:
- request:
body: ''
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- '*/*'
accept-encoding:
- ACCEPT-ENCODING-XXX
connection:
- keep-alive
host:
- localhost:9999
method: GET
uri: http://localhost:9999/.well-known/agent-card.json
response:
body:
string: '{"capabilities":{"streaming":true},"defaultInputModes":["text"],"defaultOutputModes":["text"],"description":"An
AI assistant powered by OpenAI GPT with calculator and time tools. Ask questions,
perform calculations, or get the current time in any timezone.","name":"GPT
Assistant","preferredTransport":"JSONRPC","protocolVersion":"0.3.0","skills":[{"description":"Have
a general conversation with the AI assistant. Ask questions, get explanations,
or just chat.","examples":["Hello, how are you?","Explain quantum computing
in simple terms","What can you help me with?"],"id":"conversation","name":"General
Conversation","tags":["chat","conversation","general"]},{"description":"Perform
mathematical calculations including arithmetic, exponents, and more.","examples":["What
is 25 * 17?","Calculate 2^10","What''s (100 + 50) / 3?"],"id":"calculator","name":"Calculator","tags":["math","calculator","arithmetic"]},{"description":"Get
the current date and time in any timezone.","examples":["What time is it?","What''s
the current time in Tokyo?","What''s today''s date in New York?"],"id":"time","name":"Current
Time","tags":["time","date","timezone"]}],"url":"http://localhost:9999/","version":"1.0.0"}'
headers:
content-length:
- '1198'
content-type:
- application/json
date:
- Tue, 06 Jan 2026 14:17:00 GMT
server:
- uvicorn
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,126 @@
interactions:
- request:
body: ''
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- '*/*'
accept-encoding:
- ACCEPT-ENCODING-XXX
connection:
- keep-alive
host:
- localhost:9999
method: GET
uri: http://localhost:9999/.well-known/agent-card.json
response:
body:
string: '{"capabilities":{"streaming":true},"defaultInputModes":["text"],"defaultOutputModes":["text"],"description":"An
AI assistant powered by OpenAI GPT with calculator and time tools. Ask questions,
perform calculations, or get the current time in any timezone.","name":"GPT
Assistant","preferredTransport":"JSONRPC","protocolVersion":"0.3.0","skills":[{"description":"Have
a general conversation with the AI assistant. Ask questions, get explanations,
or just chat.","examples":["Hello, how are you?","Explain quantum computing
in simple terms","What can you help me with?"],"id":"conversation","name":"General
Conversation","tags":["chat","conversation","general"]},{"description":"Perform
mathematical calculations including arithmetic, exponents, and more.","examples":["What
is 25 * 17?","Calculate 2^10","What''s (100 + 50) / 3?"],"id":"calculator","name":"Calculator","tags":["math","calculator","arithmetic"]},{"description":"Get
the current date and time in any timezone.","examples":["What time is it?","What''s
the current time in Tokyo?","What''s today''s date in New York?"],"id":"time","name":"Current
Time","tags":["time","date","timezone"]}],"url":"http://localhost:9999/","version":"1.0.0"}'
headers:
content-length:
- '1198'
content-type:
- application/json
date:
- Tue, 06 Jan 2026 14:16:58 GMT
server:
- uvicorn
status:
code: 200
message: OK
- request:
body: '{"id":"e5ac2160-ae9b-4bf9-aad7-14bf0d53d6d9","jsonrpc":"2.0","method":"message/stream","params":{"configuration":{"acceptedOutputModes":[],"blocking":true},"message":{"kind":"message","messageId":"e1e63c75-3ea0-49fb-b512-5128a2476416","parts":[{"kind":"text","text":"What
is 2 + 2?"}],"role":"user"}}}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- '*/*, text/event-stream'
accept-encoding:
- ACCEPT-ENCODING-XXX
cache-control:
- no-store
connection:
- keep-alive
content-length:
- '301'
content-type:
- application/json
host:
- localhost:9999
method: POST
uri: http://localhost:9999/
response:
body:
string: "data: {\"id\":\"e5ac2160-ae9b-4bf9-aad7-14bf0d53d6d9\",\"jsonrpc\":\"2.0\",\"result\":{\"contextId\":\"b9e14c1b-734d-4d1e-864a-e6dda5231d71\",\"final\":false,\"kind\":\"status-update\",\"status\":{\"state\":\"submitted\"},\"taskId\":\"0dd4d3af-f35d-409d-9462-01218e5641f9\"}}\r\n\r\ndata:
{\"id\":\"e5ac2160-ae9b-4bf9-aad7-14bf0d53d6d9\",\"jsonrpc\":\"2.0\",\"result\":{\"contextId\":\"b9e14c1b-734d-4d1e-864a-e6dda5231d71\",\"final\":false,\"kind\":\"status-update\",\"status\":{\"state\":\"working\"},\"taskId\":\"0dd4d3af-f35d-409d-9462-01218e5641f9\"}}\r\n\r\ndata:
{\"id\":\"e5ac2160-ae9b-4bf9-aad7-14bf0d53d6d9\",\"jsonrpc\":\"2.0\",\"result\":{\"contextId\":\"b9e14c1b-734d-4d1e-864a-e6dda5231d71\",\"final\":true,\"kind\":\"status-update\",\"status\":{\"message\":{\"kind\":\"message\",\"messageId\":\"54bb7ff3-f2c0-4eb3-b427-bf1c8cf90832\",\"parts\":[{\"kind\":\"text\",\"text\":\"\\n[Tool:
calculator] 2 + 2 = 4\\n2 + 2 equals 4.\"}],\"role\":\"agent\"},\"state\":\"completed\"},\"taskId\":\"0dd4d3af-f35d-409d-9462-01218e5641f9\"}}\r\n\r\n"
headers:
Transfer-Encoding:
- chunked
cache-control:
- no-store
connection:
- keep-alive
content-type:
- text/event-stream; charset=utf-8
date:
- Tue, 06 Jan 2026 14:16:58 GMT
server:
- uvicorn
x-accel-buffering:
- 'no'
status:
code: 200
message: OK
- request:
body: '{"id":"cb1e4af3-d2d0-4848-96b8-7082ee6171d1","jsonrpc":"2.0","method":"tasks/get","params":{"historyLength":100,"id":"0dd4d3af-f35d-409d-9462-01218e5641f9"}}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- '*/*'
accept-encoding:
- ACCEPT-ENCODING-XXX
connection:
- keep-alive
content-length:
- '157'
content-type:
- application/json
host:
- localhost:9999
method: POST
uri: http://localhost:9999/
response:
body:
string: '{"id":"cb1e4af3-d2d0-4848-96b8-7082ee6171d1","jsonrpc":"2.0","result":{"contextId":"b9e14c1b-734d-4d1e-864a-e6dda5231d71","history":[{"contextId":"b9e14c1b-734d-4d1e-864a-e6dda5231d71","kind":"message","messageId":"e1e63c75-3ea0-49fb-b512-5128a2476416","parts":[{"kind":"text","text":"What
is 2 + 2?"}],"role":"user","taskId":"0dd4d3af-f35d-409d-9462-01218e5641f9"}],"id":"0dd4d3af-f35d-409d-9462-01218e5641f9","kind":"task","status":{"message":{"kind":"message","messageId":"54bb7ff3-f2c0-4eb3-b427-bf1c8cf90832","parts":[{"kind":"text","text":"\n[Tool:
calculator] 2 + 2 = 4\n2 + 2 equals 4."}],"role":"agent"},"state":"completed"}}}'
headers:
content-length:
- '635'
content-type:
- application/json
date:
- Tue, 06 Jan 2026 14:17:00 GMT
server:
- uvicorn
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,90 @@
interactions:
- request:
body: ''
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- '*/*'
accept-encoding:
- ACCEPT-ENCODING-XXX
connection:
- keep-alive
host:
- localhost:9999
method: GET
uri: http://localhost:9999/.well-known/agent-card.json
response:
body:
string: '{"capabilities":{"streaming":true},"defaultInputModes":["text"],"defaultOutputModes":["text"],"description":"An
AI assistant powered by OpenAI GPT with calculator and time tools. Ask questions,
perform calculations, or get the current time in any timezone.","name":"GPT
Assistant","preferredTransport":"JSONRPC","protocolVersion":"0.3.0","skills":[{"description":"Have
a general conversation with the AI assistant. Ask questions, get explanations,
or just chat.","examples":["Hello, how are you?","Explain quantum computing
in simple terms","What can you help me with?"],"id":"conversation","name":"General
Conversation","tags":["chat","conversation","general"]},{"description":"Perform
mathematical calculations including arithmetic, exponents, and more.","examples":["What
is 25 * 17?","Calculate 2^10","What''s (100 + 50) / 3?"],"id":"calculator","name":"Calculator","tags":["math","calculator","arithmetic"]},{"description":"Get
the current date and time in any timezone.","examples":["What time is it?","What''s
the current time in Tokyo?","What''s today''s date in New York?"],"id":"time","name":"Current
Time","tags":["time","date","timezone"]}],"url":"http://localhost:9999/","version":"1.0.0"}'
headers:
content-length:
- '1198'
content-type:
- application/json
date:
- Tue, 06 Jan 2026 14:17:02 GMT
server:
- uvicorn
status:
code: 200
message: OK
- request:
body: '{"id":"8cf25b61-8884-4246-adce-fccb32e176ab","jsonrpc":"2.0","method":"message/stream","params":{"configuration":{"acceptedOutputModes":[],"blocking":true},"message":{"kind":"message","messageId":"c145297f-7331-4835-adcc-66b51de92a2b","parts":[{"kind":"text","text":"What
is 2 + 2?"}],"role":"user"}}}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- '*/*, text/event-stream'
accept-encoding:
- ACCEPT-ENCODING-XXX
cache-control:
- no-store
connection:
- keep-alive
content-length:
- '301'
content-type:
- application/json
host:
- localhost:9999
method: POST
uri: http://localhost:9999/
response:
body:
string: "data: {\"id\":\"8cf25b61-8884-4246-adce-fccb32e176ab\",\"jsonrpc\":\"2.0\",\"result\":{\"contextId\":\"30601267-ab3b-48ef-afc8-916c37a18651\",\"final\":false,\"kind\":\"status-update\",\"status\":{\"state\":\"submitted\"},\"taskId\":\"3083d3da-4739-4f4f-a4e8-7c048ea819c1\"}}\r\n\r\ndata:
{\"id\":\"8cf25b61-8884-4246-adce-fccb32e176ab\",\"jsonrpc\":\"2.0\",\"result\":{\"contextId\":\"30601267-ab3b-48ef-afc8-916c37a18651\",\"final\":false,\"kind\":\"status-update\",\"status\":{\"state\":\"working\"},\"taskId\":\"3083d3da-4739-4f4f-a4e8-7c048ea819c1\"}}\r\n\r\ndata:
{\"id\":\"8cf25b61-8884-4246-adce-fccb32e176ab\",\"jsonrpc\":\"2.0\",\"result\":{\"contextId\":\"30601267-ab3b-48ef-afc8-916c37a18651\",\"final\":true,\"kind\":\"status-update\",\"status\":{\"message\":{\"kind\":\"message\",\"messageId\":\"25f81e3c-b7e8-48b5-a98a-4066f3637a13\",\"parts\":[{\"kind\":\"text\",\"text\":\"\\n[Tool:
calculator] 2 + 2 = 4\\n2 + 2 equals 4.\"}],\"role\":\"agent\"},\"state\":\"completed\"},\"taskId\":\"3083d3da-4739-4f4f-a4e8-7c048ea819c1\"}}\r\n\r\n"
headers:
Transfer-Encoding:
- chunked
cache-control:
- no-store
connection:
- keep-alive
content-type:
- text/event-stream; charset=utf-8
date:
- Tue, 06 Jan 2026 14:17:02 GMT
server:
- uvicorn
x-accel-buffering:
- 'no'
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,90 @@
interactions:
- request:
body: ''
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- '*/*'
accept-encoding:
- ACCEPT-ENCODING-XXX
connection:
- keep-alive
host:
- localhost:9999
method: GET
uri: http://localhost:9999/.well-known/agent-card.json
response:
body:
string: '{"capabilities":{"streaming":true},"defaultInputModes":["text"],"defaultOutputModes":["text"],"description":"An
AI assistant powered by OpenAI GPT with calculator and time tools. Ask questions,
perform calculations, or get the current time in any timezone.","name":"GPT
Assistant","preferredTransport":"JSONRPC","protocolVersion":"0.3.0","skills":[{"description":"Have
a general conversation with the AI assistant. Ask questions, get explanations,
or just chat.","examples":["Hello, how are you?","Explain quantum computing
in simple terms","What can you help me with?"],"id":"conversation","name":"General
Conversation","tags":["chat","conversation","general"]},{"description":"Perform
mathematical calculations including arithmetic, exponents, and more.","examples":["What
is 25 * 17?","Calculate 2^10","What''s (100 + 50) / 3?"],"id":"calculator","name":"Calculator","tags":["math","calculator","arithmetic"]},{"description":"Get
the current date and time in any timezone.","examples":["What time is it?","What''s
the current time in Tokyo?","What''s today''s date in New York?"],"id":"time","name":"Current
Time","tags":["time","date","timezone"]}],"url":"http://localhost:9999/","version":"1.0.0"}'
headers:
content-length:
- '1198'
content-type:
- application/json
date:
- Tue, 06 Jan 2026 14:17:00 GMT
server:
- uvicorn
status:
code: 200
message: OK
- request:
body: '{"id":"3a17c6bf-8db6-45a6-8535-34c45c0c4936","jsonrpc":"2.0","method":"message/stream","params":{"configuration":{"acceptedOutputModes":[],"blocking":true},"message":{"kind":"message","messageId":"712558a3-6d92-4591-be8a-9dd8566dde82","parts":[{"kind":"text","text":"What
is 2 + 2?"}],"role":"user"}}}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- '*/*, text/event-stream'
accept-encoding:
- ACCEPT-ENCODING-XXX
cache-control:
- no-store
connection:
- keep-alive
content-length:
- '301'
content-type:
- application/json
host:
- localhost:9999
method: POST
uri: http://localhost:9999/
response:
body:
string: "data: {\"id\":\"3a17c6bf-8db6-45a6-8535-34c45c0c4936\",\"jsonrpc\":\"2.0\",\"result\":{\"contextId\":\"ca2fbbc9-761e-45d9-a929-0c68b1f8acbf\",\"final\":false,\"kind\":\"status-update\",\"status\":{\"state\":\"submitted\"},\"taskId\":\"c6e88db0-36e9-4269-8b9a-ecb6dfdcf6a1\"}}\r\n\r\ndata:
{\"id\":\"3a17c6bf-8db6-45a6-8535-34c45c0c4936\",\"jsonrpc\":\"2.0\",\"result\":{\"contextId\":\"ca2fbbc9-761e-45d9-a929-0c68b1f8acbf\",\"final\":false,\"kind\":\"status-update\",\"status\":{\"state\":\"working\"},\"taskId\":\"c6e88db0-36e9-4269-8b9a-ecb6dfdcf6a1\"}}\r\n\r\ndata:
{\"id\":\"3a17c6bf-8db6-45a6-8535-34c45c0c4936\",\"jsonrpc\":\"2.0\",\"result\":{\"contextId\":\"ca2fbbc9-761e-45d9-a929-0c68b1f8acbf\",\"final\":true,\"kind\":\"status-update\",\"status\":{\"message\":{\"kind\":\"message\",\"messageId\":\"916324aa-fd25-4849-bceb-c4644e2fcbb0\",\"parts\":[{\"kind\":\"text\",\"text\":\"\\n[Tool:
calculator] 2 + 2 = 4\\n2 + 2 equals 4.\"}],\"role\":\"agent\"},\"state\":\"completed\"},\"taskId\":\"c6e88db0-36e9-4269-8b9a-ecb6dfdcf6a1\"}}\r\n\r\n"
headers:
Transfer-Encoding:
- chunked
cache-control:
- no-store
connection:
- keep-alive
content-type:
- text/event-stream; charset=utf-8
date:
- Tue, 06 Jan 2026 14:17:00 GMT
server:
- uvicorn
x-accel-buffering:
- 'no'
status:
code: 200
message: OK
version: 1