mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-17 03:58:33 +00:00
Compare commits
1 Commits
devin/1768
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ceef062426 |
@@ -3,9 +3,10 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import AsyncIterator
|
||||
from typing import TYPE_CHECKING, TypedDict
|
||||
from typing import TYPE_CHECKING, Any, TypedDict
|
||||
import uuid
|
||||
|
||||
from a2a.client.errors import A2AClientHTTPError
|
||||
from a2a.types import (
|
||||
AgentCard,
|
||||
Message,
|
||||
@@ -20,7 +21,10 @@ from a2a.types import (
|
||||
from typing_extensions import NotRequired
|
||||
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.a2a_events import A2AResponseReceivedEvent
|
||||
from crewai.events.types.a2a_events import (
|
||||
A2AConnectionErrorEvent,
|
||||
A2AResponseReceivedEvent,
|
||||
)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -55,7 +59,8 @@ class TaskStateResult(TypedDict):
|
||||
history: list[Message]
|
||||
result: NotRequired[str]
|
||||
error: NotRequired[str]
|
||||
agent_card: NotRequired[AgentCard]
|
||||
agent_card: NotRequired[dict[str, Any]]
|
||||
a2a_agent_name: NotRequired[str | None]
|
||||
|
||||
|
||||
def extract_task_result_parts(a2a_task: A2ATask) -> list[str]:
|
||||
@@ -131,50 +136,69 @@ def process_task_state(
|
||||
is_multiturn: bool,
|
||||
agent_role: str | None,
|
||||
result_parts: list[str] | None = None,
|
||||
endpoint: str | None = None,
|
||||
a2a_agent_name: str | None = None,
|
||||
from_task: Any | None = None,
|
||||
from_agent: Any | None = None,
|
||||
is_final: bool = True,
|
||||
) -> TaskStateResult | None:
|
||||
"""Process A2A task state and return result dictionary.
|
||||
|
||||
Shared logic for both polling and streaming handlers.
|
||||
|
||||
Args:
|
||||
a2a_task: The A2A task to process
|
||||
new_messages: List to collect messages (modified in place)
|
||||
agent_card: The agent card
|
||||
turn_number: Current turn number
|
||||
is_multiturn: Whether multi-turn conversation
|
||||
agent_role: Agent role for logging
|
||||
a2a_task: The A2A task to process.
|
||||
new_messages: List to collect messages (modified in place).
|
||||
agent_card: The agent card.
|
||||
turn_number: Current turn number.
|
||||
is_multiturn: Whether multi-turn conversation.
|
||||
agent_role: Agent role for logging.
|
||||
result_parts: Accumulated result parts (streaming passes accumulated,
|
||||
polling passes None to extract from task)
|
||||
polling passes None to extract from task).
|
||||
endpoint: A2A agent endpoint URL.
|
||||
a2a_agent_name: Name of the A2A agent from agent card.
|
||||
from_task: Optional CrewAI Task for event metadata.
|
||||
from_agent: Optional CrewAI Agent for event metadata.
|
||||
is_final: Whether this is the final response in the stream.
|
||||
|
||||
Returns:
|
||||
Result dictionary if terminal/actionable state, None otherwise
|
||||
Result dictionary if terminal/actionable state, None otherwise.
|
||||
"""
|
||||
should_extract = result_parts is None
|
||||
if result_parts is None:
|
||||
result_parts = []
|
||||
|
||||
if a2a_task.status.state == TaskState.completed:
|
||||
if should_extract:
|
||||
if not result_parts:
|
||||
extracted_parts = extract_task_result_parts(a2a_task)
|
||||
result_parts.extend(extracted_parts)
|
||||
if a2a_task.history:
|
||||
new_messages.extend(a2a_task.history)
|
||||
|
||||
response_text = " ".join(result_parts) if result_parts else ""
|
||||
message_id = None
|
||||
if a2a_task.status and a2a_task.status.message:
|
||||
message_id = a2a_task.status.message.message_id
|
||||
crewai_event_bus.emit(
|
||||
None,
|
||||
A2AResponseReceivedEvent(
|
||||
response=response_text,
|
||||
turn_number=turn_number,
|
||||
context_id=a2a_task.context_id,
|
||||
message_id=message_id,
|
||||
is_multiturn=is_multiturn,
|
||||
status="completed",
|
||||
final=is_final,
|
||||
agent_role=agent_role,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
|
||||
return TaskStateResult(
|
||||
status=TaskState.completed,
|
||||
agent_card=agent_card,
|
||||
agent_card=agent_card.model_dump(exclude_none=True),
|
||||
result=response_text,
|
||||
history=new_messages,
|
||||
)
|
||||
@@ -194,14 +218,24 @@ def process_task_state(
|
||||
)
|
||||
new_messages.append(agent_message)
|
||||
|
||||
input_message_id = None
|
||||
if a2a_task.status and a2a_task.status.message:
|
||||
input_message_id = a2a_task.status.message.message_id
|
||||
crewai_event_bus.emit(
|
||||
None,
|
||||
A2AResponseReceivedEvent(
|
||||
response=response_text,
|
||||
turn_number=turn_number,
|
||||
context_id=a2a_task.context_id,
|
||||
message_id=input_message_id,
|
||||
is_multiturn=is_multiturn,
|
||||
status="input_required",
|
||||
final=is_final,
|
||||
agent_role=agent_role,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -209,7 +243,7 @@ def process_task_state(
|
||||
status=TaskState.input_required,
|
||||
error=response_text,
|
||||
history=new_messages,
|
||||
agent_card=agent_card,
|
||||
agent_card=agent_card.model_dump(exclude_none=True),
|
||||
)
|
||||
|
||||
if a2a_task.status.state in {TaskState.failed, TaskState.rejected}:
|
||||
@@ -248,6 +282,11 @@ async def send_message_and_get_task_id(
|
||||
turn_number: int,
|
||||
is_multiturn: bool,
|
||||
agent_role: str | None,
|
||||
from_task: Any | None = None,
|
||||
from_agent: Any | None = None,
|
||||
endpoint: str | None = None,
|
||||
a2a_agent_name: str | None = None,
|
||||
context_id: str | None = None,
|
||||
) -> str | TaskStateResult:
|
||||
"""Send message and process initial response.
|
||||
|
||||
@@ -262,6 +301,11 @@ async def send_message_and_get_task_id(
|
||||
turn_number: Current turn number
|
||||
is_multiturn: Whether multi-turn conversation
|
||||
agent_role: Agent role for logging
|
||||
from_task: Optional CrewAI Task object for event metadata.
|
||||
from_agent: Optional CrewAI Agent object for event metadata.
|
||||
endpoint: Optional A2A endpoint URL.
|
||||
a2a_agent_name: Optional A2A agent name.
|
||||
context_id: Optional A2A context ID for correlation.
|
||||
|
||||
Returns:
|
||||
Task ID string if agent needs polling/waiting, or TaskStateResult if done.
|
||||
@@ -280,9 +324,16 @@ async def send_message_and_get_task_id(
|
||||
A2AResponseReceivedEvent(
|
||||
response=response_text,
|
||||
turn_number=turn_number,
|
||||
context_id=event.context_id,
|
||||
message_id=event.message_id,
|
||||
is_multiturn=is_multiturn,
|
||||
status="completed",
|
||||
final=True,
|
||||
agent_role=agent_role,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -290,7 +341,7 @@ async def send_message_and_get_task_id(
|
||||
status=TaskState.completed,
|
||||
result=response_text,
|
||||
history=new_messages,
|
||||
agent_card=agent_card,
|
||||
agent_card=agent_card.model_dump(exclude_none=True),
|
||||
)
|
||||
|
||||
if isinstance(event, tuple):
|
||||
@@ -304,6 +355,10 @@ async def send_message_and_get_task_id(
|
||||
turn_number=turn_number,
|
||||
is_multiturn=is_multiturn,
|
||||
agent_role=agent_role,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
)
|
||||
if result:
|
||||
return result
|
||||
@@ -316,6 +371,99 @@ async def send_message_and_get_task_id(
|
||||
history=new_messages,
|
||||
)
|
||||
|
||||
except A2AClientHTTPError as e:
|
||||
error_msg = f"HTTP Error {e.status_code}: {e!s}"
|
||||
|
||||
error_message = Message(
|
||||
role=Role.agent,
|
||||
message_id=str(uuid.uuid4()),
|
||||
parts=[Part(root=TextPart(text=error_msg))],
|
||||
context_id=context_id,
|
||||
)
|
||||
new_messages.append(error_message)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
None,
|
||||
A2AConnectionErrorEvent(
|
||||
endpoint=endpoint or "",
|
||||
error=str(e),
|
||||
error_type="http_error",
|
||||
status_code=e.status_code,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
operation="send_message",
|
||||
context_id=context_id,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
crewai_event_bus.emit(
|
||||
None,
|
||||
A2AResponseReceivedEvent(
|
||||
response=error_msg,
|
||||
turn_number=turn_number,
|
||||
context_id=context_id,
|
||||
is_multiturn=is_multiturn,
|
||||
status="failed",
|
||||
final=True,
|
||||
agent_role=agent_role,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
return TaskStateResult(
|
||||
status=TaskState.failed,
|
||||
error=error_msg,
|
||||
history=new_messages,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Unexpected error during send_message: {e!s}"
|
||||
|
||||
error_message = Message(
|
||||
role=Role.agent,
|
||||
message_id=str(uuid.uuid4()),
|
||||
parts=[Part(root=TextPart(text=error_msg))],
|
||||
context_id=context_id,
|
||||
)
|
||||
new_messages.append(error_message)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
None,
|
||||
A2AConnectionErrorEvent(
|
||||
endpoint=endpoint or "",
|
||||
error=str(e),
|
||||
error_type="unexpected_error",
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
operation="send_message",
|
||||
context_id=context_id,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
crewai_event_bus.emit(
|
||||
None,
|
||||
A2AResponseReceivedEvent(
|
||||
response=error_msg,
|
||||
turn_number=turn_number,
|
||||
context_id=context_id,
|
||||
is_multiturn=is_multiturn,
|
||||
status="failed",
|
||||
final=True,
|
||||
agent_role=agent_role,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
return TaskStateResult(
|
||||
status=TaskState.failed,
|
||||
error=error_msg,
|
||||
history=new_messages,
|
||||
)
|
||||
|
||||
finally:
|
||||
aclose = getattr(event_stream, "aclose", None)
|
||||
if aclose:
|
||||
|
||||
@@ -22,6 +22,13 @@ class BaseHandlerKwargs(TypedDict, total=False):
|
||||
turn_number: int
|
||||
is_multiturn: bool
|
||||
agent_role: str | None
|
||||
context_id: str | None
|
||||
task_id: str | None
|
||||
endpoint: str | None
|
||||
agent_branch: Any
|
||||
a2a_agent_name: str | None
|
||||
from_task: Any
|
||||
from_agent: Any
|
||||
|
||||
|
||||
class PollingHandlerKwargs(BaseHandlerKwargs, total=False):
|
||||
@@ -29,8 +36,6 @@ class PollingHandlerKwargs(BaseHandlerKwargs, total=False):
|
||||
|
||||
polling_interval: float
|
||||
polling_timeout: float
|
||||
endpoint: str
|
||||
agent_branch: Any
|
||||
history_length: int
|
||||
max_polls: int | None
|
||||
|
||||
@@ -38,9 +43,6 @@ class PollingHandlerKwargs(BaseHandlerKwargs, total=False):
|
||||
class StreamingHandlerKwargs(BaseHandlerKwargs, total=False):
|
||||
"""Kwargs for streaming handler."""
|
||||
|
||||
context_id: str | None
|
||||
task_id: str | None
|
||||
|
||||
|
||||
class PushNotificationHandlerKwargs(BaseHandlerKwargs, total=False):
|
||||
"""Kwargs for push notification handler."""
|
||||
@@ -49,7 +51,6 @@ class PushNotificationHandlerKwargs(BaseHandlerKwargs, total=False):
|
||||
result_store: PushNotificationResultStore
|
||||
polling_timeout: float
|
||||
polling_interval: float
|
||||
agent_branch: Any
|
||||
|
||||
|
||||
class PushNotificationResultStore(Protocol):
|
||||
|
||||
@@ -31,6 +31,7 @@ from crewai.a2a.task_helpers import (
|
||||
from crewai.a2a.updates.base import PollingHandlerKwargs
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.a2a_events import (
|
||||
A2AConnectionErrorEvent,
|
||||
A2APollingStartedEvent,
|
||||
A2APollingStatusEvent,
|
||||
A2AResponseReceivedEvent,
|
||||
@@ -49,23 +50,33 @@ async def _poll_task_until_complete(
|
||||
agent_branch: Any | None = None,
|
||||
history_length: int = 100,
|
||||
max_polls: int | None = None,
|
||||
from_task: Any | None = None,
|
||||
from_agent: Any | None = None,
|
||||
context_id: str | None = None,
|
||||
endpoint: str | None = None,
|
||||
a2a_agent_name: str | None = None,
|
||||
) -> A2ATask:
|
||||
"""Poll task status until terminal state reached.
|
||||
|
||||
Args:
|
||||
client: A2A client instance
|
||||
task_id: Task ID to poll
|
||||
polling_interval: Seconds between poll attempts
|
||||
polling_timeout: Max seconds before timeout
|
||||
agent_branch: Agent tree branch for logging
|
||||
history_length: Number of messages to retrieve per poll
|
||||
max_polls: Max number of poll attempts (None = unlimited)
|
||||
client: A2A client instance.
|
||||
task_id: Task ID to poll.
|
||||
polling_interval: Seconds between poll attempts.
|
||||
polling_timeout: Max seconds before timeout.
|
||||
agent_branch: Agent tree branch for logging.
|
||||
history_length: Number of messages to retrieve per poll.
|
||||
max_polls: Max number of poll attempts (None = unlimited).
|
||||
from_task: Optional CrewAI Task object for event metadata.
|
||||
from_agent: Optional CrewAI Agent object for event metadata.
|
||||
context_id: A2A context ID for correlation.
|
||||
endpoint: A2A agent endpoint URL.
|
||||
a2a_agent_name: Name of the A2A agent from agent card.
|
||||
|
||||
Returns:
|
||||
Final task object in terminal state
|
||||
Final task object in terminal state.
|
||||
|
||||
Raises:
|
||||
A2APollingTimeoutError: If polling exceeds timeout or max_polls
|
||||
A2APollingTimeoutError: If polling exceeds timeout or max_polls.
|
||||
"""
|
||||
start_time = time.monotonic()
|
||||
poll_count = 0
|
||||
@@ -77,13 +88,19 @@ async def _poll_task_until_complete(
|
||||
)
|
||||
|
||||
elapsed = time.monotonic() - start_time
|
||||
effective_context_id = task.context_id or context_id
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2APollingStatusEvent(
|
||||
task_id=task_id,
|
||||
context_id=effective_context_id,
|
||||
state=str(task.status.state.value) if task.status.state else "unknown",
|
||||
elapsed_seconds=elapsed,
|
||||
poll_count=poll_count,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -137,6 +154,9 @@ class PollingHandler:
|
||||
max_polls = kwargs.get("max_polls")
|
||||
context_id = kwargs.get("context_id")
|
||||
task_id = kwargs.get("task_id")
|
||||
a2a_agent_name = kwargs.get("a2a_agent_name")
|
||||
from_task = kwargs.get("from_task")
|
||||
from_agent = kwargs.get("from_agent")
|
||||
|
||||
try:
|
||||
result_or_task_id = await send_message_and_get_task_id(
|
||||
@@ -146,6 +166,11 @@ class PollingHandler:
|
||||
turn_number=turn_number,
|
||||
is_multiturn=is_multiturn,
|
||||
agent_role=agent_role,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
context_id=context_id,
|
||||
)
|
||||
|
||||
if not isinstance(result_or_task_id, str):
|
||||
@@ -157,8 +182,12 @@ class PollingHandler:
|
||||
agent_branch,
|
||||
A2APollingStartedEvent(
|
||||
task_id=task_id,
|
||||
context_id=context_id,
|
||||
polling_interval=polling_interval,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -170,6 +199,11 @@ class PollingHandler:
|
||||
agent_branch=agent_branch,
|
||||
history_length=history_length,
|
||||
max_polls=max_polls,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
context_id=context_id,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
)
|
||||
|
||||
result = process_task_state(
|
||||
@@ -179,6 +213,10 @@ class PollingHandler:
|
||||
turn_number=turn_number,
|
||||
is_multiturn=is_multiturn,
|
||||
agent_role=agent_role,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
)
|
||||
if result:
|
||||
return result
|
||||
@@ -206,9 +244,15 @@ class PollingHandler:
|
||||
A2AResponseReceivedEvent(
|
||||
response=error_msg,
|
||||
turn_number=turn_number,
|
||||
context_id=context_id,
|
||||
is_multiturn=is_multiturn,
|
||||
status="failed",
|
||||
final=True,
|
||||
agent_role=agent_role,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
return TaskStateResult(
|
||||
@@ -229,14 +273,83 @@ class PollingHandler:
|
||||
)
|
||||
new_messages.append(error_message)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2AConnectionErrorEvent(
|
||||
endpoint=endpoint,
|
||||
error=str(e),
|
||||
error_type="http_error",
|
||||
status_code=e.status_code,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
operation="polling",
|
||||
context_id=context_id,
|
||||
task_id=task_id,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2AResponseReceivedEvent(
|
||||
response=error_msg,
|
||||
turn_number=turn_number,
|
||||
context_id=context_id,
|
||||
is_multiturn=is_multiturn,
|
||||
status="failed",
|
||||
final=True,
|
||||
agent_role=agent_role,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
return TaskStateResult(
|
||||
status=TaskState.failed,
|
||||
error=error_msg,
|
||||
history=new_messages,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Unexpected error during polling: {e!s}"
|
||||
|
||||
error_message = Message(
|
||||
role=Role.agent,
|
||||
message_id=str(uuid.uuid4()),
|
||||
parts=[Part(root=TextPart(text=error_msg))],
|
||||
context_id=context_id,
|
||||
task_id=task_id,
|
||||
)
|
||||
new_messages.append(error_message)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2AConnectionErrorEvent(
|
||||
endpoint=endpoint or "",
|
||||
error=str(e),
|
||||
error_type="unexpected_error",
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
operation="polling",
|
||||
context_id=context_id,
|
||||
task_id=task_id,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2AResponseReceivedEvent(
|
||||
response=error_msg,
|
||||
turn_number=turn_number,
|
||||
context_id=context_id,
|
||||
is_multiturn=is_multiturn,
|
||||
status="failed",
|
||||
final=True,
|
||||
agent_role=agent_role,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
return TaskStateResult(
|
||||
|
||||
@@ -29,6 +29,7 @@ from crewai.a2a.updates.base import (
|
||||
)
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.a2a_events import (
|
||||
A2AConnectionErrorEvent,
|
||||
A2APushNotificationRegisteredEvent,
|
||||
A2APushNotificationTimeoutEvent,
|
||||
A2AResponseReceivedEvent,
|
||||
@@ -48,6 +49,11 @@ async def _wait_for_push_result(
|
||||
timeout: float,
|
||||
poll_interval: float,
|
||||
agent_branch: Any | None = None,
|
||||
from_task: Any | None = None,
|
||||
from_agent: Any | None = None,
|
||||
context_id: str | None = None,
|
||||
endpoint: str | None = None,
|
||||
a2a_agent_name: str | None = None,
|
||||
) -> A2ATask | None:
|
||||
"""Wait for push notification result.
|
||||
|
||||
@@ -57,6 +63,11 @@ async def _wait_for_push_result(
|
||||
timeout: Max seconds to wait.
|
||||
poll_interval: Seconds between polling attempts.
|
||||
agent_branch: Agent tree branch for logging.
|
||||
from_task: Optional CrewAI Task object for event metadata.
|
||||
from_agent: Optional CrewAI Agent object for event metadata.
|
||||
context_id: A2A context ID for correlation.
|
||||
endpoint: A2A agent endpoint URL.
|
||||
a2a_agent_name: Name of the A2A agent.
|
||||
|
||||
Returns:
|
||||
Final task object, or None if timeout.
|
||||
@@ -72,7 +83,12 @@ async def _wait_for_push_result(
|
||||
agent_branch,
|
||||
A2APushNotificationTimeoutEvent(
|
||||
task_id=task_id,
|
||||
context_id=context_id,
|
||||
timeout_seconds=timeout,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -115,18 +131,56 @@ class PushNotificationHandler:
|
||||
agent_role = kwargs.get("agent_role")
|
||||
context_id = kwargs.get("context_id")
|
||||
task_id = kwargs.get("task_id")
|
||||
endpoint = kwargs.get("endpoint")
|
||||
a2a_agent_name = kwargs.get("a2a_agent_name")
|
||||
from_task = kwargs.get("from_task")
|
||||
from_agent = kwargs.get("from_agent")
|
||||
|
||||
if config is None:
|
||||
error_msg = (
|
||||
"PushNotificationConfig is required for push notification handler"
|
||||
)
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2AConnectionErrorEvent(
|
||||
endpoint=endpoint or "",
|
||||
error=error_msg,
|
||||
error_type="configuration_error",
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
operation="push_notification",
|
||||
context_id=context_id,
|
||||
task_id=task_id,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
return TaskStateResult(
|
||||
status=TaskState.failed,
|
||||
error="PushNotificationConfig is required for push notification handler",
|
||||
error=error_msg,
|
||||
history=new_messages,
|
||||
)
|
||||
|
||||
if result_store is None:
|
||||
error_msg = (
|
||||
"PushNotificationResultStore is required for push notification handler"
|
||||
)
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2AConnectionErrorEvent(
|
||||
endpoint=endpoint or "",
|
||||
error=error_msg,
|
||||
error_type="configuration_error",
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
operation="push_notification",
|
||||
context_id=context_id,
|
||||
task_id=task_id,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
return TaskStateResult(
|
||||
status=TaskState.failed,
|
||||
error="PushNotificationResultStore is required for push notification handler",
|
||||
error=error_msg,
|
||||
history=new_messages,
|
||||
)
|
||||
|
||||
@@ -138,6 +192,11 @@ class PushNotificationHandler:
|
||||
turn_number=turn_number,
|
||||
is_multiturn=is_multiturn,
|
||||
agent_role=agent_role,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
context_id=context_id,
|
||||
)
|
||||
|
||||
if not isinstance(result_or_task_id, str):
|
||||
@@ -149,7 +208,12 @@ class PushNotificationHandler:
|
||||
agent_branch,
|
||||
A2APushNotificationRegisteredEvent(
|
||||
task_id=task_id,
|
||||
context_id=context_id,
|
||||
callback_url=str(config.url),
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -165,6 +229,11 @@ class PushNotificationHandler:
|
||||
timeout=polling_timeout,
|
||||
poll_interval=polling_interval,
|
||||
agent_branch=agent_branch,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
context_id=context_id,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
)
|
||||
|
||||
if final_task is None:
|
||||
@@ -181,6 +250,10 @@ class PushNotificationHandler:
|
||||
turn_number=turn_number,
|
||||
is_multiturn=is_multiturn,
|
||||
agent_role=agent_role,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
)
|
||||
if result:
|
||||
return result
|
||||
@@ -203,14 +276,83 @@ class PushNotificationHandler:
|
||||
)
|
||||
new_messages.append(error_message)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2AConnectionErrorEvent(
|
||||
endpoint=endpoint or "",
|
||||
error=str(e),
|
||||
error_type="http_error",
|
||||
status_code=e.status_code,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
operation="push_notification",
|
||||
context_id=context_id,
|
||||
task_id=task_id,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2AResponseReceivedEvent(
|
||||
response=error_msg,
|
||||
turn_number=turn_number,
|
||||
context_id=context_id,
|
||||
is_multiturn=is_multiturn,
|
||||
status="failed",
|
||||
final=True,
|
||||
agent_role=agent_role,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
return TaskStateResult(
|
||||
status=TaskState.failed,
|
||||
error=error_msg,
|
||||
history=new_messages,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Unexpected error during push notification: {e!s}"
|
||||
|
||||
error_message = Message(
|
||||
role=Role.agent,
|
||||
message_id=str(uuid.uuid4()),
|
||||
parts=[Part(root=TextPart(text=error_msg))],
|
||||
context_id=context_id,
|
||||
task_id=task_id,
|
||||
)
|
||||
new_messages.append(error_message)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2AConnectionErrorEvent(
|
||||
endpoint=endpoint or "",
|
||||
error=str(e),
|
||||
error_type="unexpected_error",
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
operation="push_notification",
|
||||
context_id=context_id,
|
||||
task_id=task_id,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2AResponseReceivedEvent(
|
||||
response=error_msg,
|
||||
turn_number=turn_number,
|
||||
context_id=context_id,
|
||||
is_multiturn=is_multiturn,
|
||||
status="failed",
|
||||
final=True,
|
||||
agent_role=agent_role,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
return TaskStateResult(
|
||||
|
||||
@@ -26,7 +26,13 @@ from crewai.a2a.task_helpers import (
|
||||
)
|
||||
from crewai.a2a.updates.base import StreamingHandlerKwargs
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.a2a_events import A2AResponseReceivedEvent
|
||||
from crewai.events.types.a2a_events import (
|
||||
A2AArtifactReceivedEvent,
|
||||
A2AConnectionErrorEvent,
|
||||
A2AResponseReceivedEvent,
|
||||
A2AStreamingChunkEvent,
|
||||
A2AStreamingStartedEvent,
|
||||
)
|
||||
|
||||
|
||||
class StreamingHandler:
|
||||
@@ -57,19 +63,57 @@ class StreamingHandler:
|
||||
turn_number = kwargs.get("turn_number", 0)
|
||||
is_multiturn = kwargs.get("is_multiturn", False)
|
||||
agent_role = kwargs.get("agent_role")
|
||||
endpoint = kwargs.get("endpoint")
|
||||
a2a_agent_name = kwargs.get("a2a_agent_name")
|
||||
from_task = kwargs.get("from_task")
|
||||
from_agent = kwargs.get("from_agent")
|
||||
agent_branch = kwargs.get("agent_branch")
|
||||
|
||||
result_parts: list[str] = []
|
||||
final_result: TaskStateResult | None = None
|
||||
event_stream = client.send_message(message)
|
||||
chunk_index = 0
|
||||
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2AStreamingStartedEvent(
|
||||
task_id=task_id,
|
||||
context_id=context_id,
|
||||
endpoint=endpoint or "",
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
turn_number=turn_number,
|
||||
is_multiturn=is_multiturn,
|
||||
agent_role=agent_role,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
|
||||
try:
|
||||
async for event in event_stream:
|
||||
if isinstance(event, Message):
|
||||
new_messages.append(event)
|
||||
message_context_id = event.context_id or context_id
|
||||
for part in event.parts:
|
||||
if part.root.kind == "text":
|
||||
text = part.root.text
|
||||
result_parts.append(text)
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2AStreamingChunkEvent(
|
||||
task_id=event.task_id or task_id,
|
||||
context_id=message_context_id,
|
||||
chunk=text,
|
||||
chunk_index=chunk_index,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
turn_number=turn_number,
|
||||
is_multiturn=is_multiturn,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
chunk_index += 1
|
||||
|
||||
elif isinstance(event, tuple):
|
||||
a2a_task, update = event
|
||||
@@ -81,10 +125,51 @@ class StreamingHandler:
|
||||
for part in artifact.parts
|
||||
if part.root.kind == "text"
|
||||
)
|
||||
artifact_size = None
|
||||
if artifact.parts:
|
||||
artifact_size = sum(
|
||||
len(p.root.text.encode("utf-8"))
|
||||
if p.root.kind == "text"
|
||||
else len(getattr(p.root, "data", b""))
|
||||
for p in artifact.parts
|
||||
)
|
||||
effective_context_id = a2a_task.context_id or context_id
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2AArtifactReceivedEvent(
|
||||
task_id=a2a_task.id,
|
||||
artifact_id=artifact.artifact_id,
|
||||
artifact_name=artifact.name,
|
||||
artifact_description=artifact.description,
|
||||
mime_type=artifact.parts[0].root.kind
|
||||
if artifact.parts
|
||||
else None,
|
||||
size_bytes=artifact_size,
|
||||
append=update.append or False,
|
||||
last_chunk=update.last_chunk or False,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
context_id=effective_context_id,
|
||||
turn_number=turn_number,
|
||||
is_multiturn=is_multiturn,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
|
||||
is_final_update = False
|
||||
if isinstance(update, TaskStatusUpdateEvent):
|
||||
is_final_update = update.final
|
||||
if (
|
||||
update.status
|
||||
and update.status.message
|
||||
and update.status.message.parts
|
||||
):
|
||||
result_parts.extend(
|
||||
part.root.text
|
||||
for part in update.status.message.parts
|
||||
if part.root.kind == "text" and part.root.text
|
||||
)
|
||||
|
||||
if (
|
||||
not is_final_update
|
||||
@@ -101,6 +186,11 @@ class StreamingHandler:
|
||||
is_multiturn=is_multiturn,
|
||||
agent_role=agent_role,
|
||||
result_parts=result_parts,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
is_final=is_final_update,
|
||||
)
|
||||
if final_result:
|
||||
break
|
||||
@@ -118,13 +208,82 @@ class StreamingHandler:
|
||||
new_messages.append(error_message)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
None,
|
||||
agent_branch,
|
||||
A2AConnectionErrorEvent(
|
||||
endpoint=endpoint or "",
|
||||
error=str(e),
|
||||
error_type="http_error",
|
||||
status_code=e.status_code,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
operation="streaming",
|
||||
context_id=context_id,
|
||||
task_id=task_id,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2AResponseReceivedEvent(
|
||||
response=error_msg,
|
||||
turn_number=turn_number,
|
||||
context_id=context_id,
|
||||
is_multiturn=is_multiturn,
|
||||
status="failed",
|
||||
final=True,
|
||||
agent_role=agent_role,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
return TaskStateResult(
|
||||
status=TaskState.failed,
|
||||
error=error_msg,
|
||||
history=new_messages,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Unexpected error during streaming: {e!s}"
|
||||
|
||||
error_message = Message(
|
||||
role=Role.agent,
|
||||
message_id=str(uuid.uuid4()),
|
||||
parts=[Part(root=TextPart(text=error_msg))],
|
||||
context_id=context_id,
|
||||
task_id=task_id,
|
||||
)
|
||||
new_messages.append(error_message)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2AConnectionErrorEvent(
|
||||
endpoint=endpoint or "",
|
||||
error=str(e),
|
||||
error_type="unexpected_error",
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
operation="streaming",
|
||||
context_id=context_id,
|
||||
task_id=task_id,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2AResponseReceivedEvent(
|
||||
response=error_msg,
|
||||
turn_number=turn_number,
|
||||
context_id=context_id,
|
||||
is_multiturn=is_multiturn,
|
||||
status="failed",
|
||||
final=True,
|
||||
agent_role=agent_role,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
return TaskStateResult(
|
||||
@@ -136,7 +295,23 @@ class StreamingHandler:
|
||||
finally:
|
||||
aclose = getattr(event_stream, "aclose", None)
|
||||
if aclose:
|
||||
await aclose()
|
||||
try:
|
||||
await aclose()
|
||||
except Exception as close_error:
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2AConnectionErrorEvent(
|
||||
endpoint=endpoint or "",
|
||||
error=str(close_error),
|
||||
error_type="stream_close_error",
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
operation="stream_close",
|
||||
context_id=context_id,
|
||||
task_id=task_id,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
|
||||
if final_result:
|
||||
return final_result
|
||||
@@ -145,5 +320,5 @@ class StreamingHandler:
|
||||
status=TaskState.completed,
|
||||
result=" ".join(result_parts) if result_parts else "",
|
||||
history=new_messages,
|
||||
agent_card=agent_card,
|
||||
agent_card=agent_card.model_dump(exclude_none=True),
|
||||
)
|
||||
|
||||
@@ -23,6 +23,12 @@ from crewai.a2a.auth.utils import (
|
||||
)
|
||||
from crewai.a2a.config import A2AServerConfig
|
||||
from crewai.crew import Crew
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.a2a_events import (
|
||||
A2AAgentCardFetchedEvent,
|
||||
A2AAuthenticationFailedEvent,
|
||||
A2AConnectionErrorEvent,
|
||||
)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -183,6 +189,8 @@ async def _afetch_agent_card_impl(
|
||||
timeout: int,
|
||||
) -> AgentCard:
|
||||
"""Internal async implementation of AgentCard fetching."""
|
||||
start_time = time.perf_counter()
|
||||
|
||||
if "/.well-known/agent-card.json" in endpoint:
|
||||
base_url = endpoint.replace("/.well-known/agent-card.json", "")
|
||||
agent_card_path = "/.well-known/agent-card.json"
|
||||
@@ -217,9 +225,29 @@ async def _afetch_agent_card_impl(
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
return AgentCard.model_validate(response.json())
|
||||
agent_card = AgentCard.model_validate(response.json())
|
||||
fetch_time_ms = (time.perf_counter() - start_time) * 1000
|
||||
agent_card_dict = agent_card.model_dump(exclude_none=True)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
None,
|
||||
A2AAgentCardFetchedEvent(
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=agent_card.name,
|
||||
agent_card=agent_card_dict,
|
||||
protocol_version=agent_card.protocol_version,
|
||||
provider=agent_card_dict.get("provider"),
|
||||
cached=False,
|
||||
fetch_time_ms=fetch_time_ms,
|
||||
),
|
||||
)
|
||||
|
||||
return agent_card
|
||||
|
||||
except httpx.HTTPStatusError as e:
|
||||
elapsed_ms = (time.perf_counter() - start_time) * 1000
|
||||
response_body = e.response.text[:1000] if e.response.text else None
|
||||
|
||||
if e.response.status_code == 401:
|
||||
error_details = ["Authentication failed"]
|
||||
www_auth = e.response.headers.get("WWW-Authenticate")
|
||||
@@ -228,7 +256,93 @@ async def _afetch_agent_card_impl(
|
||||
if not auth:
|
||||
error_details.append("No auth scheme provided")
|
||||
msg = " | ".join(error_details)
|
||||
|
||||
auth_type = type(auth).__name__ if auth else None
|
||||
crewai_event_bus.emit(
|
||||
None,
|
||||
A2AAuthenticationFailedEvent(
|
||||
endpoint=endpoint,
|
||||
auth_type=auth_type,
|
||||
error=msg,
|
||||
status_code=401,
|
||||
metadata={
|
||||
"elapsed_ms": elapsed_ms,
|
||||
"response_body": response_body,
|
||||
"www_authenticate": www_auth,
|
||||
"request_url": str(e.request.url),
|
||||
},
|
||||
),
|
||||
)
|
||||
|
||||
raise A2AClientHTTPError(401, msg) from e
|
||||
|
||||
crewai_event_bus.emit(
|
||||
None,
|
||||
A2AConnectionErrorEvent(
|
||||
endpoint=endpoint,
|
||||
error=str(e),
|
||||
error_type="http_error",
|
||||
status_code=e.response.status_code,
|
||||
operation="fetch_agent_card",
|
||||
metadata={
|
||||
"elapsed_ms": elapsed_ms,
|
||||
"response_body": response_body,
|
||||
"request_url": str(e.request.url),
|
||||
},
|
||||
),
|
||||
)
|
||||
raise
|
||||
|
||||
except httpx.TimeoutException as e:
|
||||
elapsed_ms = (time.perf_counter() - start_time) * 1000
|
||||
crewai_event_bus.emit(
|
||||
None,
|
||||
A2AConnectionErrorEvent(
|
||||
endpoint=endpoint,
|
||||
error=str(e),
|
||||
error_type="timeout",
|
||||
operation="fetch_agent_card",
|
||||
metadata={
|
||||
"elapsed_ms": elapsed_ms,
|
||||
"timeout_config": timeout,
|
||||
"request_url": str(e.request.url) if e.request else None,
|
||||
},
|
||||
),
|
||||
)
|
||||
raise
|
||||
|
||||
except httpx.ConnectError as e:
|
||||
elapsed_ms = (time.perf_counter() - start_time) * 1000
|
||||
crewai_event_bus.emit(
|
||||
None,
|
||||
A2AConnectionErrorEvent(
|
||||
endpoint=endpoint,
|
||||
error=str(e),
|
||||
error_type="connection_error",
|
||||
operation="fetch_agent_card",
|
||||
metadata={
|
||||
"elapsed_ms": elapsed_ms,
|
||||
"request_url": str(e.request.url) if e.request else None,
|
||||
},
|
||||
),
|
||||
)
|
||||
raise
|
||||
|
||||
except httpx.RequestError as e:
|
||||
elapsed_ms = (time.perf_counter() - start_time) * 1000
|
||||
crewai_event_bus.emit(
|
||||
None,
|
||||
A2AConnectionErrorEvent(
|
||||
endpoint=endpoint,
|
||||
error=str(e),
|
||||
error_type="request_error",
|
||||
operation="fetch_agent_card",
|
||||
metadata={
|
||||
"elapsed_ms": elapsed_ms,
|
||||
"request_url": str(e.request.url) if e.request else None,
|
||||
},
|
||||
),
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
|
||||
@@ -88,6 +88,9 @@ def execute_a2a_delegation(
|
||||
response_model: type[BaseModel] | None = None,
|
||||
turn_number: int | None = None,
|
||||
updates: UpdateConfig | None = None,
|
||||
from_task: Any | None = None,
|
||||
from_agent: Any | None = None,
|
||||
skill_id: str | None = None,
|
||||
) -> TaskStateResult:
|
||||
"""Execute a task delegation to a remote A2A agent synchronously.
|
||||
|
||||
@@ -129,6 +132,9 @@ def execute_a2a_delegation(
|
||||
response_model: Optional Pydantic model for structured outputs.
|
||||
turn_number: Optional turn number for multi-turn conversations.
|
||||
updates: Update mechanism config from A2AConfig.updates.
|
||||
from_task: Optional CrewAI Task object for event metadata.
|
||||
from_agent: Optional CrewAI Agent object for event metadata.
|
||||
skill_id: Optional skill ID to target a specific agent capability.
|
||||
|
||||
Returns:
|
||||
TaskStateResult with status, result/error, history, and agent_card.
|
||||
@@ -156,10 +162,16 @@ def execute_a2a_delegation(
|
||||
transport_protocol=transport_protocol,
|
||||
turn_number=turn_number,
|
||||
updates=updates,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
skill_id=skill_id,
|
||||
)
|
||||
)
|
||||
finally:
|
||||
loop.close()
|
||||
try:
|
||||
loop.run_until_complete(loop.shutdown_asyncgens())
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
|
||||
async def aexecute_a2a_delegation(
|
||||
@@ -181,6 +193,9 @@ async def aexecute_a2a_delegation(
|
||||
response_model: type[BaseModel] | None = None,
|
||||
turn_number: int | None = None,
|
||||
updates: UpdateConfig | None = None,
|
||||
from_task: Any | None = None,
|
||||
from_agent: Any | None = None,
|
||||
skill_id: str | None = None,
|
||||
) -> TaskStateResult:
|
||||
"""Execute a task delegation to a remote A2A agent asynchronously.
|
||||
|
||||
@@ -222,6 +237,9 @@ async def aexecute_a2a_delegation(
|
||||
response_model: Optional Pydantic model for structured outputs.
|
||||
turn_number: Optional turn number for multi-turn conversations.
|
||||
updates: Update mechanism config from A2AConfig.updates.
|
||||
from_task: Optional CrewAI Task object for event metadata.
|
||||
from_agent: Optional CrewAI Agent object for event metadata.
|
||||
skill_id: Optional skill ID to target a specific agent capability.
|
||||
|
||||
Returns:
|
||||
TaskStateResult with status, result/error, history, and agent_card.
|
||||
@@ -233,17 +251,6 @@ async def aexecute_a2a_delegation(
|
||||
if turn_number is None:
|
||||
turn_number = len([m for m in conversation_history if m.role == Role.user]) + 1
|
||||
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2ADelegationStartedEvent(
|
||||
endpoint=endpoint,
|
||||
task_description=task_description,
|
||||
agent_id=agent_id,
|
||||
is_multiturn=is_multiturn,
|
||||
turn_number=turn_number,
|
||||
),
|
||||
)
|
||||
|
||||
result = await _aexecute_a2a_delegation_impl(
|
||||
endpoint=endpoint,
|
||||
auth=auth,
|
||||
@@ -264,15 +271,28 @@ async def aexecute_a2a_delegation(
|
||||
response_model=response_model,
|
||||
updates=updates,
|
||||
transport_protocol=transport_protocol,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
skill_id=skill_id,
|
||||
)
|
||||
|
||||
agent_card_data: dict[str, Any] = result.get("agent_card") or {}
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2ADelegationCompletedEvent(
|
||||
status=result["status"],
|
||||
result=result.get("result"),
|
||||
error=result.get("error"),
|
||||
context_id=context_id,
|
||||
is_multiturn=is_multiturn,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=result.get("a2a_agent_name"),
|
||||
agent_card=agent_card_data,
|
||||
provider=agent_card_data.get("provider"),
|
||||
metadata=metadata,
|
||||
extensions=list(extensions.keys()) if extensions else None,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -299,6 +319,9 @@ async def _aexecute_a2a_delegation_impl(
|
||||
agent_role: str | None,
|
||||
response_model: type[BaseModel] | None,
|
||||
updates: UpdateConfig | None,
|
||||
from_task: Any | None = None,
|
||||
from_agent: Any | None = None,
|
||||
skill_id: str | None = None,
|
||||
) -> TaskStateResult:
|
||||
"""Internal async implementation of A2A delegation."""
|
||||
if auth:
|
||||
@@ -331,6 +354,28 @@ async def _aexecute_a2a_delegation_impl(
|
||||
if agent_card.name:
|
||||
a2a_agent_name = agent_card.name
|
||||
|
||||
agent_card_dict = agent_card.model_dump(exclude_none=True)
|
||||
crewai_event_bus.emit(
|
||||
agent_branch,
|
||||
A2ADelegationStartedEvent(
|
||||
endpoint=endpoint,
|
||||
task_description=task_description,
|
||||
agent_id=agent_id or endpoint,
|
||||
context_id=context_id,
|
||||
is_multiturn=is_multiturn,
|
||||
turn_number=turn_number,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
agent_card=agent_card_dict,
|
||||
protocol_version=agent_card.protocol_version,
|
||||
provider=agent_card_dict.get("provider"),
|
||||
skill_id=skill_id,
|
||||
metadata=metadata,
|
||||
extensions=list(extensions.keys()) if extensions else None,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
|
||||
if turn_number == 1:
|
||||
agent_id_for_event = agent_id or endpoint
|
||||
crewai_event_bus.emit(
|
||||
@@ -338,7 +383,17 @@ async def _aexecute_a2a_delegation_impl(
|
||||
A2AConversationStartedEvent(
|
||||
agent_id=agent_id_for_event,
|
||||
endpoint=endpoint,
|
||||
context_id=context_id,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
agent_card=agent_card_dict,
|
||||
protocol_version=agent_card.protocol_version,
|
||||
provider=agent_card_dict.get("provider"),
|
||||
skill_id=skill_id,
|
||||
reference_task_ids=reference_task_ids,
|
||||
metadata=metadata,
|
||||
extensions=list(extensions.keys()) if extensions else None,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -364,6 +419,10 @@ async def _aexecute_a2a_delegation_impl(
|
||||
}
|
||||
)
|
||||
|
||||
message_metadata = metadata.copy() if metadata else {}
|
||||
if skill_id:
|
||||
message_metadata["skill_id"] = skill_id
|
||||
|
||||
message = Message(
|
||||
role=Role.user,
|
||||
message_id=str(uuid.uuid4()),
|
||||
@@ -371,7 +430,7 @@ async def _aexecute_a2a_delegation_impl(
|
||||
context_id=context_id,
|
||||
task_id=task_id,
|
||||
reference_task_ids=reference_task_ids,
|
||||
metadata=metadata,
|
||||
metadata=message_metadata if message_metadata else None,
|
||||
extensions=extensions,
|
||||
)
|
||||
|
||||
@@ -381,8 +440,17 @@ async def _aexecute_a2a_delegation_impl(
|
||||
A2AMessageSentEvent(
|
||||
message=message_text,
|
||||
turn_number=turn_number,
|
||||
context_id=context_id,
|
||||
message_id=message.message_id,
|
||||
is_multiturn=is_multiturn,
|
||||
agent_role=agent_role,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
skill_id=skill_id,
|
||||
metadata=message_metadata if message_metadata else None,
|
||||
extensions=list(extensions.keys()) if extensions else None,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -397,6 +465,9 @@ async def _aexecute_a2a_delegation_impl(
|
||||
"task_id": task_id,
|
||||
"endpoint": endpoint,
|
||||
"agent_branch": agent_branch,
|
||||
"a2a_agent_name": a2a_agent_name,
|
||||
"from_task": from_task,
|
||||
"from_agent": from_agent,
|
||||
}
|
||||
|
||||
if isinstance(updates, PollingConfig):
|
||||
@@ -434,13 +505,16 @@ async def _aexecute_a2a_delegation_impl(
|
||||
use_polling=use_polling,
|
||||
push_notification_config=push_config_for_client,
|
||||
) as client:
|
||||
return await handler.execute(
|
||||
result = await handler.execute(
|
||||
client=client,
|
||||
message=message,
|
||||
new_messages=new_messages,
|
||||
agent_card=agent_card,
|
||||
**handler_kwargs,
|
||||
)
|
||||
result["a2a_agent_name"] = a2a_agent_name
|
||||
result["agent_card"] = agent_card.model_dump(exclude_none=True)
|
||||
return result
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
|
||||
@@ -3,11 +3,14 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
from collections.abc import Callable, Coroutine
|
||||
from datetime import datetime
|
||||
from functools import wraps
|
||||
import logging
|
||||
import os
|
||||
from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar, cast
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from a2a.server.agent_execution import RequestContext
|
||||
from a2a.server.events import EventQueue
|
||||
@@ -45,7 +48,14 @@ T = TypeVar("T")
|
||||
|
||||
|
||||
def _parse_redis_url(url: str) -> dict[str, Any]:
|
||||
from urllib.parse import urlparse
|
||||
"""Parse a Redis URL into aiocache configuration.
|
||||
|
||||
Args:
|
||||
url: Redis connection URL (e.g., redis://localhost:6379/0).
|
||||
|
||||
Returns:
|
||||
Configuration dict for aiocache.RedisCache.
|
||||
"""
|
||||
|
||||
parsed = urlparse(url)
|
||||
config: dict[str, Any] = {
|
||||
@@ -127,7 +137,7 @@ def cancellable(
|
||||
async for message in pubsub.listen():
|
||||
if message["type"] == "message":
|
||||
return True
|
||||
except Exception as e:
|
||||
except (OSError, ConnectionError) as e:
|
||||
logger.warning("Cancel watcher error for task_id=%s: %s", task_id, e)
|
||||
return await poll_for_cancel()
|
||||
return False
|
||||
@@ -183,7 +193,12 @@ async def execute(
|
||||
msg = "task_id and context_id are required"
|
||||
crewai_event_bus.emit(
|
||||
agent,
|
||||
A2AServerTaskFailedEvent(a2a_task_id="", a2a_context_id="", error=msg),
|
||||
A2AServerTaskFailedEvent(
|
||||
task_id="",
|
||||
context_id="",
|
||||
error=msg,
|
||||
from_agent=agent,
|
||||
),
|
||||
)
|
||||
raise ServerError(InvalidParamsError(message=msg)) from None
|
||||
|
||||
@@ -195,7 +210,12 @@ async def execute(
|
||||
|
||||
crewai_event_bus.emit(
|
||||
agent,
|
||||
A2AServerTaskStartedEvent(a2a_task_id=task_id, a2a_context_id=context_id),
|
||||
A2AServerTaskStartedEvent(
|
||||
task_id=task_id,
|
||||
context_id=context_id,
|
||||
from_task=task,
|
||||
from_agent=agent,
|
||||
),
|
||||
)
|
||||
|
||||
try:
|
||||
@@ -215,20 +235,33 @@ async def execute(
|
||||
crewai_event_bus.emit(
|
||||
agent,
|
||||
A2AServerTaskCompletedEvent(
|
||||
a2a_task_id=task_id, a2a_context_id=context_id, result=str(result)
|
||||
task_id=task_id,
|
||||
context_id=context_id,
|
||||
result=str(result),
|
||||
from_task=task,
|
||||
from_agent=agent,
|
||||
),
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
crewai_event_bus.emit(
|
||||
agent,
|
||||
A2AServerTaskCanceledEvent(a2a_task_id=task_id, a2a_context_id=context_id),
|
||||
A2AServerTaskCanceledEvent(
|
||||
task_id=task_id,
|
||||
context_id=context_id,
|
||||
from_task=task,
|
||||
from_agent=agent,
|
||||
),
|
||||
)
|
||||
raise
|
||||
except Exception as e:
|
||||
crewai_event_bus.emit(
|
||||
agent,
|
||||
A2AServerTaskFailedEvent(
|
||||
a2a_task_id=task_id, a2a_context_id=context_id, error=str(e)
|
||||
task_id=task_id,
|
||||
context_id=context_id,
|
||||
error=str(e),
|
||||
from_task=task,
|
||||
from_agent=agent,
|
||||
),
|
||||
)
|
||||
raise ServerError(
|
||||
@@ -282,3 +315,85 @@ async def cancel(
|
||||
context.current_task.status = TaskStatus(state=TaskState.canceled)
|
||||
return context.current_task
|
||||
return None
|
||||
|
||||
|
||||
def list_tasks(
|
||||
tasks: list[A2ATask],
|
||||
context_id: str | None = None,
|
||||
status: TaskState | None = None,
|
||||
status_timestamp_after: datetime | None = None,
|
||||
page_size: int = 50,
|
||||
page_token: str | None = None,
|
||||
history_length: int | None = None,
|
||||
include_artifacts: bool = False,
|
||||
) -> tuple[list[A2ATask], str | None, int]:
|
||||
"""Filter and paginate A2A tasks.
|
||||
|
||||
Provides filtering by context, status, and timestamp, along with
|
||||
cursor-based pagination. This is a pure utility function that operates
|
||||
on an in-memory list of tasks - storage retrieval is handled separately.
|
||||
|
||||
Args:
|
||||
tasks: All tasks to filter.
|
||||
context_id: Filter by context ID to get tasks in a conversation.
|
||||
status: Filter by task state (e.g., completed, working).
|
||||
status_timestamp_after: Filter to tasks updated after this time.
|
||||
page_size: Maximum tasks per page (default 50).
|
||||
page_token: Base64-encoded cursor from previous response.
|
||||
history_length: Limit history messages per task (None = full history).
|
||||
include_artifacts: Whether to include task artifacts (default False).
|
||||
|
||||
Returns:
|
||||
Tuple of (filtered_tasks, next_page_token, total_count).
|
||||
- filtered_tasks: Tasks matching filters, paginated and trimmed.
|
||||
- next_page_token: Token for next page, or None if no more pages.
|
||||
- total_count: Total number of tasks matching filters (before pagination).
|
||||
"""
|
||||
filtered: list[A2ATask] = []
|
||||
for task in tasks:
|
||||
if context_id and task.context_id != context_id:
|
||||
continue
|
||||
if status and task.status.state != status:
|
||||
continue
|
||||
if status_timestamp_after and task.status.timestamp:
|
||||
ts = datetime.fromisoformat(task.status.timestamp.replace("Z", "+00:00"))
|
||||
if ts <= status_timestamp_after:
|
||||
continue
|
||||
filtered.append(task)
|
||||
|
||||
def get_timestamp(t: A2ATask) -> datetime:
|
||||
"""Extract timestamp from task status for sorting."""
|
||||
if t.status.timestamp is None:
|
||||
return datetime.min
|
||||
return datetime.fromisoformat(t.status.timestamp.replace("Z", "+00:00"))
|
||||
|
||||
filtered.sort(key=get_timestamp, reverse=True)
|
||||
total = len(filtered)
|
||||
|
||||
start = 0
|
||||
if page_token:
|
||||
try:
|
||||
cursor_id = base64.b64decode(page_token).decode()
|
||||
for idx, task in enumerate(filtered):
|
||||
if task.id == cursor_id:
|
||||
start = idx + 1
|
||||
break
|
||||
except (ValueError, UnicodeDecodeError):
|
||||
pass
|
||||
|
||||
page = filtered[start : start + page_size]
|
||||
|
||||
result: list[A2ATask] = []
|
||||
for task in page:
|
||||
task = task.model_copy(deep=True)
|
||||
if history_length is not None and task.history:
|
||||
task.history = task.history[-history_length:]
|
||||
if not include_artifacts:
|
||||
task.artifacts = None
|
||||
result.append(task)
|
||||
|
||||
next_token: str | None = None
|
||||
if result and len(result) == page_size:
|
||||
next_token = base64.b64encode(result[-1].id.encode()).decode()
|
||||
|
||||
return result, next_token, total
|
||||
|
||||
@@ -6,9 +6,10 @@ Wraps agent classes with A2A delegation capabilities.
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Callable, Coroutine
|
||||
from collections.abc import Callable, Coroutine, Mapping
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from functools import wraps
|
||||
import json
|
||||
from types import MethodType
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
@@ -189,7 +190,7 @@ def _execute_task_with_a2a(
|
||||
a2a_agents: list[A2AConfig | A2AClientConfig],
|
||||
original_fn: Callable[..., str],
|
||||
task: Task,
|
||||
agent_response_model: type[BaseModel],
|
||||
agent_response_model: type[BaseModel] | None,
|
||||
context: str | None,
|
||||
tools: list[BaseTool] | None,
|
||||
extension_registry: ExtensionRegistry,
|
||||
@@ -277,7 +278,7 @@ def _execute_task_with_a2a(
|
||||
def _augment_prompt_with_a2a(
|
||||
a2a_agents: list[A2AConfig | A2AClientConfig],
|
||||
task_description: str,
|
||||
agent_cards: dict[str, AgentCard],
|
||||
agent_cards: Mapping[str, AgentCard | dict[str, Any]],
|
||||
conversation_history: list[Message] | None = None,
|
||||
turn_num: int = 0,
|
||||
max_turns: int | None = None,
|
||||
@@ -309,7 +310,15 @@ def _augment_prompt_with_a2a(
|
||||
for config in a2a_agents:
|
||||
if config.endpoint in agent_cards:
|
||||
card = agent_cards[config.endpoint]
|
||||
agents_text += f"\n{card.model_dump_json(indent=2, exclude_none=True, include={'description', 'url', 'skills'})}\n"
|
||||
if isinstance(card, dict):
|
||||
filtered = {
|
||||
k: v
|
||||
for k, v in card.items()
|
||||
if k in {"description", "url", "skills"} and v is not None
|
||||
}
|
||||
agents_text += f"\n{json.dumps(filtered, indent=2)}\n"
|
||||
else:
|
||||
agents_text += f"\n{card.model_dump_json(indent=2, exclude_none=True, include={'description', 'url', 'skills'})}\n"
|
||||
|
||||
failed_agents = failed_agents or {}
|
||||
if failed_agents:
|
||||
@@ -377,7 +386,7 @@ IMPORTANT: You have the ability to delegate this task to remote A2A agents.
|
||||
|
||||
|
||||
def _parse_agent_response(
|
||||
raw_result: str | dict[str, Any], agent_response_model: type[BaseModel]
|
||||
raw_result: str | dict[str, Any], agent_response_model: type[BaseModel] | None
|
||||
) -> BaseModel | str | dict[str, Any]:
|
||||
"""Parse LLM output as AgentResponse or return raw agent response."""
|
||||
if agent_response_model:
|
||||
@@ -394,6 +403,11 @@ def _parse_agent_response(
|
||||
def _handle_max_turns_exceeded(
|
||||
conversation_history: list[Message],
|
||||
max_turns: int,
|
||||
from_task: Any | None = None,
|
||||
from_agent: Any | None = None,
|
||||
endpoint: str | None = None,
|
||||
a2a_agent_name: str | None = None,
|
||||
agent_card: dict[str, Any] | None = None,
|
||||
) -> str:
|
||||
"""Handle the case when max turns is exceeded.
|
||||
|
||||
@@ -421,6 +435,11 @@ def _handle_max_turns_exceeded(
|
||||
final_result=final_message,
|
||||
error=None,
|
||||
total_turns=max_turns,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
agent_card=agent_card,
|
||||
),
|
||||
)
|
||||
return final_message
|
||||
@@ -432,6 +451,11 @@ def _handle_max_turns_exceeded(
|
||||
final_result=None,
|
||||
error=f"Conversation exceeded maximum turns ({max_turns})",
|
||||
total_turns=max_turns,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
agent_card=agent_card,
|
||||
),
|
||||
)
|
||||
raise Exception(f"A2A conversation exceeded maximum turns ({max_turns})")
|
||||
@@ -442,7 +466,12 @@ def _process_response_result(
|
||||
disable_structured_output: bool,
|
||||
turn_num: int,
|
||||
agent_role: str,
|
||||
agent_response_model: type[BaseModel],
|
||||
agent_response_model: type[BaseModel] | None,
|
||||
from_task: Any | None = None,
|
||||
from_agent: Any | None = None,
|
||||
endpoint: str | None = None,
|
||||
a2a_agent_name: str | None = None,
|
||||
agent_card: dict[str, Any] | None = None,
|
||||
) -> tuple[str | None, str | None]:
|
||||
"""Process LLM response and determine next action.
|
||||
|
||||
@@ -461,6 +490,10 @@ def _process_response_result(
|
||||
turn_number=final_turn_number,
|
||||
is_multiturn=True,
|
||||
agent_role=agent_role,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
),
|
||||
)
|
||||
crewai_event_bus.emit(
|
||||
@@ -470,6 +503,11 @@ def _process_response_result(
|
||||
final_result=result_text,
|
||||
error=None,
|
||||
total_turns=final_turn_number,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
agent_card=agent_card,
|
||||
),
|
||||
)
|
||||
return result_text, None
|
||||
@@ -490,6 +528,10 @@ def _process_response_result(
|
||||
turn_number=final_turn_number,
|
||||
is_multiturn=True,
|
||||
agent_role=agent_role,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
),
|
||||
)
|
||||
crewai_event_bus.emit(
|
||||
@@ -499,6 +541,11 @@ def _process_response_result(
|
||||
final_result=str(llm_response.message),
|
||||
error=None,
|
||||
total_turns=final_turn_number,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
agent_card=agent_card,
|
||||
),
|
||||
)
|
||||
return str(llm_response.message), None
|
||||
@@ -510,13 +557,15 @@ def _process_response_result(
|
||||
def _prepare_agent_cards_dict(
|
||||
a2a_result: TaskStateResult,
|
||||
agent_id: str,
|
||||
agent_cards: dict[str, AgentCard] | None,
|
||||
) -> dict[str, AgentCard]:
|
||||
agent_cards: Mapping[str, AgentCard | dict[str, Any]] | None,
|
||||
) -> dict[str, AgentCard | dict[str, Any]]:
|
||||
"""Prepare agent cards dictionary from result and existing cards.
|
||||
|
||||
Shared logic for both sync and async response handlers.
|
||||
"""
|
||||
agent_cards_dict = agent_cards or {}
|
||||
agent_cards_dict: dict[str, AgentCard | dict[str, Any]] = (
|
||||
dict(agent_cards) if agent_cards else {}
|
||||
)
|
||||
if "agent_card" in a2a_result and agent_id not in agent_cards_dict:
|
||||
agent_cards_dict[agent_id] = a2a_result["agent_card"]
|
||||
return agent_cards_dict
|
||||
@@ -529,7 +578,7 @@ def _prepare_delegation_context(
|
||||
original_task_description: str | None,
|
||||
) -> tuple[
|
||||
list[A2AConfig | A2AClientConfig],
|
||||
type[BaseModel],
|
||||
type[BaseModel] | None,
|
||||
str,
|
||||
str,
|
||||
A2AConfig | A2AClientConfig,
|
||||
@@ -598,6 +647,11 @@ def _handle_task_completion(
|
||||
reference_task_ids: list[str],
|
||||
agent_config: A2AConfig | A2AClientConfig,
|
||||
turn_num: int,
|
||||
from_task: Any | None = None,
|
||||
from_agent: Any | None = None,
|
||||
endpoint: str | None = None,
|
||||
a2a_agent_name: str | None = None,
|
||||
agent_card: dict[str, Any] | None = None,
|
||||
) -> tuple[str | None, str | None, list[str]]:
|
||||
"""Handle task completion state including reference task updates.
|
||||
|
||||
@@ -624,6 +678,11 @@ def _handle_task_completion(
|
||||
final_result=result_text,
|
||||
error=None,
|
||||
total_turns=final_turn_number,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
agent_card=agent_card,
|
||||
),
|
||||
)
|
||||
return str(result_text), task_id_config, reference_task_ids
|
||||
@@ -645,8 +704,11 @@ def _handle_agent_response_and_continue(
|
||||
original_fn: Callable[..., str],
|
||||
context: str | None,
|
||||
tools: list[BaseTool] | None,
|
||||
agent_response_model: type[BaseModel],
|
||||
agent_response_model: type[BaseModel] | None,
|
||||
remote_task_completed: bool = False,
|
||||
endpoint: str | None = None,
|
||||
a2a_agent_name: str | None = None,
|
||||
agent_card: dict[str, Any] | None = None,
|
||||
) -> tuple[str | None, str | None]:
|
||||
"""Handle A2A result and get CrewAI agent's response.
|
||||
|
||||
@@ -698,6 +760,11 @@ def _handle_agent_response_and_continue(
|
||||
turn_num=turn_num,
|
||||
agent_role=self.role,
|
||||
agent_response_model=agent_response_model,
|
||||
from_task=task,
|
||||
from_agent=self,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
agent_card=agent_card,
|
||||
)
|
||||
|
||||
|
||||
@@ -750,6 +817,12 @@ def _delegate_to_a2a(
|
||||
|
||||
conversation_history: list[Message] = []
|
||||
|
||||
current_agent_card = agent_cards.get(agent_id) if agent_cards else None
|
||||
current_agent_card_dict = (
|
||||
current_agent_card.model_dump() if current_agent_card else None
|
||||
)
|
||||
current_a2a_agent_name = current_agent_card.name if current_agent_card else None
|
||||
|
||||
try:
|
||||
for turn_num in range(max_turns):
|
||||
console_formatter = getattr(crewai_event_bus, "_console", None)
|
||||
@@ -777,6 +850,8 @@ def _delegate_to_a2a(
|
||||
turn_number=turn_num + 1,
|
||||
updates=agent_config.updates,
|
||||
transport_protocol=agent_config.transport_protocol,
|
||||
from_task=task,
|
||||
from_agent=self,
|
||||
)
|
||||
|
||||
conversation_history = a2a_result.get("history", [])
|
||||
@@ -797,6 +872,11 @@ def _delegate_to_a2a(
|
||||
reference_task_ids,
|
||||
agent_config,
|
||||
turn_num,
|
||||
from_task=task,
|
||||
from_agent=self,
|
||||
endpoint=agent_config.endpoint,
|
||||
a2a_agent_name=current_a2a_agent_name,
|
||||
agent_card=current_agent_card_dict,
|
||||
)
|
||||
)
|
||||
if trusted_result is not None:
|
||||
@@ -818,6 +898,9 @@ def _delegate_to_a2a(
|
||||
tools=tools,
|
||||
agent_response_model=agent_response_model,
|
||||
remote_task_completed=(a2a_result["status"] == TaskState.completed),
|
||||
endpoint=agent_config.endpoint,
|
||||
a2a_agent_name=current_a2a_agent_name,
|
||||
agent_card=current_agent_card_dict,
|
||||
)
|
||||
|
||||
if final_result is not None:
|
||||
@@ -846,6 +929,9 @@ def _delegate_to_a2a(
|
||||
tools=tools,
|
||||
agent_response_model=agent_response_model,
|
||||
remote_task_completed=False,
|
||||
endpoint=agent_config.endpoint,
|
||||
a2a_agent_name=current_a2a_agent_name,
|
||||
agent_card=current_agent_card_dict,
|
||||
)
|
||||
|
||||
if final_result is not None:
|
||||
@@ -862,11 +948,24 @@ def _delegate_to_a2a(
|
||||
final_result=None,
|
||||
error=error_msg,
|
||||
total_turns=turn_num + 1,
|
||||
from_task=task,
|
||||
from_agent=self,
|
||||
endpoint=agent_config.endpoint,
|
||||
a2a_agent_name=current_a2a_agent_name,
|
||||
agent_card=current_agent_card_dict,
|
||||
),
|
||||
)
|
||||
return f"A2A delegation failed: {error_msg}"
|
||||
|
||||
return _handle_max_turns_exceeded(conversation_history, max_turns)
|
||||
return _handle_max_turns_exceeded(
|
||||
conversation_history,
|
||||
max_turns,
|
||||
from_task=task,
|
||||
from_agent=self,
|
||||
endpoint=agent_config.endpoint,
|
||||
a2a_agent_name=current_a2a_agent_name,
|
||||
agent_card=current_agent_card_dict,
|
||||
)
|
||||
|
||||
finally:
|
||||
task.description = original_task_description
|
||||
@@ -916,7 +1015,7 @@ async def _aexecute_task_with_a2a(
|
||||
a2a_agents: list[A2AConfig | A2AClientConfig],
|
||||
original_fn: Callable[..., Coroutine[Any, Any, str]],
|
||||
task: Task,
|
||||
agent_response_model: type[BaseModel],
|
||||
agent_response_model: type[BaseModel] | None,
|
||||
context: str | None,
|
||||
tools: list[BaseTool] | None,
|
||||
extension_registry: ExtensionRegistry,
|
||||
@@ -1001,8 +1100,11 @@ async def _ahandle_agent_response_and_continue(
|
||||
original_fn: Callable[..., Coroutine[Any, Any, str]],
|
||||
context: str | None,
|
||||
tools: list[BaseTool] | None,
|
||||
agent_response_model: type[BaseModel],
|
||||
agent_response_model: type[BaseModel] | None,
|
||||
remote_task_completed: bool = False,
|
||||
endpoint: str | None = None,
|
||||
a2a_agent_name: str | None = None,
|
||||
agent_card: dict[str, Any] | None = None,
|
||||
) -> tuple[str | None, str | None]:
|
||||
"""Async version of _handle_agent_response_and_continue."""
|
||||
agent_cards_dict = _prepare_agent_cards_dict(a2a_result, agent_id, agent_cards)
|
||||
@@ -1032,6 +1134,11 @@ async def _ahandle_agent_response_and_continue(
|
||||
turn_num=turn_num,
|
||||
agent_role=self.role,
|
||||
agent_response_model=agent_response_model,
|
||||
from_task=task,
|
||||
from_agent=self,
|
||||
endpoint=endpoint,
|
||||
a2a_agent_name=a2a_agent_name,
|
||||
agent_card=agent_card,
|
||||
)
|
||||
|
||||
|
||||
@@ -1066,6 +1173,12 @@ async def _adelegate_to_a2a(
|
||||
|
||||
conversation_history: list[Message] = []
|
||||
|
||||
current_agent_card = agent_cards.get(agent_id) if agent_cards else None
|
||||
current_agent_card_dict = (
|
||||
current_agent_card.model_dump() if current_agent_card else None
|
||||
)
|
||||
current_a2a_agent_name = current_agent_card.name if current_agent_card else None
|
||||
|
||||
try:
|
||||
for turn_num in range(max_turns):
|
||||
console_formatter = getattr(crewai_event_bus, "_console", None)
|
||||
@@ -1093,6 +1206,8 @@ async def _adelegate_to_a2a(
|
||||
turn_number=turn_num + 1,
|
||||
transport_protocol=agent_config.transport_protocol,
|
||||
updates=agent_config.updates,
|
||||
from_task=task,
|
||||
from_agent=self,
|
||||
)
|
||||
|
||||
conversation_history = a2a_result.get("history", [])
|
||||
@@ -1113,6 +1228,11 @@ async def _adelegate_to_a2a(
|
||||
reference_task_ids,
|
||||
agent_config,
|
||||
turn_num,
|
||||
from_task=task,
|
||||
from_agent=self,
|
||||
endpoint=agent_config.endpoint,
|
||||
a2a_agent_name=current_a2a_agent_name,
|
||||
agent_card=current_agent_card_dict,
|
||||
)
|
||||
)
|
||||
if trusted_result is not None:
|
||||
@@ -1134,6 +1254,9 @@ async def _adelegate_to_a2a(
|
||||
tools=tools,
|
||||
agent_response_model=agent_response_model,
|
||||
remote_task_completed=(a2a_result["status"] == TaskState.completed),
|
||||
endpoint=agent_config.endpoint,
|
||||
a2a_agent_name=current_a2a_agent_name,
|
||||
agent_card=current_agent_card_dict,
|
||||
)
|
||||
|
||||
if final_result is not None:
|
||||
@@ -1161,6 +1284,9 @@ async def _adelegate_to_a2a(
|
||||
context=context,
|
||||
tools=tools,
|
||||
agent_response_model=agent_response_model,
|
||||
endpoint=agent_config.endpoint,
|
||||
a2a_agent_name=current_a2a_agent_name,
|
||||
agent_card=current_agent_card_dict,
|
||||
)
|
||||
|
||||
if final_result is not None:
|
||||
@@ -1177,11 +1303,24 @@ async def _adelegate_to_a2a(
|
||||
final_result=None,
|
||||
error=error_msg,
|
||||
total_turns=turn_num + 1,
|
||||
from_task=task,
|
||||
from_agent=self,
|
||||
endpoint=agent_config.endpoint,
|
||||
a2a_agent_name=current_a2a_agent_name,
|
||||
agent_card=current_agent_card_dict,
|
||||
),
|
||||
)
|
||||
return f"A2A delegation failed: {error_msg}"
|
||||
|
||||
return _handle_max_turns_exceeded(conversation_history, max_turns)
|
||||
return _handle_max_turns_exceeded(
|
||||
conversation_history,
|
||||
max_turns,
|
||||
from_task=task,
|
||||
from_agent=self,
|
||||
endpoint=agent_config.endpoint,
|
||||
a2a_agent_name=current_a2a_agent_name,
|
||||
agent_card=current_agent_card_dict,
|
||||
)
|
||||
|
||||
finally:
|
||||
task.description = original_task_description
|
||||
|
||||
@@ -219,7 +219,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
Final answer from the agent.
|
||||
"""
|
||||
formatted_answer = None
|
||||
last_raw_output: str | None = None
|
||||
while not isinstance(formatted_answer, AgentFinish):
|
||||
try:
|
||||
if has_reached_max_iterations(self.iterations, self.max_iter):
|
||||
@@ -245,7 +244,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
response_model=self.response_model,
|
||||
executor_context=self,
|
||||
)
|
||||
last_raw_output = answer
|
||||
if self.response_model is not None:
|
||||
try:
|
||||
self.response_model.model_validate_json(answer)
|
||||
@@ -302,8 +300,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
iterations=self.iterations,
|
||||
log_error_after=self.log_error_after,
|
||||
printer=self._printer,
|
||||
raw_output=last_raw_output,
|
||||
agent_role=self.agent.role if self.agent else None,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
@@ -390,7 +386,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
Final answer from the agent.
|
||||
"""
|
||||
formatted_answer = None
|
||||
last_raw_output: str | None = None
|
||||
while not isinstance(formatted_answer, AgentFinish):
|
||||
try:
|
||||
if has_reached_max_iterations(self.iterations, self.max_iter):
|
||||
@@ -416,7 +411,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
response_model=self.response_model,
|
||||
executor_context=self,
|
||||
)
|
||||
last_raw_output = answer
|
||||
|
||||
if self.response_model is not None:
|
||||
try:
|
||||
@@ -473,8 +467,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
iterations=self.iterations,
|
||||
log_error_after=self.log_error_after,
|
||||
printer=self._printer,
|
||||
raw_output=last_raw_output,
|
||||
agent_role=self.agent.role if self.agent else None,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
|
||||
@@ -1,19 +1,28 @@
|
||||
from crewai.events.types.a2a_events import (
|
||||
A2AAgentCardFetchedEvent,
|
||||
A2AArtifactReceivedEvent,
|
||||
A2AAuthenticationFailedEvent,
|
||||
A2AConnectionErrorEvent,
|
||||
A2AConversationCompletedEvent,
|
||||
A2AConversationStartedEvent,
|
||||
A2ADelegationCompletedEvent,
|
||||
A2ADelegationStartedEvent,
|
||||
A2AMessageSentEvent,
|
||||
A2AParallelDelegationCompletedEvent,
|
||||
A2AParallelDelegationStartedEvent,
|
||||
A2APollingStartedEvent,
|
||||
A2APollingStatusEvent,
|
||||
A2APushNotificationReceivedEvent,
|
||||
A2APushNotificationRegisteredEvent,
|
||||
A2APushNotificationSentEvent,
|
||||
A2APushNotificationTimeoutEvent,
|
||||
A2AResponseReceivedEvent,
|
||||
A2AServerTaskCanceledEvent,
|
||||
A2AServerTaskCompletedEvent,
|
||||
A2AServerTaskFailedEvent,
|
||||
A2AServerTaskStartedEvent,
|
||||
A2AStreamingChunkEvent,
|
||||
A2AStreamingStartedEvent,
|
||||
)
|
||||
from crewai.events.types.agent_events import (
|
||||
AgentExecutionCompletedEvent,
|
||||
@@ -93,7 +102,11 @@ from crewai.events.types.tool_usage_events import (
|
||||
|
||||
|
||||
EventTypes = (
|
||||
A2AConversationCompletedEvent
|
||||
A2AAgentCardFetchedEvent
|
||||
| A2AArtifactReceivedEvent
|
||||
| A2AAuthenticationFailedEvent
|
||||
| A2AConnectionErrorEvent
|
||||
| A2AConversationCompletedEvent
|
||||
| A2AConversationStartedEvent
|
||||
| A2ADelegationCompletedEvent
|
||||
| A2ADelegationStartedEvent
|
||||
@@ -102,12 +115,17 @@ EventTypes = (
|
||||
| A2APollingStatusEvent
|
||||
| A2APushNotificationReceivedEvent
|
||||
| A2APushNotificationRegisteredEvent
|
||||
| A2APushNotificationSentEvent
|
||||
| A2APushNotificationTimeoutEvent
|
||||
| A2AResponseReceivedEvent
|
||||
| A2AServerTaskCanceledEvent
|
||||
| A2AServerTaskCompletedEvent
|
||||
| A2AServerTaskFailedEvent
|
||||
| A2AServerTaskStartedEvent
|
||||
| A2AStreamingChunkEvent
|
||||
| A2AStreamingStartedEvent
|
||||
| A2AParallelDelegationStartedEvent
|
||||
| A2AParallelDelegationCompletedEvent
|
||||
| CrewKickoffStartedEvent
|
||||
| CrewKickoffCompletedEvent
|
||||
| CrewKickoffFailedEvent
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
"""Trace collection listener for orchestrating trace collection."""
|
||||
|
||||
import os
|
||||
from typing import Any, ClassVar, cast
|
||||
from typing import Any, ClassVar
|
||||
import uuid
|
||||
|
||||
from typing_extensions import Self
|
||||
@@ -18,6 +18,32 @@ from crewai.events.listeners.tracing.types import TraceEvent
|
||||
from crewai.events.listeners.tracing.utils import (
|
||||
safe_serialize_to_dict,
|
||||
)
|
||||
from crewai.events.types.a2a_events import (
|
||||
A2AAgentCardFetchedEvent,
|
||||
A2AArtifactReceivedEvent,
|
||||
A2AAuthenticationFailedEvent,
|
||||
A2AConnectionErrorEvent,
|
||||
A2AConversationCompletedEvent,
|
||||
A2AConversationStartedEvent,
|
||||
A2ADelegationCompletedEvent,
|
||||
A2ADelegationStartedEvent,
|
||||
A2AMessageSentEvent,
|
||||
A2AParallelDelegationCompletedEvent,
|
||||
A2AParallelDelegationStartedEvent,
|
||||
A2APollingStartedEvent,
|
||||
A2APollingStatusEvent,
|
||||
A2APushNotificationReceivedEvent,
|
||||
A2APushNotificationRegisteredEvent,
|
||||
A2APushNotificationSentEvent,
|
||||
A2APushNotificationTimeoutEvent,
|
||||
A2AResponseReceivedEvent,
|
||||
A2AServerTaskCanceledEvent,
|
||||
A2AServerTaskCompletedEvent,
|
||||
A2AServerTaskFailedEvent,
|
||||
A2AServerTaskStartedEvent,
|
||||
A2AStreamingChunkEvent,
|
||||
A2AStreamingStartedEvent,
|
||||
)
|
||||
from crewai.events.types.agent_events import (
|
||||
AgentExecutionCompletedEvent,
|
||||
AgentExecutionErrorEvent,
|
||||
@@ -105,7 +131,7 @@ class TraceCollectionListener(BaseEventListener):
|
||||
"""Create or return singleton instance."""
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
return cast(Self, cls._instance)
|
||||
return cls._instance
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -160,6 +186,7 @@ class TraceCollectionListener(BaseEventListener):
|
||||
self._register_flow_event_handlers(crewai_event_bus)
|
||||
self._register_context_event_handlers(crewai_event_bus)
|
||||
self._register_action_event_handlers(crewai_event_bus)
|
||||
self._register_a2a_event_handlers(crewai_event_bus)
|
||||
self._register_system_event_handlers(crewai_event_bus)
|
||||
|
||||
self._listeners_setup = True
|
||||
@@ -439,6 +466,147 @@ class TraceCollectionListener(BaseEventListener):
|
||||
) -> None:
|
||||
self._handle_action_event("knowledge_query_failed", source, event)
|
||||
|
||||
def _register_a2a_event_handlers(self, event_bus: CrewAIEventsBus) -> None:
|
||||
"""Register handlers for A2A (Agent-to-Agent) events."""
|
||||
|
||||
@event_bus.on(A2ADelegationStartedEvent)
|
||||
def on_a2a_delegation_started(
|
||||
source: Any, event: A2ADelegationStartedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_delegation_started", source, event)
|
||||
|
||||
@event_bus.on(A2ADelegationCompletedEvent)
|
||||
def on_a2a_delegation_completed(
|
||||
source: Any, event: A2ADelegationCompletedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_delegation_completed", source, event)
|
||||
|
||||
@event_bus.on(A2AConversationStartedEvent)
|
||||
def on_a2a_conversation_started(
|
||||
source: Any, event: A2AConversationStartedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_conversation_started", source, event)
|
||||
|
||||
@event_bus.on(A2AMessageSentEvent)
|
||||
def on_a2a_message_sent(source: Any, event: A2AMessageSentEvent) -> None:
|
||||
self._handle_action_event("a2a_message_sent", source, event)
|
||||
|
||||
@event_bus.on(A2AResponseReceivedEvent)
|
||||
def on_a2a_response_received(
|
||||
source: Any, event: A2AResponseReceivedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_response_received", source, event)
|
||||
|
||||
@event_bus.on(A2AConversationCompletedEvent)
|
||||
def on_a2a_conversation_completed(
|
||||
source: Any, event: A2AConversationCompletedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_conversation_completed", source, event)
|
||||
|
||||
@event_bus.on(A2APollingStartedEvent)
|
||||
def on_a2a_polling_started(source: Any, event: A2APollingStartedEvent) -> None:
|
||||
self._handle_action_event("a2a_polling_started", source, event)
|
||||
|
||||
@event_bus.on(A2APollingStatusEvent)
|
||||
def on_a2a_polling_status(source: Any, event: A2APollingStatusEvent) -> None:
|
||||
self._handle_action_event("a2a_polling_status", source, event)
|
||||
|
||||
@event_bus.on(A2APushNotificationRegisteredEvent)
|
||||
def on_a2a_push_notification_registered(
|
||||
source: Any, event: A2APushNotificationRegisteredEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_push_notification_registered", source, event)
|
||||
|
||||
@event_bus.on(A2APushNotificationReceivedEvent)
|
||||
def on_a2a_push_notification_received(
|
||||
source: Any, event: A2APushNotificationReceivedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_push_notification_received", source, event)
|
||||
|
||||
@event_bus.on(A2APushNotificationSentEvent)
|
||||
def on_a2a_push_notification_sent(
|
||||
source: Any, event: A2APushNotificationSentEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_push_notification_sent", source, event)
|
||||
|
||||
@event_bus.on(A2APushNotificationTimeoutEvent)
|
||||
def on_a2a_push_notification_timeout(
|
||||
source: Any, event: A2APushNotificationTimeoutEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_push_notification_timeout", source, event)
|
||||
|
||||
@event_bus.on(A2AStreamingStartedEvent)
|
||||
def on_a2a_streaming_started(
|
||||
source: Any, event: A2AStreamingStartedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_streaming_started", source, event)
|
||||
|
||||
@event_bus.on(A2AStreamingChunkEvent)
|
||||
def on_a2a_streaming_chunk(source: Any, event: A2AStreamingChunkEvent) -> None:
|
||||
self._handle_action_event("a2a_streaming_chunk", source, event)
|
||||
|
||||
@event_bus.on(A2AAgentCardFetchedEvent)
|
||||
def on_a2a_agent_card_fetched(
|
||||
source: Any, event: A2AAgentCardFetchedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_agent_card_fetched", source, event)
|
||||
|
||||
@event_bus.on(A2AAuthenticationFailedEvent)
|
||||
def on_a2a_authentication_failed(
|
||||
source: Any, event: A2AAuthenticationFailedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_authentication_failed", source, event)
|
||||
|
||||
@event_bus.on(A2AArtifactReceivedEvent)
|
||||
def on_a2a_artifact_received(
|
||||
source: Any, event: A2AArtifactReceivedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_artifact_received", source, event)
|
||||
|
||||
@event_bus.on(A2AConnectionErrorEvent)
|
||||
def on_a2a_connection_error(
|
||||
source: Any, event: A2AConnectionErrorEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_connection_error", source, event)
|
||||
|
||||
@event_bus.on(A2AServerTaskStartedEvent)
|
||||
def on_a2a_server_task_started(
|
||||
source: Any, event: A2AServerTaskStartedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_server_task_started", source, event)
|
||||
|
||||
@event_bus.on(A2AServerTaskCompletedEvent)
|
||||
def on_a2a_server_task_completed(
|
||||
source: Any, event: A2AServerTaskCompletedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_server_task_completed", source, event)
|
||||
|
||||
@event_bus.on(A2AServerTaskCanceledEvent)
|
||||
def on_a2a_server_task_canceled(
|
||||
source: Any, event: A2AServerTaskCanceledEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_server_task_canceled", source, event)
|
||||
|
||||
@event_bus.on(A2AServerTaskFailedEvent)
|
||||
def on_a2a_server_task_failed(
|
||||
source: Any, event: A2AServerTaskFailedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_server_task_failed", source, event)
|
||||
|
||||
@event_bus.on(A2AParallelDelegationStartedEvent)
|
||||
def on_a2a_parallel_delegation_started(
|
||||
source: Any, event: A2AParallelDelegationStartedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_parallel_delegation_started", source, event)
|
||||
|
||||
@event_bus.on(A2AParallelDelegationCompletedEvent)
|
||||
def on_a2a_parallel_delegation_completed(
|
||||
source: Any, event: A2AParallelDelegationCompletedEvent
|
||||
) -> None:
|
||||
self._handle_action_event(
|
||||
"a2a_parallel_delegation_completed", source, event
|
||||
)
|
||||
|
||||
def _register_system_event_handlers(self, event_bus: CrewAIEventsBus) -> None:
|
||||
"""Register handlers for system signal events (SIGTERM, SIGINT, etc.)."""
|
||||
|
||||
@@ -570,10 +738,15 @@ class TraceCollectionListener(BaseEventListener):
|
||||
if event_type not in self.complex_events:
|
||||
return safe_serialize_to_dict(event)
|
||||
if event_type == "task_started":
|
||||
task_name = event.task.name or event.task.description
|
||||
task_display_name = (
|
||||
task_name[:80] + "..." if len(task_name) > 80 else task_name
|
||||
)
|
||||
return {
|
||||
"task_description": event.task.description,
|
||||
"expected_output": event.task.expected_output,
|
||||
"task_name": event.task.name or event.task.description,
|
||||
"task_name": task_name,
|
||||
"task_display_name": task_display_name,
|
||||
"context": event.context,
|
||||
"agent_role": source.agent.role,
|
||||
"task_id": str(event.task.id),
|
||||
|
||||
@@ -4,68 +4,120 @@ This module defines events emitted during A2A protocol delegation,
|
||||
including both single-turn and multiturn conversation flows.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Literal
|
||||
|
||||
from pydantic import model_validator
|
||||
|
||||
from crewai.events.base_events import BaseEvent
|
||||
|
||||
|
||||
class A2AEventBase(BaseEvent):
|
||||
"""Base class for A2A events with task/agent context."""
|
||||
|
||||
from_task: Any | None = None
|
||||
from_agent: Any | None = None
|
||||
from_task: Any = None
|
||||
from_agent: Any = None
|
||||
|
||||
def __init__(self, **data: Any) -> None:
|
||||
"""Initialize A2A event, extracting task and agent metadata."""
|
||||
if data.get("from_task"):
|
||||
task = data["from_task"]
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def extract_task_and_agent_metadata(cls, data: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Extract task and agent metadata before validation."""
|
||||
if task := data.get("from_task"):
|
||||
data["task_id"] = str(task.id)
|
||||
data["task_name"] = task.name or task.description
|
||||
data.setdefault("source_fingerprint", str(task.id))
|
||||
data.setdefault("source_type", "task")
|
||||
data.setdefault(
|
||||
"fingerprint_metadata",
|
||||
{
|
||||
"task_id": str(task.id),
|
||||
"task_name": task.name or task.description,
|
||||
},
|
||||
)
|
||||
data["from_task"] = None
|
||||
|
||||
if data.get("from_agent"):
|
||||
agent = data["from_agent"]
|
||||
if agent := data.get("from_agent"):
|
||||
data["agent_id"] = str(agent.id)
|
||||
data["agent_role"] = agent.role
|
||||
data.setdefault("source_fingerprint", str(agent.id))
|
||||
data.setdefault("source_type", "agent")
|
||||
data.setdefault(
|
||||
"fingerprint_metadata",
|
||||
{
|
||||
"agent_id": str(agent.id),
|
||||
"agent_role": agent.role,
|
||||
},
|
||||
)
|
||||
data["from_agent"] = None
|
||||
|
||||
super().__init__(**data)
|
||||
return data
|
||||
|
||||
|
||||
class A2ADelegationStartedEvent(A2AEventBase):
|
||||
"""Event emitted when A2A delegation starts.
|
||||
|
||||
Attributes:
|
||||
endpoint: A2A agent endpoint URL (AgentCard URL)
|
||||
task_description: Task being delegated to the A2A agent
|
||||
agent_id: A2A agent identifier
|
||||
is_multiturn: Whether this is part of a multiturn conversation
|
||||
turn_number: Current turn number (1-indexed, 1 for single-turn)
|
||||
endpoint: A2A agent endpoint URL (AgentCard URL).
|
||||
task_description: Task being delegated to the A2A agent.
|
||||
agent_id: A2A agent identifier.
|
||||
context_id: A2A context ID grouping related tasks.
|
||||
is_multiturn: Whether this is part of a multiturn conversation.
|
||||
turn_number: Current turn number (1-indexed, 1 for single-turn).
|
||||
a2a_agent_name: Name of the A2A agent from agent card.
|
||||
agent_card: Full A2A agent card metadata.
|
||||
protocol_version: A2A protocol version being used.
|
||||
provider: Agent provider/organization info from agent card.
|
||||
skill_id: ID of the specific skill being invoked.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
extensions: List of A2A extension URIs in use.
|
||||
"""
|
||||
|
||||
type: str = "a2a_delegation_started"
|
||||
endpoint: str
|
||||
task_description: str
|
||||
agent_id: str
|
||||
context_id: str | None = None
|
||||
is_multiturn: bool = False
|
||||
turn_number: int = 1
|
||||
a2a_agent_name: str | None = None
|
||||
agent_card: dict[str, Any] | None = None
|
||||
protocol_version: str | None = None
|
||||
provider: dict[str, Any] | None = None
|
||||
skill_id: str | None = None
|
||||
metadata: dict[str, Any] | None = None
|
||||
extensions: list[str] | None = None
|
||||
|
||||
|
||||
class A2ADelegationCompletedEvent(A2AEventBase):
|
||||
"""Event emitted when A2A delegation completes.
|
||||
|
||||
Attributes:
|
||||
status: Completion status (completed, input_required, failed, etc.)
|
||||
result: Result message if status is completed
|
||||
error: Error/response message (error for failed, response for input_required)
|
||||
is_multiturn: Whether this is part of a multiturn conversation
|
||||
status: Completion status (completed, input_required, failed, etc.).
|
||||
result: Result message if status is completed.
|
||||
error: Error/response message (error for failed, response for input_required).
|
||||
context_id: A2A context ID grouping related tasks.
|
||||
is_multiturn: Whether this is part of a multiturn conversation.
|
||||
endpoint: A2A agent endpoint URL.
|
||||
a2a_agent_name: Name of the A2A agent from agent card.
|
||||
agent_card: Full A2A agent card metadata.
|
||||
provider: Agent provider/organization info from agent card.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
extensions: List of A2A extension URIs in use.
|
||||
"""
|
||||
|
||||
type: str = "a2a_delegation_completed"
|
||||
status: str
|
||||
result: str | None = None
|
||||
error: str | None = None
|
||||
context_id: str | None = None
|
||||
is_multiturn: bool = False
|
||||
endpoint: str | None = None
|
||||
a2a_agent_name: str | None = None
|
||||
agent_card: dict[str, Any] | None = None
|
||||
provider: dict[str, Any] | None = None
|
||||
metadata: dict[str, Any] | None = None
|
||||
extensions: list[str] | None = None
|
||||
|
||||
|
||||
class A2AConversationStartedEvent(A2AEventBase):
|
||||
@@ -75,51 +127,95 @@ class A2AConversationStartedEvent(A2AEventBase):
|
||||
before the first message exchange.
|
||||
|
||||
Attributes:
|
||||
agent_id: A2A agent identifier
|
||||
endpoint: A2A agent endpoint URL
|
||||
a2a_agent_name: Name of the A2A agent from agent card
|
||||
agent_id: A2A agent identifier.
|
||||
endpoint: A2A agent endpoint URL.
|
||||
context_id: A2A context ID grouping related tasks.
|
||||
a2a_agent_name: Name of the A2A agent from agent card.
|
||||
agent_card: Full A2A agent card metadata.
|
||||
protocol_version: A2A protocol version being used.
|
||||
provider: Agent provider/organization info from agent card.
|
||||
skill_id: ID of the specific skill being invoked.
|
||||
reference_task_ids: Related task IDs for context.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
extensions: List of A2A extension URIs in use.
|
||||
"""
|
||||
|
||||
type: str = "a2a_conversation_started"
|
||||
agent_id: str
|
||||
endpoint: str
|
||||
context_id: str | None = None
|
||||
a2a_agent_name: str | None = None
|
||||
agent_card: dict[str, Any] | None = None
|
||||
protocol_version: str | None = None
|
||||
provider: dict[str, Any] | None = None
|
||||
skill_id: str | None = None
|
||||
reference_task_ids: list[str] | None = None
|
||||
metadata: dict[str, Any] | None = None
|
||||
extensions: list[str] | None = None
|
||||
|
||||
|
||||
class A2AMessageSentEvent(A2AEventBase):
|
||||
"""Event emitted when a message is sent to the A2A agent.
|
||||
|
||||
Attributes:
|
||||
message: Message content sent to the A2A agent
|
||||
turn_number: Current turn number (1-indexed)
|
||||
is_multiturn: Whether this is part of a multiturn conversation
|
||||
agent_role: Role of the CrewAI agent sending the message
|
||||
message: Message content sent to the A2A agent.
|
||||
turn_number: Current turn number (1-indexed).
|
||||
context_id: A2A context ID grouping related tasks.
|
||||
message_id: Unique A2A message identifier.
|
||||
is_multiturn: Whether this is part of a multiturn conversation.
|
||||
agent_role: Role of the CrewAI agent sending the message.
|
||||
endpoint: A2A agent endpoint URL.
|
||||
a2a_agent_name: Name of the A2A agent from agent card.
|
||||
skill_id: ID of the specific skill being invoked.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
extensions: List of A2A extension URIs in use.
|
||||
"""
|
||||
|
||||
type: str = "a2a_message_sent"
|
||||
message: str
|
||||
turn_number: int
|
||||
context_id: str | None = None
|
||||
message_id: str | None = None
|
||||
is_multiturn: bool = False
|
||||
agent_role: str | None = None
|
||||
endpoint: str | None = None
|
||||
a2a_agent_name: str | None = None
|
||||
skill_id: str | None = None
|
||||
metadata: dict[str, Any] | None = None
|
||||
extensions: list[str] | None = None
|
||||
|
||||
|
||||
class A2AResponseReceivedEvent(A2AEventBase):
|
||||
"""Event emitted when a response is received from the A2A agent.
|
||||
|
||||
Attributes:
|
||||
response: Response content from the A2A agent
|
||||
turn_number: Current turn number (1-indexed)
|
||||
is_multiturn: Whether this is part of a multiturn conversation
|
||||
status: Response status (input_required, completed, etc.)
|
||||
agent_role: Role of the CrewAI agent (for display)
|
||||
response: Response content from the A2A agent.
|
||||
turn_number: Current turn number (1-indexed).
|
||||
context_id: A2A context ID grouping related tasks.
|
||||
message_id: Unique A2A message identifier.
|
||||
is_multiturn: Whether this is part of a multiturn conversation.
|
||||
status: Response status (input_required, completed, etc.).
|
||||
final: Whether this is the final response in the stream.
|
||||
agent_role: Role of the CrewAI agent (for display).
|
||||
endpoint: A2A agent endpoint URL.
|
||||
a2a_agent_name: Name of the A2A agent from agent card.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
extensions: List of A2A extension URIs in use.
|
||||
"""
|
||||
|
||||
type: str = "a2a_response_received"
|
||||
response: str
|
||||
turn_number: int
|
||||
context_id: str | None = None
|
||||
message_id: str | None = None
|
||||
is_multiturn: bool = False
|
||||
status: str
|
||||
final: bool = False
|
||||
agent_role: str | None = None
|
||||
endpoint: str | None = None
|
||||
a2a_agent_name: str | None = None
|
||||
metadata: dict[str, Any] | None = None
|
||||
extensions: list[str] | None = None
|
||||
|
||||
|
||||
class A2AConversationCompletedEvent(A2AEventBase):
|
||||
@@ -128,119 +224,433 @@ class A2AConversationCompletedEvent(A2AEventBase):
|
||||
This is emitted once at the end of a multiturn conversation.
|
||||
|
||||
Attributes:
|
||||
status: Final status (completed, failed, etc.)
|
||||
final_result: Final result if completed successfully
|
||||
error: Error message if failed
|
||||
total_turns: Total number of turns in the conversation
|
||||
status: Final status (completed, failed, etc.).
|
||||
final_result: Final result if completed successfully.
|
||||
error: Error message if failed.
|
||||
context_id: A2A context ID grouping related tasks.
|
||||
total_turns: Total number of turns in the conversation.
|
||||
endpoint: A2A agent endpoint URL.
|
||||
a2a_agent_name: Name of the A2A agent from agent card.
|
||||
agent_card: Full A2A agent card metadata.
|
||||
reference_task_ids: Related task IDs for context.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
extensions: List of A2A extension URIs in use.
|
||||
"""
|
||||
|
||||
type: str = "a2a_conversation_completed"
|
||||
status: Literal["completed", "failed"]
|
||||
final_result: str | None = None
|
||||
error: str | None = None
|
||||
context_id: str | None = None
|
||||
total_turns: int
|
||||
endpoint: str | None = None
|
||||
a2a_agent_name: str | None = None
|
||||
agent_card: dict[str, Any] | None = None
|
||||
reference_task_ids: list[str] | None = None
|
||||
metadata: dict[str, Any] | None = None
|
||||
extensions: list[str] | None = None
|
||||
|
||||
|
||||
class A2APollingStartedEvent(A2AEventBase):
|
||||
"""Event emitted when polling mode begins for A2A delegation.
|
||||
|
||||
Attributes:
|
||||
task_id: A2A task ID being polled
|
||||
polling_interval: Seconds between poll attempts
|
||||
endpoint: A2A agent endpoint URL
|
||||
task_id: A2A task ID being polled.
|
||||
context_id: A2A context ID grouping related tasks.
|
||||
polling_interval: Seconds between poll attempts.
|
||||
endpoint: A2A agent endpoint URL.
|
||||
a2a_agent_name: Name of the A2A agent from agent card.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
"""
|
||||
|
||||
type: str = "a2a_polling_started"
|
||||
task_id: str
|
||||
context_id: str | None = None
|
||||
polling_interval: float
|
||||
endpoint: str
|
||||
a2a_agent_name: str | None = None
|
||||
metadata: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class A2APollingStatusEvent(A2AEventBase):
|
||||
"""Event emitted on each polling iteration.
|
||||
|
||||
Attributes:
|
||||
task_id: A2A task ID being polled
|
||||
state: Current task state from remote agent
|
||||
elapsed_seconds: Time since polling started
|
||||
poll_count: Number of polls completed
|
||||
task_id: A2A task ID being polled.
|
||||
context_id: A2A context ID grouping related tasks.
|
||||
state: Current task state from remote agent.
|
||||
elapsed_seconds: Time since polling started.
|
||||
poll_count: Number of polls completed.
|
||||
endpoint: A2A agent endpoint URL.
|
||||
a2a_agent_name: Name of the A2A agent from agent card.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
"""
|
||||
|
||||
type: str = "a2a_polling_status"
|
||||
task_id: str
|
||||
context_id: str | None = None
|
||||
state: str
|
||||
elapsed_seconds: float
|
||||
poll_count: int
|
||||
endpoint: str | None = None
|
||||
a2a_agent_name: str | None = None
|
||||
metadata: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class A2APushNotificationRegisteredEvent(A2AEventBase):
|
||||
"""Event emitted when push notification callback is registered.
|
||||
|
||||
Attributes:
|
||||
task_id: A2A task ID for which callback is registered
|
||||
callback_url: URL where agent will send push notifications
|
||||
task_id: A2A task ID for which callback is registered.
|
||||
context_id: A2A context ID grouping related tasks.
|
||||
callback_url: URL where agent will send push notifications.
|
||||
endpoint: A2A agent endpoint URL.
|
||||
a2a_agent_name: Name of the A2A agent from agent card.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
"""
|
||||
|
||||
type: str = "a2a_push_notification_registered"
|
||||
task_id: str
|
||||
context_id: str | None = None
|
||||
callback_url: str
|
||||
endpoint: str | None = None
|
||||
a2a_agent_name: str | None = None
|
||||
metadata: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class A2APushNotificationReceivedEvent(A2AEventBase):
|
||||
"""Event emitted when a push notification is received.
|
||||
|
||||
This event should be emitted by the user's webhook handler when it receives
|
||||
a push notification from the remote A2A agent, before calling
|
||||
`result_store.store_result()`.
|
||||
|
||||
Attributes:
|
||||
task_id: A2A task ID from the notification
|
||||
state: Current task state from the notification
|
||||
task_id: A2A task ID from the notification.
|
||||
context_id: A2A context ID grouping related tasks.
|
||||
state: Current task state from the notification.
|
||||
endpoint: A2A agent endpoint URL.
|
||||
a2a_agent_name: Name of the A2A agent from agent card.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
"""
|
||||
|
||||
type: str = "a2a_push_notification_received"
|
||||
task_id: str
|
||||
context_id: str | None = None
|
||||
state: str
|
||||
endpoint: str | None = None
|
||||
a2a_agent_name: str | None = None
|
||||
metadata: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class A2APushNotificationSentEvent(A2AEventBase):
|
||||
"""Event emitted when a push notification is sent to a callback URL.
|
||||
|
||||
Emitted by the A2A server when it sends a task status update to the
|
||||
client's registered push notification callback URL.
|
||||
|
||||
Attributes:
|
||||
task_id: A2A task ID being notified.
|
||||
context_id: A2A context ID grouping related tasks.
|
||||
callback_url: URL the notification was sent to.
|
||||
state: Task state being reported.
|
||||
success: Whether the notification was successfully delivered.
|
||||
error: Error message if delivery failed.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
"""
|
||||
|
||||
type: str = "a2a_push_notification_sent"
|
||||
task_id: str
|
||||
context_id: str | None = None
|
||||
callback_url: str
|
||||
state: str
|
||||
success: bool = True
|
||||
error: str | None = None
|
||||
metadata: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class A2APushNotificationTimeoutEvent(A2AEventBase):
|
||||
"""Event emitted when push notification wait times out.
|
||||
|
||||
Attributes:
|
||||
task_id: A2A task ID that timed out
|
||||
timeout_seconds: Timeout duration in seconds
|
||||
task_id: A2A task ID that timed out.
|
||||
context_id: A2A context ID grouping related tasks.
|
||||
timeout_seconds: Timeout duration in seconds.
|
||||
endpoint: A2A agent endpoint URL.
|
||||
a2a_agent_name: Name of the A2A agent from agent card.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
"""
|
||||
|
||||
type: str = "a2a_push_notification_timeout"
|
||||
task_id: str
|
||||
context_id: str | None = None
|
||||
timeout_seconds: float
|
||||
endpoint: str | None = None
|
||||
a2a_agent_name: str | None = None
|
||||
metadata: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class A2AStreamingStartedEvent(A2AEventBase):
|
||||
"""Event emitted when streaming mode begins for A2A delegation.
|
||||
|
||||
Attributes:
|
||||
task_id: A2A task ID for the streaming session.
|
||||
context_id: A2A context ID grouping related tasks.
|
||||
endpoint: A2A agent endpoint URL.
|
||||
a2a_agent_name: Name of the A2A agent from agent card.
|
||||
turn_number: Current turn number (1-indexed).
|
||||
is_multiturn: Whether this is part of a multiturn conversation.
|
||||
agent_role: Role of the CrewAI agent.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
extensions: List of A2A extension URIs in use.
|
||||
"""
|
||||
|
||||
type: str = "a2a_streaming_started"
|
||||
task_id: str | None = None
|
||||
context_id: str | None = None
|
||||
endpoint: str
|
||||
a2a_agent_name: str | None = None
|
||||
turn_number: int = 1
|
||||
is_multiturn: bool = False
|
||||
agent_role: str | None = None
|
||||
metadata: dict[str, Any] | None = None
|
||||
extensions: list[str] | None = None
|
||||
|
||||
|
||||
class A2AStreamingChunkEvent(A2AEventBase):
|
||||
"""Event emitted when a streaming chunk is received.
|
||||
|
||||
Attributes:
|
||||
task_id: A2A task ID for the streaming session.
|
||||
context_id: A2A context ID grouping related tasks.
|
||||
chunk: The text content of the chunk.
|
||||
chunk_index: Index of this chunk in the stream (0-indexed).
|
||||
final: Whether this is the final chunk in the stream.
|
||||
endpoint: A2A agent endpoint URL.
|
||||
a2a_agent_name: Name of the A2A agent from agent card.
|
||||
turn_number: Current turn number (1-indexed).
|
||||
is_multiturn: Whether this is part of a multiturn conversation.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
extensions: List of A2A extension URIs in use.
|
||||
"""
|
||||
|
||||
type: str = "a2a_streaming_chunk"
|
||||
task_id: str | None = None
|
||||
context_id: str | None = None
|
||||
chunk: str
|
||||
chunk_index: int
|
||||
final: bool = False
|
||||
endpoint: str | None = None
|
||||
a2a_agent_name: str | None = None
|
||||
turn_number: int = 1
|
||||
is_multiturn: bool = False
|
||||
metadata: dict[str, Any] | None = None
|
||||
extensions: list[str] | None = None
|
||||
|
||||
|
||||
class A2AAgentCardFetchedEvent(A2AEventBase):
|
||||
"""Event emitted when an agent card is successfully fetched.
|
||||
|
||||
Attributes:
|
||||
endpoint: A2A agent endpoint URL.
|
||||
a2a_agent_name: Name of the A2A agent from agent card.
|
||||
agent_card: Full A2A agent card metadata.
|
||||
protocol_version: A2A protocol version from agent card.
|
||||
provider: Agent provider/organization info from agent card.
|
||||
cached: Whether the agent card was retrieved from cache.
|
||||
fetch_time_ms: Time taken to fetch the agent card in milliseconds.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
"""
|
||||
|
||||
type: str = "a2a_agent_card_fetched"
|
||||
endpoint: str
|
||||
a2a_agent_name: str | None = None
|
||||
agent_card: dict[str, Any] | None = None
|
||||
protocol_version: str | None = None
|
||||
provider: dict[str, Any] | None = None
|
||||
cached: bool = False
|
||||
fetch_time_ms: float | None = None
|
||||
metadata: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class A2AAuthenticationFailedEvent(A2AEventBase):
|
||||
"""Event emitted when authentication to an A2A agent fails.
|
||||
|
||||
Attributes:
|
||||
endpoint: A2A agent endpoint URL.
|
||||
auth_type: Type of authentication attempted (e.g., bearer, oauth2, api_key).
|
||||
error: Error message describing the failure.
|
||||
status_code: HTTP status code if applicable.
|
||||
a2a_agent_name: Name of the A2A agent if known.
|
||||
protocol_version: A2A protocol version being used.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
"""
|
||||
|
||||
type: str = "a2a_authentication_failed"
|
||||
endpoint: str
|
||||
auth_type: str | None = None
|
||||
error: str
|
||||
status_code: int | None = None
|
||||
a2a_agent_name: str | None = None
|
||||
protocol_version: str | None = None
|
||||
metadata: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class A2AArtifactReceivedEvent(A2AEventBase):
|
||||
"""Event emitted when an artifact is received from a remote A2A agent.
|
||||
|
||||
Attributes:
|
||||
task_id: A2A task ID the artifact belongs to.
|
||||
artifact_id: Unique identifier for the artifact.
|
||||
artifact_name: Name of the artifact.
|
||||
artifact_description: Purpose description of the artifact.
|
||||
mime_type: MIME type of the artifact content.
|
||||
size_bytes: Size of the artifact in bytes.
|
||||
append: Whether content should be appended to existing artifact.
|
||||
last_chunk: Whether this is the final chunk of the artifact.
|
||||
endpoint: A2A agent endpoint URL.
|
||||
a2a_agent_name: Name of the A2A agent from agent card.
|
||||
context_id: Context ID for correlation.
|
||||
turn_number: Current turn number (1-indexed).
|
||||
is_multiturn: Whether this is part of a multiturn conversation.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
extensions: List of A2A extension URIs in use.
|
||||
"""
|
||||
|
||||
type: str = "a2a_artifact_received"
|
||||
task_id: str
|
||||
artifact_id: str
|
||||
artifact_name: str | None = None
|
||||
artifact_description: str | None = None
|
||||
mime_type: str | None = None
|
||||
size_bytes: int | None = None
|
||||
append: bool = False
|
||||
last_chunk: bool = False
|
||||
endpoint: str | None = None
|
||||
a2a_agent_name: str | None = None
|
||||
context_id: str | None = None
|
||||
turn_number: int = 1
|
||||
is_multiturn: bool = False
|
||||
metadata: dict[str, Any] | None = None
|
||||
extensions: list[str] | None = None
|
||||
|
||||
|
||||
class A2AConnectionErrorEvent(A2AEventBase):
|
||||
"""Event emitted when a connection error occurs during A2A communication.
|
||||
|
||||
Attributes:
|
||||
endpoint: A2A agent endpoint URL.
|
||||
error: Error message describing the connection failure.
|
||||
error_type: Type of error (e.g., timeout, connection_refused, dns_error).
|
||||
status_code: HTTP status code if applicable.
|
||||
a2a_agent_name: Name of the A2A agent from agent card.
|
||||
operation: The operation being attempted when error occurred.
|
||||
context_id: A2A context ID grouping related tasks.
|
||||
task_id: A2A task ID if applicable.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
"""
|
||||
|
||||
type: str = "a2a_connection_error"
|
||||
endpoint: str
|
||||
error: str
|
||||
error_type: str | None = None
|
||||
status_code: int | None = None
|
||||
a2a_agent_name: str | None = None
|
||||
operation: str | None = None
|
||||
context_id: str | None = None
|
||||
task_id: str | None = None
|
||||
metadata: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class A2AServerTaskStartedEvent(A2AEventBase):
|
||||
"""Event emitted when an A2A server task execution starts."""
|
||||
"""Event emitted when an A2A server task execution starts.
|
||||
|
||||
Attributes:
|
||||
task_id: A2A task ID for this execution.
|
||||
context_id: A2A context ID grouping related tasks.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
"""
|
||||
|
||||
type: str = "a2a_server_task_started"
|
||||
a2a_task_id: str
|
||||
a2a_context_id: str
|
||||
task_id: str
|
||||
context_id: str
|
||||
metadata: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class A2AServerTaskCompletedEvent(A2AEventBase):
|
||||
"""Event emitted when an A2A server task execution completes."""
|
||||
"""Event emitted when an A2A server task execution completes.
|
||||
|
||||
Attributes:
|
||||
task_id: A2A task ID for this execution.
|
||||
context_id: A2A context ID grouping related tasks.
|
||||
result: The task result.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
"""
|
||||
|
||||
type: str = "a2a_server_task_completed"
|
||||
a2a_task_id: str
|
||||
a2a_context_id: str
|
||||
task_id: str
|
||||
context_id: str
|
||||
result: str
|
||||
metadata: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class A2AServerTaskCanceledEvent(A2AEventBase):
|
||||
"""Event emitted when an A2A server task execution is canceled."""
|
||||
"""Event emitted when an A2A server task execution is canceled.
|
||||
|
||||
Attributes:
|
||||
task_id: A2A task ID for this execution.
|
||||
context_id: A2A context ID grouping related tasks.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
"""
|
||||
|
||||
type: str = "a2a_server_task_canceled"
|
||||
a2a_task_id: str
|
||||
a2a_context_id: str
|
||||
task_id: str
|
||||
context_id: str
|
||||
metadata: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class A2AServerTaskFailedEvent(A2AEventBase):
|
||||
"""Event emitted when an A2A server task execution fails."""
|
||||
"""Event emitted when an A2A server task execution fails.
|
||||
|
||||
Attributes:
|
||||
task_id: A2A task ID for this execution.
|
||||
context_id: A2A context ID grouping related tasks.
|
||||
error: Error message describing the failure.
|
||||
metadata: Custom A2A metadata key-value pairs.
|
||||
"""
|
||||
|
||||
type: str = "a2a_server_task_failed"
|
||||
a2a_task_id: str
|
||||
a2a_context_id: str
|
||||
task_id: str
|
||||
context_id: str
|
||||
error: str
|
||||
metadata: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class A2AParallelDelegationStartedEvent(A2AEventBase):
|
||||
"""Event emitted when parallel delegation to multiple A2A agents begins.
|
||||
|
||||
Attributes:
|
||||
endpoints: List of A2A agent endpoints being delegated to.
|
||||
task_description: Description of the task being delegated.
|
||||
"""
|
||||
|
||||
type: str = "a2a_parallel_delegation_started"
|
||||
endpoints: list[str]
|
||||
task_description: str
|
||||
|
||||
|
||||
class A2AParallelDelegationCompletedEvent(A2AEventBase):
|
||||
"""Event emitted when parallel delegation to multiple A2A agents completes.
|
||||
|
||||
Attributes:
|
||||
endpoints: List of A2A agent endpoints that were delegated to.
|
||||
success_count: Number of successful delegations.
|
||||
failure_count: Number of failed delegations.
|
||||
results: Summary of results from each agent.
|
||||
"""
|
||||
|
||||
type: str = "a2a_parallel_delegation_completed"
|
||||
endpoints: list[str]
|
||||
success_count: int
|
||||
failure_count: int
|
||||
results: dict[str, str] | None = None
|
||||
|
||||
@@ -533,7 +533,6 @@ class LiteAgent(FlowTrackable, BaseModel):
|
||||
"""
|
||||
# Execute the agent loop
|
||||
formatted_answer: AgentAction | AgentFinish | None = None
|
||||
last_raw_output: str | None = None
|
||||
while not isinstance(formatted_answer, AgentFinish):
|
||||
try:
|
||||
if has_reached_max_iterations(self._iterations, self.max_iterations):
|
||||
@@ -557,7 +556,6 @@ class LiteAgent(FlowTrackable, BaseModel):
|
||||
from_agent=self,
|
||||
executor_context=self,
|
||||
)
|
||||
last_raw_output = answer
|
||||
|
||||
except Exception as e:
|
||||
raise e
|
||||
@@ -596,8 +594,6 @@ class LiteAgent(FlowTrackable, BaseModel):
|
||||
iterations=self._iterations,
|
||||
log_error_after=3,
|
||||
printer=self._printer,
|
||||
raw_output=last_raw_output,
|
||||
agent_role=self.role,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
|
||||
@@ -2,7 +2,6 @@ from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable, Sequence
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from typing import TYPE_CHECKING, Any, Final, Literal, TypedDict
|
||||
|
||||
@@ -52,8 +51,6 @@ class SummaryContent(TypedDict):
|
||||
|
||||
console = Console()
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_MULTIPLE_NEWLINES: Final[re.Pattern[str]] = re.compile(r"\n+")
|
||||
|
||||
|
||||
@@ -433,8 +430,6 @@ def handle_output_parser_exception(
|
||||
iterations: int,
|
||||
log_error_after: int = 3,
|
||||
printer: Printer | None = None,
|
||||
raw_output: str | None = None,
|
||||
agent_role: str | None = None,
|
||||
) -> AgentAction:
|
||||
"""Handle OutputParserError by updating messages and formatted_answer.
|
||||
|
||||
@@ -444,8 +439,6 @@ def handle_output_parser_exception(
|
||||
iterations: Current iteration count
|
||||
log_error_after: Number of iterations after which to log errors
|
||||
printer: Optional printer instance for logging
|
||||
raw_output: The raw LLM output that failed to parse
|
||||
agent_role: The role of the agent for logging context
|
||||
|
||||
Returns:
|
||||
AgentAction: A formatted answer with the error
|
||||
@@ -459,27 +452,6 @@ def handle_output_parser_exception(
|
||||
thought="",
|
||||
)
|
||||
|
||||
retry_count = iterations + 1
|
||||
agent_context = f" for agent '{agent_role}'" if agent_role else ""
|
||||
|
||||
logger.debug(
|
||||
"Parse failed%s: %s",
|
||||
agent_context,
|
||||
e.error.split("\n")[0],
|
||||
)
|
||||
|
||||
if raw_output is not None:
|
||||
truncated_output = (
|
||||
raw_output[:500] + "..." if len(raw_output) > 500 else raw_output
|
||||
)
|
||||
logger.debug(
|
||||
"Raw output (truncated)%s: %s",
|
||||
agent_context,
|
||||
truncated_output.replace("\n", "\\n"),
|
||||
)
|
||||
|
||||
logger.debug("Retry %d initiated%s", retry_count, agent_context)
|
||||
|
||||
if iterations > log_error_after and printer:
|
||||
printer.print(
|
||||
content=f"Error parsing LLM output, agent will retry: {e.error}",
|
||||
|
||||
@@ -26,9 +26,13 @@ def mock_agent() -> MagicMock:
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_task() -> MagicMock:
|
||||
def mock_task(mock_context: MagicMock) -> MagicMock:
|
||||
"""Create a mock Task."""
|
||||
return MagicMock()
|
||||
task = MagicMock()
|
||||
task.id = mock_context.task_id
|
||||
task.name = "Mock Task"
|
||||
task.description = "Mock task description"
|
||||
return task
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -179,8 +183,8 @@ class TestExecute:
|
||||
event = first_call[0][1]
|
||||
|
||||
assert event.type == "a2a_server_task_started"
|
||||
assert event.a2a_task_id == mock_context.task_id
|
||||
assert event.a2a_context_id == mock_context.context_id
|
||||
assert event.task_id == mock_context.task_id
|
||||
assert event.context_id == mock_context.context_id
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_emits_completed_event(
|
||||
@@ -201,7 +205,7 @@ class TestExecute:
|
||||
event = second_call[0][1]
|
||||
|
||||
assert event.type == "a2a_server_task_completed"
|
||||
assert event.a2a_task_id == mock_context.task_id
|
||||
assert event.task_id == mock_context.task_id
|
||||
assert event.result == "Task completed successfully"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -250,7 +254,7 @@ class TestExecute:
|
||||
event = canceled_call[0][1]
|
||||
|
||||
assert event.type == "a2a_server_task_canceled"
|
||||
assert event.a2a_task_id == mock_context.task_id
|
||||
assert event.task_id == mock_context.task_id
|
||||
|
||||
|
||||
class TestCancel:
|
||||
|
||||
@@ -14,6 +14,16 @@ except ImportError:
|
||||
A2A_SDK_INSTALLED = False
|
||||
|
||||
|
||||
def _create_mock_agent_card(name: str = "Test", url: str = "http://test-endpoint.com/"):
|
||||
"""Create a mock agent card with proper model_dump behavior."""
|
||||
mock_card = MagicMock()
|
||||
mock_card.name = name
|
||||
mock_card.url = url
|
||||
mock_card.model_dump.return_value = {"name": name, "url": url}
|
||||
mock_card.model_dump_json.return_value = f'{{"name": "{name}", "url": "{url}"}}'
|
||||
return mock_card
|
||||
|
||||
|
||||
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
|
||||
def test_trust_remote_completion_status_true_returns_directly():
|
||||
"""When trust_remote_completion_status=True and A2A returns completed, return result directly."""
|
||||
@@ -44,8 +54,7 @@ def test_trust_remote_completion_status_true_returns_directly():
|
||||
patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute,
|
||||
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
|
||||
):
|
||||
mock_card = MagicMock()
|
||||
mock_card.name = "Test"
|
||||
mock_card = _create_mock_agent_card()
|
||||
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
|
||||
|
||||
# A2A returns completed
|
||||
@@ -110,8 +119,7 @@ def test_trust_remote_completion_status_false_continues_conversation():
|
||||
patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute,
|
||||
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
|
||||
):
|
||||
mock_card = MagicMock()
|
||||
mock_card.name = "Test"
|
||||
mock_card = _create_mock_agent_card()
|
||||
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
|
||||
|
||||
# A2A returns completed
|
||||
|
||||
@@ -1,240 +0,0 @@
|
||||
"""Tests for agent_utils module, specifically debug logging for OutputParserError."""
|
||||
|
||||
import logging
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.agents.parser import AgentAction, OutputParserError
|
||||
from crewai.utilities.agent_utils import handle_output_parser_exception
|
||||
|
||||
|
||||
class TestHandleOutputParserExceptionDebugLogging:
|
||||
"""Tests for debug logging in handle_output_parser_exception."""
|
||||
|
||||
def test_debug_logging_with_raw_output_and_agent_role(self, caplog: pytest.LogCaptureFixture) -> None:
|
||||
"""Test that debug logging includes raw output and agent role when provided."""
|
||||
error = OutputParserError("Invalid Format: I missed the 'Action:' after 'Thought:'.")
|
||||
messages: list[dict[str, str]] = []
|
||||
raw_output = "Let me think about this... The answer is..."
|
||||
agent_role = "Researcher"
|
||||
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
result = handle_output_parser_exception(
|
||||
e=error,
|
||||
messages=messages,
|
||||
iterations=0,
|
||||
raw_output=raw_output,
|
||||
agent_role=agent_role,
|
||||
)
|
||||
|
||||
assert isinstance(result, AgentAction)
|
||||
assert "Parse failed for agent 'Researcher'" in caplog.text
|
||||
assert "Raw output (truncated) for agent 'Researcher'" in caplog.text
|
||||
assert "Let me think about this... The answer is..." in caplog.text
|
||||
assert "Retry 1 initiated for agent 'Researcher'" in caplog.text
|
||||
|
||||
def test_debug_logging_without_agent_role(self, caplog: pytest.LogCaptureFixture) -> None:
|
||||
"""Test that debug logging works without agent role."""
|
||||
error = OutputParserError("Invalid Format: I missed the 'Action:' after 'Thought:'.")
|
||||
messages: list[dict[str, str]] = []
|
||||
raw_output = "Some raw output"
|
||||
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
result = handle_output_parser_exception(
|
||||
e=error,
|
||||
messages=messages,
|
||||
iterations=0,
|
||||
raw_output=raw_output,
|
||||
)
|
||||
|
||||
assert isinstance(result, AgentAction)
|
||||
assert "Parse failed:" in caplog.text
|
||||
assert "for agent" not in caplog.text.split("Parse failed:")[1].split("\n")[0]
|
||||
assert "Raw output (truncated):" in caplog.text
|
||||
assert "Retry 1 initiated" in caplog.text
|
||||
|
||||
def test_debug_logging_without_raw_output(self, caplog: pytest.LogCaptureFixture) -> None:
|
||||
"""Test that debug logging works without raw output."""
|
||||
error = OutputParserError("Invalid Format: I missed the 'Action:' after 'Thought:'.")
|
||||
messages: list[dict[str, str]] = []
|
||||
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
result = handle_output_parser_exception(
|
||||
e=error,
|
||||
messages=messages,
|
||||
iterations=0,
|
||||
agent_role="Researcher",
|
||||
)
|
||||
|
||||
assert isinstance(result, AgentAction)
|
||||
assert "Parse failed for agent 'Researcher'" in caplog.text
|
||||
assert "Raw output (truncated)" not in caplog.text
|
||||
assert "Retry 1 initiated for agent 'Researcher'" in caplog.text
|
||||
|
||||
def test_debug_logging_truncates_long_raw_output(self, caplog: pytest.LogCaptureFixture) -> None:
|
||||
"""Test that raw output is truncated when longer than 500 characters."""
|
||||
error = OutputParserError("Invalid Format")
|
||||
messages: list[dict[str, str]] = []
|
||||
long_output = "A" * 600
|
||||
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
handle_output_parser_exception(
|
||||
e=error,
|
||||
messages=messages,
|
||||
iterations=0,
|
||||
raw_output=long_output,
|
||||
agent_role="Researcher",
|
||||
)
|
||||
|
||||
assert "A" * 500 + "..." in caplog.text
|
||||
assert "A" * 600 not in caplog.text
|
||||
|
||||
def test_debug_logging_does_not_truncate_short_raw_output(self, caplog: pytest.LogCaptureFixture) -> None:
|
||||
"""Test that short raw output is not truncated."""
|
||||
error = OutputParserError("Invalid Format")
|
||||
messages: list[dict[str, str]] = []
|
||||
short_output = "Short output"
|
||||
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
handle_output_parser_exception(
|
||||
e=error,
|
||||
messages=messages,
|
||||
iterations=0,
|
||||
raw_output=short_output,
|
||||
agent_role="Researcher",
|
||||
)
|
||||
|
||||
assert "Short output" in caplog.text
|
||||
assert "..." not in caplog.text.split("Short output")[1].split("\n")[0]
|
||||
|
||||
def test_debug_logging_retry_count_increments(self, caplog: pytest.LogCaptureFixture) -> None:
|
||||
"""Test that retry count is correctly calculated from iterations."""
|
||||
error = OutputParserError("Invalid Format")
|
||||
messages: list[dict[str, str]] = []
|
||||
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
handle_output_parser_exception(
|
||||
e=error,
|
||||
messages=messages,
|
||||
iterations=4,
|
||||
raw_output="test",
|
||||
agent_role="Researcher",
|
||||
)
|
||||
|
||||
assert "Retry 5 initiated" in caplog.text
|
||||
|
||||
def test_debug_logging_escapes_newlines_in_raw_output(self, caplog: pytest.LogCaptureFixture) -> None:
|
||||
"""Test that newlines in raw output are escaped for readability."""
|
||||
error = OutputParserError("Invalid Format")
|
||||
messages: list[dict[str, str]] = []
|
||||
output_with_newlines = "Line 1\nLine 2\nLine 3"
|
||||
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
handle_output_parser_exception(
|
||||
e=error,
|
||||
messages=messages,
|
||||
iterations=0,
|
||||
raw_output=output_with_newlines,
|
||||
agent_role="Researcher",
|
||||
)
|
||||
|
||||
assert "Line 1\\nLine 2\\nLine 3" in caplog.text
|
||||
|
||||
def test_debug_logging_extracts_first_line_of_error(self, caplog: pytest.LogCaptureFixture) -> None:
|
||||
"""Test that only the first line of the error message is logged."""
|
||||
error = OutputParserError("First line of error\nSecond line\nThird line")
|
||||
messages: list[dict[str, str]] = []
|
||||
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
handle_output_parser_exception(
|
||||
e=error,
|
||||
messages=messages,
|
||||
iterations=0,
|
||||
agent_role="Researcher",
|
||||
)
|
||||
|
||||
assert "First line of error" in caplog.text
|
||||
parse_failed_line = [line for line in caplog.text.split("\n") if "Parse failed" in line][0]
|
||||
assert "Second line" not in parse_failed_line
|
||||
|
||||
def test_messages_updated_with_error(self) -> None:
|
||||
"""Test that messages list is updated with the error."""
|
||||
error = OutputParserError("Test error message")
|
||||
messages: list[dict[str, str]] = []
|
||||
|
||||
handle_output_parser_exception(
|
||||
e=error,
|
||||
messages=messages,
|
||||
iterations=0,
|
||||
)
|
||||
|
||||
assert len(messages) == 1
|
||||
assert messages[0]["role"] == "user"
|
||||
assert messages[0]["content"] == "Test error message"
|
||||
|
||||
def test_returns_agent_action_with_error_text(self) -> None:
|
||||
"""Test that the function returns an AgentAction with the error text."""
|
||||
error = OutputParserError("Test error message")
|
||||
messages: list[dict[str, str]] = []
|
||||
|
||||
result = handle_output_parser_exception(
|
||||
e=error,
|
||||
messages=messages,
|
||||
iterations=0,
|
||||
)
|
||||
|
||||
assert isinstance(result, AgentAction)
|
||||
assert result.text == "Test error message"
|
||||
assert result.tool == ""
|
||||
assert result.tool_input == ""
|
||||
assert result.thought == ""
|
||||
|
||||
def test_printer_logs_after_log_error_after_iterations(self) -> None:
|
||||
"""Test that printer logs error after log_error_after iterations."""
|
||||
error = OutputParserError("Test error")
|
||||
messages: list[dict[str, str]] = []
|
||||
printer = MagicMock()
|
||||
|
||||
handle_output_parser_exception(
|
||||
e=error,
|
||||
messages=messages,
|
||||
iterations=4,
|
||||
log_error_after=3,
|
||||
printer=printer,
|
||||
)
|
||||
|
||||
printer.print.assert_called_once()
|
||||
call_args = printer.print.call_args
|
||||
assert "Error parsing LLM output" in call_args.kwargs["content"]
|
||||
assert call_args.kwargs["color"] == "red"
|
||||
|
||||
def test_printer_does_not_log_before_log_error_after_iterations(self) -> None:
|
||||
"""Test that printer does not log before log_error_after iterations."""
|
||||
error = OutputParserError("Test error")
|
||||
messages: list[dict[str, str]] = []
|
||||
printer = MagicMock()
|
||||
|
||||
handle_output_parser_exception(
|
||||
e=error,
|
||||
messages=messages,
|
||||
iterations=2,
|
||||
log_error_after=3,
|
||||
printer=printer,
|
||||
)
|
||||
|
||||
printer.print.assert_not_called()
|
||||
|
||||
def test_backward_compatibility_without_new_parameters(self) -> None:
|
||||
"""Test that the function works without the new optional parameters."""
|
||||
error = OutputParserError("Test error")
|
||||
messages: list[dict[str, str]] = []
|
||||
|
||||
result = handle_output_parser_exception(
|
||||
e=error,
|
||||
messages=messages,
|
||||
iterations=0,
|
||||
)
|
||||
|
||||
assert isinstance(result, AgentAction)
|
||||
assert len(messages) == 1
|
||||
Reference in New Issue
Block a user