Compare commits

..

4 Commits

Author SHA1 Message Date
lorenzejay
e83b7554bf docs translations 2026-01-15 14:43:43 -08:00
lorenzejay
7834b07ce4 Merge branch 'main' of github.com:crewAIInc/crewAI into lorenze/fix-google-vertex-api-using-api-keys 2026-01-15 14:37:37 -08:00
lorenzejay
a9bb03ffa8 docs update here 2026-01-15 14:37:16 -08:00
lorenzejay
5beaea189b supporting vertex through api key use - expo mode 2026-01-15 14:34:07 -08:00
20 changed files with 440 additions and 1811 deletions

View File

@@ -375,10 +375,13 @@ In this section, you'll find detailed examples that help you select, configure,
GOOGLE_API_KEY=<your-api-key>
GEMINI_API_KEY=<your-api-key>
# Optional - for Vertex AI
# For Vertex AI Express mode (API key authentication)
GOOGLE_GENAI_USE_VERTEXAI=true
GOOGLE_API_KEY=<your-api-key>
# For Vertex AI with service account
GOOGLE_CLOUD_PROJECT=<your-project-id>
GOOGLE_CLOUD_LOCATION=<location> # Defaults to us-central1
GOOGLE_GENAI_USE_VERTEXAI=true # Set to use Vertex AI
```
**Basic Usage:**
@@ -412,7 +415,35 @@ In this section, you'll find detailed examples that help you select, configure,
)
```
**Vertex AI Configuration:**
**Vertex AI Express Mode (API Key Authentication):**
Vertex AI Express mode allows you to use Vertex AI with simple API key authentication instead of service account credentials. This is the quickest way to get started with Vertex AI.
To enable Express mode, set both environment variables in your `.env` file:
```toml .env
GOOGLE_GENAI_USE_VERTEXAI=true
GOOGLE_API_KEY=<your-api-key>
```
Then use the LLM as usual:
```python Code
from crewai import LLM
llm = LLM(
model="gemini/gemini-2.0-flash",
temperature=0.7
)
```
<Info>
To get an Express mode API key:
- New Google Cloud users: Get an [express mode API key](https://cloud.google.com/vertex-ai/generative-ai/docs/start/quickstart?usertype=apikey)
- Existing Google Cloud users: Get a [Google Cloud API key bound to a service account](https://cloud.google.com/docs/authentication/api-keys)
For more details, see the [Vertex AI Express mode documentation](https://docs.cloud.google.com/vertex-ai/generative-ai/docs/start/quickstart?usertype=apikey).
</Info>
**Vertex AI Configuration (Service Account):**
```python Code
from crewai import LLM
@@ -424,10 +455,10 @@ In this section, you'll find detailed examples that help you select, configure,
```
**Supported Environment Variables:**
- `GOOGLE_API_KEY` or `GEMINI_API_KEY`: Your Google API key (required for Gemini API)
- `GOOGLE_CLOUD_PROJECT`: Google Cloud project ID (for Vertex AI)
- `GOOGLE_API_KEY` or `GEMINI_API_KEY`: Your Google API key (required for Gemini API and Vertex AI Express mode)
- `GOOGLE_GENAI_USE_VERTEXAI`: Set to `true` to use Vertex AI (required for Express mode)
- `GOOGLE_CLOUD_PROJECT`: Google Cloud project ID (for Vertex AI with service account)
- `GOOGLE_CLOUD_LOCATION`: GCP location (defaults to `us-central1`)
- `GOOGLE_GENAI_USE_VERTEXAI`: Set to `true` to use Vertex AI
**Features:**
- Native function calling support for Gemini 1.5+ and 2.x models

View File

@@ -107,7 +107,7 @@ CrewAI 코드 내에는 사용할 모델을 지정할 수 있는 여러 위치
## 공급자 구성 예시
CrewAI는 고유한 기능, 인증 방법, 모델 역량을 제공하는 다양한 LLM 공급자를 지원합니다.
CrewAI는 고유한 기능, 인증 방법, 모델 역량을 제공하는 다양한 LLM 공급자를 지원합니다.
이 섹션에서는 프로젝트의 요구에 가장 적합한 LLM을 선택, 구성, 최적화하는 데 도움이 되는 자세한 예시를 제공합니다.
<AccordionGroup>
@@ -153,8 +153,8 @@ CrewAI는 고유한 기능, 인증 방법, 모델 역량을 제공하는 다양
</Accordion>
<Accordion title="Meta-Llama">
Meta의 Llama API는 Meta의 대형 언어 모델 패밀리 접근을 제공합니다.
API는 [Meta Llama API](https://llama.developer.meta.com?utm_source=partner-crewai&utm_medium=website)에서 사용할 수 있습니다.
Meta의 Llama API는 Meta의 대형 언어 모델 패밀리 접근을 제공합니다.
API는 [Meta Llama API](https://llama.developer.meta.com?utm_source=partner-crewai&utm_medium=website)에서 사용할 수 있습니다.
`.env` 파일에 다음 환경 변수를 설정하십시오:
```toml Code
@@ -207,11 +207,20 @@ CrewAI는 고유한 기능, 인증 방법, 모델 역량을 제공하는 다양
`.env` 파일에 API 키를 설정하십시오. 키가 필요하거나 기존 키를 찾으려면 [AI Studio](https://aistudio.google.com/apikey)를 확인하세요.
```toml .env
# https://ai.google.dev/gemini-api/docs/api-key
# Gemini API 사용 시 (다음 중 하나)
GOOGLE_API_KEY=<your-api-key>
GEMINI_API_KEY=<your-api-key>
# Vertex AI Express 모드 사용 시 (API 키 인증)
GOOGLE_GENAI_USE_VERTEXAI=true
GOOGLE_API_KEY=<your-api-key>
# Vertex AI 서비스 계정 사용 시
GOOGLE_CLOUD_PROJECT=<your-project-id>
GOOGLE_CLOUD_LOCATION=<location> # 기본값: us-central1
```
CrewAI 프로젝트에서의 예시 사용법:
**기본 사용법:**
```python Code
from crewai import LLM
@@ -221,6 +230,34 @@ CrewAI는 고유한 기능, 인증 방법, 모델 역량을 제공하는 다양
)
```
**Vertex AI Express 모드 (API 키 인증):**
Vertex AI Express 모드를 사용하면 서비스 계정 자격 증명 대신 간단한 API 키 인증으로 Vertex AI를 사용할 수 있습니다. Vertex AI를 시작하는 가장 빠른 방법입니다.
Express 모드를 활성화하려면 `.env` 파일에 두 환경 변수를 모두 설정하세요:
```toml .env
GOOGLE_GENAI_USE_VERTEXAI=true
GOOGLE_API_KEY=<your-api-key>
```
그런 다음 평소처럼 LLM을 사용하세요:
```python Code
from crewai import LLM
llm = LLM(
model="gemini/gemini-2.0-flash",
temperature=0.7
)
```
<Info>
Express 모드 API 키를 받으려면:
- 신규 Google Cloud 사용자: [Express 모드 API 키](https://cloud.google.com/vertex-ai/generative-ai/docs/start/quickstart?usertype=apikey) 받기
- 기존 Google Cloud 사용자: [서비스 계정에 바인딩된 Google Cloud API 키](https://cloud.google.com/docs/authentication/api-keys) 받기
자세한 내용은 [Vertex AI Express 모드 문서](https://docs.cloud.google.com/vertex-ai/generative-ai/docs/start/quickstart?usertype=apikey)를 참조하세요.
</Info>
### Gemini 모델
Google은 다양한 용도에 최적화된 강력한 모델을 제공합니다.
@@ -476,7 +513,7 @@ CrewAI는 고유한 기능, 인증 방법, 모델 역량을 제공하는 다양
<Accordion title="Local NVIDIA NIM Deployed using WSL2">
NVIDIA NIM을 이용하면 Windows 기기에서 WSL2(Windows Subsystem for Linux)를 통해 강력한 LLM을 로컬로 실행할 수 있습니다.
NVIDIA NIM을 이용하면 Windows 기기에서 WSL2(Windows Subsystem for Linux)를 통해 강력한 LLM을 로컬로 실행할 수 있습니다.
이 방식은 Nvidia GPU를 활용하여 프라이빗하고, 안전하며, 비용 효율적인 AI 추론을 클라우드 서비스에 의존하지 않고 구현할 수 있습니다.
데이터 프라이버시, 오프라인 기능이 필요한 개발, 테스트, 또는 프로덕션 환경에 최적입니다.
@@ -954,4 +991,4 @@ LLM 설정을 최대한 활용하는 방법을 알아보세요:
llm = LLM(model="openai/gpt-4o") # 128K tokens
```
</Tab>
</Tabs>
</Tabs>

View File

@@ -79,7 +79,7 @@ Existem diferentes locais no código do CrewAI onde você pode especificar o mod
# Configuração avançada com parâmetros detalhados
llm = LLM(
model="openai/gpt-4",
model="openai/gpt-4",
temperature=0.8,
max_tokens=150,
top_p=0.9,
@@ -207,11 +207,20 @@ Nesta seção, você encontrará exemplos detalhados que ajudam a selecionar, co
Defina sua chave de API no seu arquivo `.env`. Se precisar de uma chave, ou encontrar uma existente, verifique o [AI Studio](https://aistudio.google.com/apikey).
```toml .env
# https://ai.google.dev/gemini-api/docs/api-key
# Para API Gemini (uma das seguintes)
GOOGLE_API_KEY=<your-api-key>
GEMINI_API_KEY=<your-api-key>
# Para Vertex AI Express mode (autenticação por chave de API)
GOOGLE_GENAI_USE_VERTEXAI=true
GOOGLE_API_KEY=<your-api-key>
# Para Vertex AI com conta de serviço
GOOGLE_CLOUD_PROJECT=<your-project-id>
GOOGLE_CLOUD_LOCATION=<location> # Padrão: us-central1
```
Exemplo de uso em seu projeto CrewAI:
**Uso Básico:**
```python Code
from crewai import LLM
@@ -221,6 +230,34 @@ Nesta seção, você encontrará exemplos detalhados que ajudam a selecionar, co
)
```
**Vertex AI Express Mode (Autenticação por Chave de API):**
O Vertex AI Express mode permite usar o Vertex AI com autenticação simples por chave de API, em vez de credenciais de conta de serviço. Esta é a maneira mais rápida de começar com o Vertex AI.
Para habilitar o Express mode, defina ambas as variáveis de ambiente no seu arquivo `.env`:
```toml .env
GOOGLE_GENAI_USE_VERTEXAI=true
GOOGLE_API_KEY=<your-api-key>
```
Em seguida, use o LLM normalmente:
```python Code
from crewai import LLM
llm = LLM(
model="gemini/gemini-2.0-flash",
temperature=0.7
)
```
<Info>
Para obter uma chave de API do Express mode:
- Novos usuários do Google Cloud: Obtenha uma [chave de API do Express mode](https://cloud.google.com/vertex-ai/generative-ai/docs/start/quickstart?usertype=apikey)
- Usuários existentes do Google Cloud: Obtenha uma [chave de API do Google Cloud vinculada a uma conta de serviço](https://cloud.google.com/docs/authentication/api-keys)
Para mais detalhes, consulte a [documentação do Vertex AI Express mode](https://docs.cloud.google.com/vertex-ai/generative-ai/docs/start/quickstart?usertype=apikey).
</Info>
### Modelos Gemini
O Google oferece uma variedade de modelos poderosos otimizados para diferentes casos de uso.
@@ -823,7 +860,7 @@ Saiba como obter o máximo da configuração do seu LLM:
Lembre-se de monitorar regularmente o uso de tokens e ajustar suas configurações para otimizar custos e desempenho.
</Info>
</Accordion>
<Accordion title="Descartar Parâmetros Adicionais">
O CrewAI usa Litellm internamente para chamadas LLM, permitindo descartar parâmetros adicionais desnecessários para seu caso de uso. Isso pode simplificar seu código e reduzir a complexidade da configuração do LLM.
Por exemplo, se não precisar enviar o parâmetro <code>stop</code>, basta omiti-lo na chamada do LLM:
@@ -882,4 +919,4 @@ Saiba como obter o máximo da configuração do seu LLM:
llm = LLM(model="openai/gpt-4o") # 128K tokens
```
</Tab>
</Tabs>
</Tabs>

View File

@@ -3,10 +3,9 @@
from __future__ import annotations
from collections.abc import AsyncIterator
from typing import TYPE_CHECKING, Any, TypedDict
from typing import TYPE_CHECKING, TypedDict
import uuid
from a2a.client.errors import A2AClientHTTPError
from a2a.types import (
AgentCard,
Message,
@@ -21,10 +20,7 @@ from a2a.types import (
from typing_extensions import NotRequired
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import (
A2AConnectionErrorEvent,
A2AResponseReceivedEvent,
)
from crewai.events.types.a2a_events import A2AResponseReceivedEvent
if TYPE_CHECKING:
@@ -59,8 +55,7 @@ class TaskStateResult(TypedDict):
history: list[Message]
result: NotRequired[str]
error: NotRequired[str]
agent_card: NotRequired[dict[str, Any]]
a2a_agent_name: NotRequired[str | None]
agent_card: NotRequired[AgentCard]
def extract_task_result_parts(a2a_task: A2ATask) -> list[str]:
@@ -136,69 +131,50 @@ def process_task_state(
is_multiturn: bool,
agent_role: str | None,
result_parts: list[str] | None = None,
endpoint: str | None = None,
a2a_agent_name: str | None = None,
from_task: Any | None = None,
from_agent: Any | None = None,
is_final: bool = True,
) -> TaskStateResult | None:
"""Process A2A task state and return result dictionary.
Shared logic for both polling and streaming handlers.
Args:
a2a_task: The A2A task to process.
new_messages: List to collect messages (modified in place).
agent_card: The agent card.
turn_number: Current turn number.
is_multiturn: Whether multi-turn conversation.
agent_role: Agent role for logging.
a2a_task: The A2A task to process
new_messages: List to collect messages (modified in place)
agent_card: The agent card
turn_number: Current turn number
is_multiturn: Whether multi-turn conversation
agent_role: Agent role for logging
result_parts: Accumulated result parts (streaming passes accumulated,
polling passes None to extract from task).
endpoint: A2A agent endpoint URL.
a2a_agent_name: Name of the A2A agent from agent card.
from_task: Optional CrewAI Task for event metadata.
from_agent: Optional CrewAI Agent for event metadata.
is_final: Whether this is the final response in the stream.
polling passes None to extract from task)
Returns:
Result dictionary if terminal/actionable state, None otherwise.
Result dictionary if terminal/actionable state, None otherwise
"""
should_extract = result_parts is None
if result_parts is None:
result_parts = []
if a2a_task.status.state == TaskState.completed:
if not result_parts:
if should_extract:
extracted_parts = extract_task_result_parts(a2a_task)
result_parts.extend(extracted_parts)
if a2a_task.history:
new_messages.extend(a2a_task.history)
response_text = " ".join(result_parts) if result_parts else ""
message_id = None
if a2a_task.status and a2a_task.status.message:
message_id = a2a_task.status.message.message_id
crewai_event_bus.emit(
None,
A2AResponseReceivedEvent(
response=response_text,
turn_number=turn_number,
context_id=a2a_task.context_id,
message_id=message_id,
is_multiturn=is_multiturn,
status="completed",
final=is_final,
agent_role=agent_role,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
),
)
return TaskStateResult(
status=TaskState.completed,
agent_card=agent_card.model_dump(exclude_none=True),
agent_card=agent_card,
result=response_text,
history=new_messages,
)
@@ -218,24 +194,14 @@ def process_task_state(
)
new_messages.append(agent_message)
input_message_id = None
if a2a_task.status and a2a_task.status.message:
input_message_id = a2a_task.status.message.message_id
crewai_event_bus.emit(
None,
A2AResponseReceivedEvent(
response=response_text,
turn_number=turn_number,
context_id=a2a_task.context_id,
message_id=input_message_id,
is_multiturn=is_multiturn,
status="input_required",
final=is_final,
agent_role=agent_role,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
),
)
@@ -243,7 +209,7 @@ def process_task_state(
status=TaskState.input_required,
error=response_text,
history=new_messages,
agent_card=agent_card.model_dump(exclude_none=True),
agent_card=agent_card,
)
if a2a_task.status.state in {TaskState.failed, TaskState.rejected}:
@@ -282,11 +248,6 @@ async def send_message_and_get_task_id(
turn_number: int,
is_multiturn: bool,
agent_role: str | None,
from_task: Any | None = None,
from_agent: Any | None = None,
endpoint: str | None = None,
a2a_agent_name: str | None = None,
context_id: str | None = None,
) -> str | TaskStateResult:
"""Send message and process initial response.
@@ -301,11 +262,6 @@ async def send_message_and_get_task_id(
turn_number: Current turn number
is_multiturn: Whether multi-turn conversation
agent_role: Agent role for logging
from_task: Optional CrewAI Task object for event metadata.
from_agent: Optional CrewAI Agent object for event metadata.
endpoint: Optional A2A endpoint URL.
a2a_agent_name: Optional A2A agent name.
context_id: Optional A2A context ID for correlation.
Returns:
Task ID string if agent needs polling/waiting, or TaskStateResult if done.
@@ -324,16 +280,9 @@ async def send_message_and_get_task_id(
A2AResponseReceivedEvent(
response=response_text,
turn_number=turn_number,
context_id=event.context_id,
message_id=event.message_id,
is_multiturn=is_multiturn,
status="completed",
final=True,
agent_role=agent_role,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
),
)
@@ -341,7 +290,7 @@ async def send_message_and_get_task_id(
status=TaskState.completed,
result=response_text,
history=new_messages,
agent_card=agent_card.model_dump(exclude_none=True),
agent_card=agent_card,
)
if isinstance(event, tuple):
@@ -355,10 +304,6 @@ async def send_message_and_get_task_id(
turn_number=turn_number,
is_multiturn=is_multiturn,
agent_role=agent_role,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
)
if result:
return result
@@ -371,99 +316,6 @@ async def send_message_and_get_task_id(
history=new_messages,
)
except A2AClientHTTPError as e:
error_msg = f"HTTP Error {e.status_code}: {e!s}"
error_message = Message(
role=Role.agent,
message_id=str(uuid.uuid4()),
parts=[Part(root=TextPart(text=error_msg))],
context_id=context_id,
)
new_messages.append(error_message)
crewai_event_bus.emit(
None,
A2AConnectionErrorEvent(
endpoint=endpoint or "",
error=str(e),
error_type="http_error",
status_code=e.status_code,
a2a_agent_name=a2a_agent_name,
operation="send_message",
context_id=context_id,
from_task=from_task,
from_agent=from_agent,
),
)
crewai_event_bus.emit(
None,
A2AResponseReceivedEvent(
response=error_msg,
turn_number=turn_number,
context_id=context_id,
is_multiturn=is_multiturn,
status="failed",
final=True,
agent_role=agent_role,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
),
)
return TaskStateResult(
status=TaskState.failed,
error=error_msg,
history=new_messages,
)
except Exception as e:
error_msg = f"Unexpected error during send_message: {e!s}"
error_message = Message(
role=Role.agent,
message_id=str(uuid.uuid4()),
parts=[Part(root=TextPart(text=error_msg))],
context_id=context_id,
)
new_messages.append(error_message)
crewai_event_bus.emit(
None,
A2AConnectionErrorEvent(
endpoint=endpoint or "",
error=str(e),
error_type="unexpected_error",
a2a_agent_name=a2a_agent_name,
operation="send_message",
context_id=context_id,
from_task=from_task,
from_agent=from_agent,
),
)
crewai_event_bus.emit(
None,
A2AResponseReceivedEvent(
response=error_msg,
turn_number=turn_number,
context_id=context_id,
is_multiturn=is_multiturn,
status="failed",
final=True,
agent_role=agent_role,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
),
)
return TaskStateResult(
status=TaskState.failed,
error=error_msg,
history=new_messages,
)
finally:
aclose = getattr(event_stream, "aclose", None)
if aclose:

View File

@@ -22,13 +22,6 @@ class BaseHandlerKwargs(TypedDict, total=False):
turn_number: int
is_multiturn: bool
agent_role: str | None
context_id: str | None
task_id: str | None
endpoint: str | None
agent_branch: Any
a2a_agent_name: str | None
from_task: Any
from_agent: Any
class PollingHandlerKwargs(BaseHandlerKwargs, total=False):
@@ -36,6 +29,8 @@ class PollingHandlerKwargs(BaseHandlerKwargs, total=False):
polling_interval: float
polling_timeout: float
endpoint: str
agent_branch: Any
history_length: int
max_polls: int | None
@@ -43,6 +38,9 @@ class PollingHandlerKwargs(BaseHandlerKwargs, total=False):
class StreamingHandlerKwargs(BaseHandlerKwargs, total=False):
"""Kwargs for streaming handler."""
context_id: str | None
task_id: str | None
class PushNotificationHandlerKwargs(BaseHandlerKwargs, total=False):
"""Kwargs for push notification handler."""
@@ -51,6 +49,7 @@ class PushNotificationHandlerKwargs(BaseHandlerKwargs, total=False):
result_store: PushNotificationResultStore
polling_timeout: float
polling_interval: float
agent_branch: Any
class PushNotificationResultStore(Protocol):

View File

@@ -31,7 +31,6 @@ from crewai.a2a.task_helpers import (
from crewai.a2a.updates.base import PollingHandlerKwargs
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import (
A2AConnectionErrorEvent,
A2APollingStartedEvent,
A2APollingStatusEvent,
A2AResponseReceivedEvent,
@@ -50,33 +49,23 @@ async def _poll_task_until_complete(
agent_branch: Any | None = None,
history_length: int = 100,
max_polls: int | None = None,
from_task: Any | None = None,
from_agent: Any | None = None,
context_id: str | None = None,
endpoint: str | None = None,
a2a_agent_name: str | None = None,
) -> A2ATask:
"""Poll task status until terminal state reached.
Args:
client: A2A client instance.
task_id: Task ID to poll.
polling_interval: Seconds between poll attempts.
polling_timeout: Max seconds before timeout.
agent_branch: Agent tree branch for logging.
history_length: Number of messages to retrieve per poll.
max_polls: Max number of poll attempts (None = unlimited).
from_task: Optional CrewAI Task object for event metadata.
from_agent: Optional CrewAI Agent object for event metadata.
context_id: A2A context ID for correlation.
endpoint: A2A agent endpoint URL.
a2a_agent_name: Name of the A2A agent from agent card.
client: A2A client instance
task_id: Task ID to poll
polling_interval: Seconds between poll attempts
polling_timeout: Max seconds before timeout
agent_branch: Agent tree branch for logging
history_length: Number of messages to retrieve per poll
max_polls: Max number of poll attempts (None = unlimited)
Returns:
Final task object in terminal state.
Final task object in terminal state
Raises:
A2APollingTimeoutError: If polling exceeds timeout or max_polls.
A2APollingTimeoutError: If polling exceeds timeout or max_polls
"""
start_time = time.monotonic()
poll_count = 0
@@ -88,19 +77,13 @@ async def _poll_task_until_complete(
)
elapsed = time.monotonic() - start_time
effective_context_id = task.context_id or context_id
crewai_event_bus.emit(
agent_branch,
A2APollingStatusEvent(
task_id=task_id,
context_id=effective_context_id,
state=str(task.status.state.value) if task.status.state else "unknown",
elapsed_seconds=elapsed,
poll_count=poll_count,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
),
)
@@ -154,9 +137,6 @@ class PollingHandler:
max_polls = kwargs.get("max_polls")
context_id = kwargs.get("context_id")
task_id = kwargs.get("task_id")
a2a_agent_name = kwargs.get("a2a_agent_name")
from_task = kwargs.get("from_task")
from_agent = kwargs.get("from_agent")
try:
result_or_task_id = await send_message_and_get_task_id(
@@ -166,11 +146,6 @@ class PollingHandler:
turn_number=turn_number,
is_multiturn=is_multiturn,
agent_role=agent_role,
from_task=from_task,
from_agent=from_agent,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
context_id=context_id,
)
if not isinstance(result_or_task_id, str):
@@ -182,12 +157,8 @@ class PollingHandler:
agent_branch,
A2APollingStartedEvent(
task_id=task_id,
context_id=context_id,
polling_interval=polling_interval,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
),
)
@@ -199,11 +170,6 @@ class PollingHandler:
agent_branch=agent_branch,
history_length=history_length,
max_polls=max_polls,
from_task=from_task,
from_agent=from_agent,
context_id=context_id,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
)
result = process_task_state(
@@ -213,10 +179,6 @@ class PollingHandler:
turn_number=turn_number,
is_multiturn=is_multiturn,
agent_role=agent_role,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
)
if result:
return result
@@ -244,15 +206,9 @@ class PollingHandler:
A2AResponseReceivedEvent(
response=error_msg,
turn_number=turn_number,
context_id=context_id,
is_multiturn=is_multiturn,
status="failed",
final=True,
agent_role=agent_role,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
),
)
return TaskStateResult(
@@ -273,83 +229,14 @@ class PollingHandler:
)
new_messages.append(error_message)
crewai_event_bus.emit(
agent_branch,
A2AConnectionErrorEvent(
endpoint=endpoint,
error=str(e),
error_type="http_error",
status_code=e.status_code,
a2a_agent_name=a2a_agent_name,
operation="polling",
context_id=context_id,
task_id=task_id,
from_task=from_task,
from_agent=from_agent,
),
)
crewai_event_bus.emit(
agent_branch,
A2AResponseReceivedEvent(
response=error_msg,
turn_number=turn_number,
context_id=context_id,
is_multiturn=is_multiturn,
status="failed",
final=True,
agent_role=agent_role,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
),
)
return TaskStateResult(
status=TaskState.failed,
error=error_msg,
history=new_messages,
)
except Exception as e:
error_msg = f"Unexpected error during polling: {e!s}"
error_message = Message(
role=Role.agent,
message_id=str(uuid.uuid4()),
parts=[Part(root=TextPart(text=error_msg))],
context_id=context_id,
task_id=task_id,
)
new_messages.append(error_message)
crewai_event_bus.emit(
agent_branch,
A2AConnectionErrorEvent(
endpoint=endpoint or "",
error=str(e),
error_type="unexpected_error",
a2a_agent_name=a2a_agent_name,
operation="polling",
context_id=context_id,
task_id=task_id,
from_task=from_task,
from_agent=from_agent,
),
)
crewai_event_bus.emit(
agent_branch,
A2AResponseReceivedEvent(
response=error_msg,
turn_number=turn_number,
context_id=context_id,
is_multiturn=is_multiturn,
status="failed",
final=True,
agent_role=agent_role,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
),
)
return TaskStateResult(

View File

@@ -29,7 +29,6 @@ from crewai.a2a.updates.base import (
)
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import (
A2AConnectionErrorEvent,
A2APushNotificationRegisteredEvent,
A2APushNotificationTimeoutEvent,
A2AResponseReceivedEvent,
@@ -49,11 +48,6 @@ async def _wait_for_push_result(
timeout: float,
poll_interval: float,
agent_branch: Any | None = None,
from_task: Any | None = None,
from_agent: Any | None = None,
context_id: str | None = None,
endpoint: str | None = None,
a2a_agent_name: str | None = None,
) -> A2ATask | None:
"""Wait for push notification result.
@@ -63,11 +57,6 @@ async def _wait_for_push_result(
timeout: Max seconds to wait.
poll_interval: Seconds between polling attempts.
agent_branch: Agent tree branch for logging.
from_task: Optional CrewAI Task object for event metadata.
from_agent: Optional CrewAI Agent object for event metadata.
context_id: A2A context ID for correlation.
endpoint: A2A agent endpoint URL.
a2a_agent_name: Name of the A2A agent.
Returns:
Final task object, or None if timeout.
@@ -83,12 +72,7 @@ async def _wait_for_push_result(
agent_branch,
A2APushNotificationTimeoutEvent(
task_id=task_id,
context_id=context_id,
timeout_seconds=timeout,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
),
)
@@ -131,56 +115,18 @@ class PushNotificationHandler:
agent_role = kwargs.get("agent_role")
context_id = kwargs.get("context_id")
task_id = kwargs.get("task_id")
endpoint = kwargs.get("endpoint")
a2a_agent_name = kwargs.get("a2a_agent_name")
from_task = kwargs.get("from_task")
from_agent = kwargs.get("from_agent")
if config is None:
error_msg = (
"PushNotificationConfig is required for push notification handler"
)
crewai_event_bus.emit(
agent_branch,
A2AConnectionErrorEvent(
endpoint=endpoint or "",
error=error_msg,
error_type="configuration_error",
a2a_agent_name=a2a_agent_name,
operation="push_notification",
context_id=context_id,
task_id=task_id,
from_task=from_task,
from_agent=from_agent,
),
)
return TaskStateResult(
status=TaskState.failed,
error=error_msg,
error="PushNotificationConfig is required for push notification handler",
history=new_messages,
)
if result_store is None:
error_msg = (
"PushNotificationResultStore is required for push notification handler"
)
crewai_event_bus.emit(
agent_branch,
A2AConnectionErrorEvent(
endpoint=endpoint or "",
error=error_msg,
error_type="configuration_error",
a2a_agent_name=a2a_agent_name,
operation="push_notification",
context_id=context_id,
task_id=task_id,
from_task=from_task,
from_agent=from_agent,
),
)
return TaskStateResult(
status=TaskState.failed,
error=error_msg,
error="PushNotificationResultStore is required for push notification handler",
history=new_messages,
)
@@ -192,11 +138,6 @@ class PushNotificationHandler:
turn_number=turn_number,
is_multiturn=is_multiturn,
agent_role=agent_role,
from_task=from_task,
from_agent=from_agent,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
context_id=context_id,
)
if not isinstance(result_or_task_id, str):
@@ -208,12 +149,7 @@ class PushNotificationHandler:
agent_branch,
A2APushNotificationRegisteredEvent(
task_id=task_id,
context_id=context_id,
callback_url=str(config.url),
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
),
)
@@ -229,11 +165,6 @@ class PushNotificationHandler:
timeout=polling_timeout,
poll_interval=polling_interval,
agent_branch=agent_branch,
from_task=from_task,
from_agent=from_agent,
context_id=context_id,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
)
if final_task is None:
@@ -250,10 +181,6 @@ class PushNotificationHandler:
turn_number=turn_number,
is_multiturn=is_multiturn,
agent_role=agent_role,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
)
if result:
return result
@@ -276,83 +203,14 @@ class PushNotificationHandler:
)
new_messages.append(error_message)
crewai_event_bus.emit(
agent_branch,
A2AConnectionErrorEvent(
endpoint=endpoint or "",
error=str(e),
error_type="http_error",
status_code=e.status_code,
a2a_agent_name=a2a_agent_name,
operation="push_notification",
context_id=context_id,
task_id=task_id,
from_task=from_task,
from_agent=from_agent,
),
)
crewai_event_bus.emit(
agent_branch,
A2AResponseReceivedEvent(
response=error_msg,
turn_number=turn_number,
context_id=context_id,
is_multiturn=is_multiturn,
status="failed",
final=True,
agent_role=agent_role,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
),
)
return TaskStateResult(
status=TaskState.failed,
error=error_msg,
history=new_messages,
)
except Exception as e:
error_msg = f"Unexpected error during push notification: {e!s}"
error_message = Message(
role=Role.agent,
message_id=str(uuid.uuid4()),
parts=[Part(root=TextPart(text=error_msg))],
context_id=context_id,
task_id=task_id,
)
new_messages.append(error_message)
crewai_event_bus.emit(
agent_branch,
A2AConnectionErrorEvent(
endpoint=endpoint or "",
error=str(e),
error_type="unexpected_error",
a2a_agent_name=a2a_agent_name,
operation="push_notification",
context_id=context_id,
task_id=task_id,
from_task=from_task,
from_agent=from_agent,
),
)
crewai_event_bus.emit(
agent_branch,
A2AResponseReceivedEvent(
response=error_msg,
turn_number=turn_number,
context_id=context_id,
is_multiturn=is_multiturn,
status="failed",
final=True,
agent_role=agent_role,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
),
)
return TaskStateResult(

View File

@@ -26,13 +26,7 @@ from crewai.a2a.task_helpers import (
)
from crewai.a2a.updates.base import StreamingHandlerKwargs
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import (
A2AArtifactReceivedEvent,
A2AConnectionErrorEvent,
A2AResponseReceivedEvent,
A2AStreamingChunkEvent,
A2AStreamingStartedEvent,
)
from crewai.events.types.a2a_events import A2AResponseReceivedEvent
class StreamingHandler:
@@ -63,57 +57,19 @@ class StreamingHandler:
turn_number = kwargs.get("turn_number", 0)
is_multiturn = kwargs.get("is_multiturn", False)
agent_role = kwargs.get("agent_role")
endpoint = kwargs.get("endpoint")
a2a_agent_name = kwargs.get("a2a_agent_name")
from_task = kwargs.get("from_task")
from_agent = kwargs.get("from_agent")
agent_branch = kwargs.get("agent_branch")
result_parts: list[str] = []
final_result: TaskStateResult | None = None
event_stream = client.send_message(message)
chunk_index = 0
crewai_event_bus.emit(
agent_branch,
A2AStreamingStartedEvent(
task_id=task_id,
context_id=context_id,
endpoint=endpoint or "",
a2a_agent_name=a2a_agent_name,
turn_number=turn_number,
is_multiturn=is_multiturn,
agent_role=agent_role,
from_task=from_task,
from_agent=from_agent,
),
)
try:
async for event in event_stream:
if isinstance(event, Message):
new_messages.append(event)
message_context_id = event.context_id or context_id
for part in event.parts:
if part.root.kind == "text":
text = part.root.text
result_parts.append(text)
crewai_event_bus.emit(
agent_branch,
A2AStreamingChunkEvent(
task_id=event.task_id or task_id,
context_id=message_context_id,
chunk=text,
chunk_index=chunk_index,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
turn_number=turn_number,
is_multiturn=is_multiturn,
from_task=from_task,
from_agent=from_agent,
),
)
chunk_index += 1
elif isinstance(event, tuple):
a2a_task, update = event
@@ -125,51 +81,10 @@ class StreamingHandler:
for part in artifact.parts
if part.root.kind == "text"
)
artifact_size = None
if artifact.parts:
artifact_size = sum(
len(p.root.text.encode("utf-8"))
if p.root.kind == "text"
else len(getattr(p.root, "data", b""))
for p in artifact.parts
)
effective_context_id = a2a_task.context_id or context_id
crewai_event_bus.emit(
agent_branch,
A2AArtifactReceivedEvent(
task_id=a2a_task.id,
artifact_id=artifact.artifact_id,
artifact_name=artifact.name,
artifact_description=artifact.description,
mime_type=artifact.parts[0].root.kind
if artifact.parts
else None,
size_bytes=artifact_size,
append=update.append or False,
last_chunk=update.last_chunk or False,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
context_id=effective_context_id,
turn_number=turn_number,
is_multiturn=is_multiturn,
from_task=from_task,
from_agent=from_agent,
),
)
is_final_update = False
if isinstance(update, TaskStatusUpdateEvent):
is_final_update = update.final
if (
update.status
and update.status.message
and update.status.message.parts
):
result_parts.extend(
part.root.text
for part in update.status.message.parts
if part.root.kind == "text" and part.root.text
)
if (
not is_final_update
@@ -186,11 +101,6 @@ class StreamingHandler:
is_multiturn=is_multiturn,
agent_role=agent_role,
result_parts=result_parts,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
is_final=is_final_update,
)
if final_result:
break
@@ -208,82 +118,13 @@ class StreamingHandler:
new_messages.append(error_message)
crewai_event_bus.emit(
agent_branch,
A2AConnectionErrorEvent(
endpoint=endpoint or "",
error=str(e),
error_type="http_error",
status_code=e.status_code,
a2a_agent_name=a2a_agent_name,
operation="streaming",
context_id=context_id,
task_id=task_id,
from_task=from_task,
from_agent=from_agent,
),
)
crewai_event_bus.emit(
agent_branch,
None,
A2AResponseReceivedEvent(
response=error_msg,
turn_number=turn_number,
context_id=context_id,
is_multiturn=is_multiturn,
status="failed",
final=True,
agent_role=agent_role,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
),
)
return TaskStateResult(
status=TaskState.failed,
error=error_msg,
history=new_messages,
)
except Exception as e:
error_msg = f"Unexpected error during streaming: {e!s}"
error_message = Message(
role=Role.agent,
message_id=str(uuid.uuid4()),
parts=[Part(root=TextPart(text=error_msg))],
context_id=context_id,
task_id=task_id,
)
new_messages.append(error_message)
crewai_event_bus.emit(
agent_branch,
A2AConnectionErrorEvent(
endpoint=endpoint or "",
error=str(e),
error_type="unexpected_error",
a2a_agent_name=a2a_agent_name,
operation="streaming",
context_id=context_id,
task_id=task_id,
from_task=from_task,
from_agent=from_agent,
),
)
crewai_event_bus.emit(
agent_branch,
A2AResponseReceivedEvent(
response=error_msg,
turn_number=turn_number,
context_id=context_id,
is_multiturn=is_multiturn,
status="failed",
final=True,
agent_role=agent_role,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
from_task=from_task,
from_agent=from_agent,
),
)
return TaskStateResult(
@@ -295,23 +136,7 @@ class StreamingHandler:
finally:
aclose = getattr(event_stream, "aclose", None)
if aclose:
try:
await aclose()
except Exception as close_error:
crewai_event_bus.emit(
agent_branch,
A2AConnectionErrorEvent(
endpoint=endpoint or "",
error=str(close_error),
error_type="stream_close_error",
a2a_agent_name=a2a_agent_name,
operation="stream_close",
context_id=context_id,
task_id=task_id,
from_task=from_task,
from_agent=from_agent,
),
)
await aclose()
if final_result:
return final_result
@@ -320,5 +145,5 @@ class StreamingHandler:
status=TaskState.completed,
result=" ".join(result_parts) if result_parts else "",
history=new_messages,
agent_card=agent_card.model_dump(exclude_none=True),
agent_card=agent_card,
)

View File

@@ -23,12 +23,6 @@ from crewai.a2a.auth.utils import (
)
from crewai.a2a.config import A2AServerConfig
from crewai.crew import Crew
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import (
A2AAgentCardFetchedEvent,
A2AAuthenticationFailedEvent,
A2AConnectionErrorEvent,
)
if TYPE_CHECKING:
@@ -189,8 +183,6 @@ async def _afetch_agent_card_impl(
timeout: int,
) -> AgentCard:
"""Internal async implementation of AgentCard fetching."""
start_time = time.perf_counter()
if "/.well-known/agent-card.json" in endpoint:
base_url = endpoint.replace("/.well-known/agent-card.json", "")
agent_card_path = "/.well-known/agent-card.json"
@@ -225,29 +217,9 @@ async def _afetch_agent_card_impl(
)
response.raise_for_status()
agent_card = AgentCard.model_validate(response.json())
fetch_time_ms = (time.perf_counter() - start_time) * 1000
agent_card_dict = agent_card.model_dump(exclude_none=True)
crewai_event_bus.emit(
None,
A2AAgentCardFetchedEvent(
endpoint=endpoint,
a2a_agent_name=agent_card.name,
agent_card=agent_card_dict,
protocol_version=agent_card.protocol_version,
provider=agent_card_dict.get("provider"),
cached=False,
fetch_time_ms=fetch_time_ms,
),
)
return agent_card
return AgentCard.model_validate(response.json())
except httpx.HTTPStatusError as e:
elapsed_ms = (time.perf_counter() - start_time) * 1000
response_body = e.response.text[:1000] if e.response.text else None
if e.response.status_code == 401:
error_details = ["Authentication failed"]
www_auth = e.response.headers.get("WWW-Authenticate")
@@ -256,93 +228,7 @@ async def _afetch_agent_card_impl(
if not auth:
error_details.append("No auth scheme provided")
msg = " | ".join(error_details)
auth_type = type(auth).__name__ if auth else None
crewai_event_bus.emit(
None,
A2AAuthenticationFailedEvent(
endpoint=endpoint,
auth_type=auth_type,
error=msg,
status_code=401,
metadata={
"elapsed_ms": elapsed_ms,
"response_body": response_body,
"www_authenticate": www_auth,
"request_url": str(e.request.url),
},
),
)
raise A2AClientHTTPError(401, msg) from e
crewai_event_bus.emit(
None,
A2AConnectionErrorEvent(
endpoint=endpoint,
error=str(e),
error_type="http_error",
status_code=e.response.status_code,
operation="fetch_agent_card",
metadata={
"elapsed_ms": elapsed_ms,
"response_body": response_body,
"request_url": str(e.request.url),
},
),
)
raise
except httpx.TimeoutException as e:
elapsed_ms = (time.perf_counter() - start_time) * 1000
crewai_event_bus.emit(
None,
A2AConnectionErrorEvent(
endpoint=endpoint,
error=str(e),
error_type="timeout",
operation="fetch_agent_card",
metadata={
"elapsed_ms": elapsed_ms,
"timeout_config": timeout,
"request_url": str(e.request.url) if e.request else None,
},
),
)
raise
except httpx.ConnectError as e:
elapsed_ms = (time.perf_counter() - start_time) * 1000
crewai_event_bus.emit(
None,
A2AConnectionErrorEvent(
endpoint=endpoint,
error=str(e),
error_type="connection_error",
operation="fetch_agent_card",
metadata={
"elapsed_ms": elapsed_ms,
"request_url": str(e.request.url) if e.request else None,
},
),
)
raise
except httpx.RequestError as e:
elapsed_ms = (time.perf_counter() - start_time) * 1000
crewai_event_bus.emit(
None,
A2AConnectionErrorEvent(
endpoint=endpoint,
error=str(e),
error_type="request_error",
operation="fetch_agent_card",
metadata={
"elapsed_ms": elapsed_ms,
"request_url": str(e.request.url) if e.request else None,
},
),
)
raise

View File

@@ -88,9 +88,6 @@ def execute_a2a_delegation(
response_model: type[BaseModel] | None = None,
turn_number: int | None = None,
updates: UpdateConfig | None = None,
from_task: Any | None = None,
from_agent: Any | None = None,
skill_id: str | None = None,
) -> TaskStateResult:
"""Execute a task delegation to a remote A2A agent synchronously.
@@ -132,9 +129,6 @@ def execute_a2a_delegation(
response_model: Optional Pydantic model for structured outputs.
turn_number: Optional turn number for multi-turn conversations.
updates: Update mechanism config from A2AConfig.updates.
from_task: Optional CrewAI Task object for event metadata.
from_agent: Optional CrewAI Agent object for event metadata.
skill_id: Optional skill ID to target a specific agent capability.
Returns:
TaskStateResult with status, result/error, history, and agent_card.
@@ -162,16 +156,10 @@ def execute_a2a_delegation(
transport_protocol=transport_protocol,
turn_number=turn_number,
updates=updates,
from_task=from_task,
from_agent=from_agent,
skill_id=skill_id,
)
)
finally:
try:
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
loop.close()
loop.close()
async def aexecute_a2a_delegation(
@@ -193,9 +181,6 @@ async def aexecute_a2a_delegation(
response_model: type[BaseModel] | None = None,
turn_number: int | None = None,
updates: UpdateConfig | None = None,
from_task: Any | None = None,
from_agent: Any | None = None,
skill_id: str | None = None,
) -> TaskStateResult:
"""Execute a task delegation to a remote A2A agent asynchronously.
@@ -237,9 +222,6 @@ async def aexecute_a2a_delegation(
response_model: Optional Pydantic model for structured outputs.
turn_number: Optional turn number for multi-turn conversations.
updates: Update mechanism config from A2AConfig.updates.
from_task: Optional CrewAI Task object for event metadata.
from_agent: Optional CrewAI Agent object for event metadata.
skill_id: Optional skill ID to target a specific agent capability.
Returns:
TaskStateResult with status, result/error, history, and agent_card.
@@ -251,6 +233,17 @@ async def aexecute_a2a_delegation(
if turn_number is None:
turn_number = len([m for m in conversation_history if m.role == Role.user]) + 1
crewai_event_bus.emit(
agent_branch,
A2ADelegationStartedEvent(
endpoint=endpoint,
task_description=task_description,
agent_id=agent_id,
is_multiturn=is_multiturn,
turn_number=turn_number,
),
)
result = await _aexecute_a2a_delegation_impl(
endpoint=endpoint,
auth=auth,
@@ -271,28 +264,15 @@ async def aexecute_a2a_delegation(
response_model=response_model,
updates=updates,
transport_protocol=transport_protocol,
from_task=from_task,
from_agent=from_agent,
skill_id=skill_id,
)
agent_card_data: dict[str, Any] = result.get("agent_card") or {}
crewai_event_bus.emit(
agent_branch,
A2ADelegationCompletedEvent(
status=result["status"],
result=result.get("result"),
error=result.get("error"),
context_id=context_id,
is_multiturn=is_multiturn,
endpoint=endpoint,
a2a_agent_name=result.get("a2a_agent_name"),
agent_card=agent_card_data,
provider=agent_card_data.get("provider"),
metadata=metadata,
extensions=list(extensions.keys()) if extensions else None,
from_task=from_task,
from_agent=from_agent,
),
)
@@ -319,9 +299,6 @@ async def _aexecute_a2a_delegation_impl(
agent_role: str | None,
response_model: type[BaseModel] | None,
updates: UpdateConfig | None,
from_task: Any | None = None,
from_agent: Any | None = None,
skill_id: str | None = None,
) -> TaskStateResult:
"""Internal async implementation of A2A delegation."""
if auth:
@@ -354,28 +331,6 @@ async def _aexecute_a2a_delegation_impl(
if agent_card.name:
a2a_agent_name = agent_card.name
agent_card_dict = agent_card.model_dump(exclude_none=True)
crewai_event_bus.emit(
agent_branch,
A2ADelegationStartedEvent(
endpoint=endpoint,
task_description=task_description,
agent_id=agent_id or endpoint,
context_id=context_id,
is_multiturn=is_multiturn,
turn_number=turn_number,
a2a_agent_name=a2a_agent_name,
agent_card=agent_card_dict,
protocol_version=agent_card.protocol_version,
provider=agent_card_dict.get("provider"),
skill_id=skill_id,
metadata=metadata,
extensions=list(extensions.keys()) if extensions else None,
from_task=from_task,
from_agent=from_agent,
),
)
if turn_number == 1:
agent_id_for_event = agent_id or endpoint
crewai_event_bus.emit(
@@ -383,17 +338,7 @@ async def _aexecute_a2a_delegation_impl(
A2AConversationStartedEvent(
agent_id=agent_id_for_event,
endpoint=endpoint,
context_id=context_id,
a2a_agent_name=a2a_agent_name,
agent_card=agent_card_dict,
protocol_version=agent_card.protocol_version,
provider=agent_card_dict.get("provider"),
skill_id=skill_id,
reference_task_ids=reference_task_ids,
metadata=metadata,
extensions=list(extensions.keys()) if extensions else None,
from_task=from_task,
from_agent=from_agent,
),
)
@@ -419,10 +364,6 @@ async def _aexecute_a2a_delegation_impl(
}
)
message_metadata = metadata.copy() if metadata else {}
if skill_id:
message_metadata["skill_id"] = skill_id
message = Message(
role=Role.user,
message_id=str(uuid.uuid4()),
@@ -430,7 +371,7 @@ async def _aexecute_a2a_delegation_impl(
context_id=context_id,
task_id=task_id,
reference_task_ids=reference_task_ids,
metadata=message_metadata if message_metadata else None,
metadata=metadata,
extensions=extensions,
)
@@ -440,17 +381,8 @@ async def _aexecute_a2a_delegation_impl(
A2AMessageSentEvent(
message=message_text,
turn_number=turn_number,
context_id=context_id,
message_id=message.message_id,
is_multiturn=is_multiturn,
agent_role=agent_role,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
skill_id=skill_id,
metadata=message_metadata if message_metadata else None,
extensions=list(extensions.keys()) if extensions else None,
from_task=from_task,
from_agent=from_agent,
),
)
@@ -465,9 +397,6 @@ async def _aexecute_a2a_delegation_impl(
"task_id": task_id,
"endpoint": endpoint,
"agent_branch": agent_branch,
"a2a_agent_name": a2a_agent_name,
"from_task": from_task,
"from_agent": from_agent,
}
if isinstance(updates, PollingConfig):
@@ -505,16 +434,13 @@ async def _aexecute_a2a_delegation_impl(
use_polling=use_polling,
push_notification_config=push_config_for_client,
) as client:
result = await handler.execute(
return await handler.execute(
client=client,
message=message,
new_messages=new_messages,
agent_card=agent_card,
**handler_kwargs,
)
result["a2a_agent_name"] = a2a_agent_name
result["agent_card"] = agent_card.model_dump(exclude_none=True)
return result
@asynccontextmanager

View File

@@ -3,14 +3,11 @@
from __future__ import annotations
import asyncio
import base64
from collections.abc import Callable, Coroutine
from datetime import datetime
from functools import wraps
import logging
import os
from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar, cast
from urllib.parse import urlparse
from a2a.server.agent_execution import RequestContext
from a2a.server.events import EventQueue
@@ -48,14 +45,7 @@ T = TypeVar("T")
def _parse_redis_url(url: str) -> dict[str, Any]:
"""Parse a Redis URL into aiocache configuration.
Args:
url: Redis connection URL (e.g., redis://localhost:6379/0).
Returns:
Configuration dict for aiocache.RedisCache.
"""
from urllib.parse import urlparse
parsed = urlparse(url)
config: dict[str, Any] = {
@@ -137,7 +127,7 @@ def cancellable(
async for message in pubsub.listen():
if message["type"] == "message":
return True
except (OSError, ConnectionError) as e:
except Exception as e:
logger.warning("Cancel watcher error for task_id=%s: %s", task_id, e)
return await poll_for_cancel()
return False
@@ -193,12 +183,7 @@ async def execute(
msg = "task_id and context_id are required"
crewai_event_bus.emit(
agent,
A2AServerTaskFailedEvent(
task_id="",
context_id="",
error=msg,
from_agent=agent,
),
A2AServerTaskFailedEvent(a2a_task_id="", a2a_context_id="", error=msg),
)
raise ServerError(InvalidParamsError(message=msg)) from None
@@ -210,12 +195,7 @@ async def execute(
crewai_event_bus.emit(
agent,
A2AServerTaskStartedEvent(
task_id=task_id,
context_id=context_id,
from_task=task,
from_agent=agent,
),
A2AServerTaskStartedEvent(a2a_task_id=task_id, a2a_context_id=context_id),
)
try:
@@ -235,33 +215,20 @@ async def execute(
crewai_event_bus.emit(
agent,
A2AServerTaskCompletedEvent(
task_id=task_id,
context_id=context_id,
result=str(result),
from_task=task,
from_agent=agent,
a2a_task_id=task_id, a2a_context_id=context_id, result=str(result)
),
)
except asyncio.CancelledError:
crewai_event_bus.emit(
agent,
A2AServerTaskCanceledEvent(
task_id=task_id,
context_id=context_id,
from_task=task,
from_agent=agent,
),
A2AServerTaskCanceledEvent(a2a_task_id=task_id, a2a_context_id=context_id),
)
raise
except Exception as e:
crewai_event_bus.emit(
agent,
A2AServerTaskFailedEvent(
task_id=task_id,
context_id=context_id,
error=str(e),
from_task=task,
from_agent=agent,
a2a_task_id=task_id, a2a_context_id=context_id, error=str(e)
),
)
raise ServerError(
@@ -315,85 +282,3 @@ async def cancel(
context.current_task.status = TaskStatus(state=TaskState.canceled)
return context.current_task
return None
def list_tasks(
tasks: list[A2ATask],
context_id: str | None = None,
status: TaskState | None = None,
status_timestamp_after: datetime | None = None,
page_size: int = 50,
page_token: str | None = None,
history_length: int | None = None,
include_artifacts: bool = False,
) -> tuple[list[A2ATask], str | None, int]:
"""Filter and paginate A2A tasks.
Provides filtering by context, status, and timestamp, along with
cursor-based pagination. This is a pure utility function that operates
on an in-memory list of tasks - storage retrieval is handled separately.
Args:
tasks: All tasks to filter.
context_id: Filter by context ID to get tasks in a conversation.
status: Filter by task state (e.g., completed, working).
status_timestamp_after: Filter to tasks updated after this time.
page_size: Maximum tasks per page (default 50).
page_token: Base64-encoded cursor from previous response.
history_length: Limit history messages per task (None = full history).
include_artifacts: Whether to include task artifacts (default False).
Returns:
Tuple of (filtered_tasks, next_page_token, total_count).
- filtered_tasks: Tasks matching filters, paginated and trimmed.
- next_page_token: Token for next page, or None if no more pages.
- total_count: Total number of tasks matching filters (before pagination).
"""
filtered: list[A2ATask] = []
for task in tasks:
if context_id and task.context_id != context_id:
continue
if status and task.status.state != status:
continue
if status_timestamp_after and task.status.timestamp:
ts = datetime.fromisoformat(task.status.timestamp.replace("Z", "+00:00"))
if ts <= status_timestamp_after:
continue
filtered.append(task)
def get_timestamp(t: A2ATask) -> datetime:
"""Extract timestamp from task status for sorting."""
if t.status.timestamp is None:
return datetime.min
return datetime.fromisoformat(t.status.timestamp.replace("Z", "+00:00"))
filtered.sort(key=get_timestamp, reverse=True)
total = len(filtered)
start = 0
if page_token:
try:
cursor_id = base64.b64decode(page_token).decode()
for idx, task in enumerate(filtered):
if task.id == cursor_id:
start = idx + 1
break
except (ValueError, UnicodeDecodeError):
pass
page = filtered[start : start + page_size]
result: list[A2ATask] = []
for task in page:
task = task.model_copy(deep=True)
if history_length is not None and task.history:
task.history = task.history[-history_length:]
if not include_artifacts:
task.artifacts = None
result.append(task)
next_token: str | None = None
if result and len(result) == page_size:
next_token = base64.b64encode(result[-1].id.encode()).decode()
return result, next_token, total

View File

@@ -6,10 +6,9 @@ Wraps agent classes with A2A delegation capabilities.
from __future__ import annotations
import asyncio
from collections.abc import Callable, Coroutine, Mapping
from collections.abc import Callable, Coroutine
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import wraps
import json
from types import MethodType
from typing import TYPE_CHECKING, Any
@@ -190,7 +189,7 @@ def _execute_task_with_a2a(
a2a_agents: list[A2AConfig | A2AClientConfig],
original_fn: Callable[..., str],
task: Task,
agent_response_model: type[BaseModel] | None,
agent_response_model: type[BaseModel],
context: str | None,
tools: list[BaseTool] | None,
extension_registry: ExtensionRegistry,
@@ -278,7 +277,7 @@ def _execute_task_with_a2a(
def _augment_prompt_with_a2a(
a2a_agents: list[A2AConfig | A2AClientConfig],
task_description: str,
agent_cards: Mapping[str, AgentCard | dict[str, Any]],
agent_cards: dict[str, AgentCard],
conversation_history: list[Message] | None = None,
turn_num: int = 0,
max_turns: int | None = None,
@@ -310,15 +309,7 @@ def _augment_prompt_with_a2a(
for config in a2a_agents:
if config.endpoint in agent_cards:
card = agent_cards[config.endpoint]
if isinstance(card, dict):
filtered = {
k: v
for k, v in card.items()
if k in {"description", "url", "skills"} and v is not None
}
agents_text += f"\n{json.dumps(filtered, indent=2)}\n"
else:
agents_text += f"\n{card.model_dump_json(indent=2, exclude_none=True, include={'description', 'url', 'skills'})}\n"
agents_text += f"\n{card.model_dump_json(indent=2, exclude_none=True, include={'description', 'url', 'skills'})}\n"
failed_agents = failed_agents or {}
if failed_agents:
@@ -386,7 +377,7 @@ IMPORTANT: You have the ability to delegate this task to remote A2A agents.
def _parse_agent_response(
raw_result: str | dict[str, Any], agent_response_model: type[BaseModel] | None
raw_result: str | dict[str, Any], agent_response_model: type[BaseModel]
) -> BaseModel | str | dict[str, Any]:
"""Parse LLM output as AgentResponse or return raw agent response."""
if agent_response_model:
@@ -403,11 +394,6 @@ def _parse_agent_response(
def _handle_max_turns_exceeded(
conversation_history: list[Message],
max_turns: int,
from_task: Any | None = None,
from_agent: Any | None = None,
endpoint: str | None = None,
a2a_agent_name: str | None = None,
agent_card: dict[str, Any] | None = None,
) -> str:
"""Handle the case when max turns is exceeded.
@@ -435,11 +421,6 @@ def _handle_max_turns_exceeded(
final_result=final_message,
error=None,
total_turns=max_turns,
from_task=from_task,
from_agent=from_agent,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
agent_card=agent_card,
),
)
return final_message
@@ -451,11 +432,6 @@ def _handle_max_turns_exceeded(
final_result=None,
error=f"Conversation exceeded maximum turns ({max_turns})",
total_turns=max_turns,
from_task=from_task,
from_agent=from_agent,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
agent_card=agent_card,
),
)
raise Exception(f"A2A conversation exceeded maximum turns ({max_turns})")
@@ -466,12 +442,7 @@ def _process_response_result(
disable_structured_output: bool,
turn_num: int,
agent_role: str,
agent_response_model: type[BaseModel] | None,
from_task: Any | None = None,
from_agent: Any | None = None,
endpoint: str | None = None,
a2a_agent_name: str | None = None,
agent_card: dict[str, Any] | None = None,
agent_response_model: type[BaseModel],
) -> tuple[str | None, str | None]:
"""Process LLM response and determine next action.
@@ -490,10 +461,6 @@ def _process_response_result(
turn_number=final_turn_number,
is_multiturn=True,
agent_role=agent_role,
from_task=from_task,
from_agent=from_agent,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
),
)
crewai_event_bus.emit(
@@ -503,11 +470,6 @@ def _process_response_result(
final_result=result_text,
error=None,
total_turns=final_turn_number,
from_task=from_task,
from_agent=from_agent,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
agent_card=agent_card,
),
)
return result_text, None
@@ -528,10 +490,6 @@ def _process_response_result(
turn_number=final_turn_number,
is_multiturn=True,
agent_role=agent_role,
from_task=from_task,
from_agent=from_agent,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
),
)
crewai_event_bus.emit(
@@ -541,11 +499,6 @@ def _process_response_result(
final_result=str(llm_response.message),
error=None,
total_turns=final_turn_number,
from_task=from_task,
from_agent=from_agent,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
agent_card=agent_card,
),
)
return str(llm_response.message), None
@@ -557,15 +510,13 @@ def _process_response_result(
def _prepare_agent_cards_dict(
a2a_result: TaskStateResult,
agent_id: str,
agent_cards: Mapping[str, AgentCard | dict[str, Any]] | None,
) -> dict[str, AgentCard | dict[str, Any]]:
agent_cards: dict[str, AgentCard] | None,
) -> dict[str, AgentCard]:
"""Prepare agent cards dictionary from result and existing cards.
Shared logic for both sync and async response handlers.
"""
agent_cards_dict: dict[str, AgentCard | dict[str, Any]] = (
dict(agent_cards) if agent_cards else {}
)
agent_cards_dict = agent_cards or {}
if "agent_card" in a2a_result and agent_id not in agent_cards_dict:
agent_cards_dict[agent_id] = a2a_result["agent_card"]
return agent_cards_dict
@@ -578,7 +529,7 @@ def _prepare_delegation_context(
original_task_description: str | None,
) -> tuple[
list[A2AConfig | A2AClientConfig],
type[BaseModel] | None,
type[BaseModel],
str,
str,
A2AConfig | A2AClientConfig,
@@ -647,11 +598,6 @@ def _handle_task_completion(
reference_task_ids: list[str],
agent_config: A2AConfig | A2AClientConfig,
turn_num: int,
from_task: Any | None = None,
from_agent: Any | None = None,
endpoint: str | None = None,
a2a_agent_name: str | None = None,
agent_card: dict[str, Any] | None = None,
) -> tuple[str | None, str | None, list[str]]:
"""Handle task completion state including reference task updates.
@@ -678,11 +624,6 @@ def _handle_task_completion(
final_result=result_text,
error=None,
total_turns=final_turn_number,
from_task=from_task,
from_agent=from_agent,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
agent_card=agent_card,
),
)
return str(result_text), task_id_config, reference_task_ids
@@ -704,11 +645,8 @@ def _handle_agent_response_and_continue(
original_fn: Callable[..., str],
context: str | None,
tools: list[BaseTool] | None,
agent_response_model: type[BaseModel] | None,
agent_response_model: type[BaseModel],
remote_task_completed: bool = False,
endpoint: str | None = None,
a2a_agent_name: str | None = None,
agent_card: dict[str, Any] | None = None,
) -> tuple[str | None, str | None]:
"""Handle A2A result and get CrewAI agent's response.
@@ -760,11 +698,6 @@ def _handle_agent_response_and_continue(
turn_num=turn_num,
agent_role=self.role,
agent_response_model=agent_response_model,
from_task=task,
from_agent=self,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
agent_card=agent_card,
)
@@ -817,12 +750,6 @@ def _delegate_to_a2a(
conversation_history: list[Message] = []
current_agent_card = agent_cards.get(agent_id) if agent_cards else None
current_agent_card_dict = (
current_agent_card.model_dump() if current_agent_card else None
)
current_a2a_agent_name = current_agent_card.name if current_agent_card else None
try:
for turn_num in range(max_turns):
console_formatter = getattr(crewai_event_bus, "_console", None)
@@ -850,8 +777,6 @@ def _delegate_to_a2a(
turn_number=turn_num + 1,
updates=agent_config.updates,
transport_protocol=agent_config.transport_protocol,
from_task=task,
from_agent=self,
)
conversation_history = a2a_result.get("history", [])
@@ -872,11 +797,6 @@ def _delegate_to_a2a(
reference_task_ids,
agent_config,
turn_num,
from_task=task,
from_agent=self,
endpoint=agent_config.endpoint,
a2a_agent_name=current_a2a_agent_name,
agent_card=current_agent_card_dict,
)
)
if trusted_result is not None:
@@ -898,9 +818,6 @@ def _delegate_to_a2a(
tools=tools,
agent_response_model=agent_response_model,
remote_task_completed=(a2a_result["status"] == TaskState.completed),
endpoint=agent_config.endpoint,
a2a_agent_name=current_a2a_agent_name,
agent_card=current_agent_card_dict,
)
if final_result is not None:
@@ -929,9 +846,6 @@ def _delegate_to_a2a(
tools=tools,
agent_response_model=agent_response_model,
remote_task_completed=False,
endpoint=agent_config.endpoint,
a2a_agent_name=current_a2a_agent_name,
agent_card=current_agent_card_dict,
)
if final_result is not None:
@@ -948,24 +862,11 @@ def _delegate_to_a2a(
final_result=None,
error=error_msg,
total_turns=turn_num + 1,
from_task=task,
from_agent=self,
endpoint=agent_config.endpoint,
a2a_agent_name=current_a2a_agent_name,
agent_card=current_agent_card_dict,
),
)
return f"A2A delegation failed: {error_msg}"
return _handle_max_turns_exceeded(
conversation_history,
max_turns,
from_task=task,
from_agent=self,
endpoint=agent_config.endpoint,
a2a_agent_name=current_a2a_agent_name,
agent_card=current_agent_card_dict,
)
return _handle_max_turns_exceeded(conversation_history, max_turns)
finally:
task.description = original_task_description
@@ -1015,7 +916,7 @@ async def _aexecute_task_with_a2a(
a2a_agents: list[A2AConfig | A2AClientConfig],
original_fn: Callable[..., Coroutine[Any, Any, str]],
task: Task,
agent_response_model: type[BaseModel] | None,
agent_response_model: type[BaseModel],
context: str | None,
tools: list[BaseTool] | None,
extension_registry: ExtensionRegistry,
@@ -1100,11 +1001,8 @@ async def _ahandle_agent_response_and_continue(
original_fn: Callable[..., Coroutine[Any, Any, str]],
context: str | None,
tools: list[BaseTool] | None,
agent_response_model: type[BaseModel] | None,
agent_response_model: type[BaseModel],
remote_task_completed: bool = False,
endpoint: str | None = None,
a2a_agent_name: str | None = None,
agent_card: dict[str, Any] | None = None,
) -> tuple[str | None, str | None]:
"""Async version of _handle_agent_response_and_continue."""
agent_cards_dict = _prepare_agent_cards_dict(a2a_result, agent_id, agent_cards)
@@ -1134,11 +1032,6 @@ async def _ahandle_agent_response_and_continue(
turn_num=turn_num,
agent_role=self.role,
agent_response_model=agent_response_model,
from_task=task,
from_agent=self,
endpoint=endpoint,
a2a_agent_name=a2a_agent_name,
agent_card=agent_card,
)
@@ -1173,12 +1066,6 @@ async def _adelegate_to_a2a(
conversation_history: list[Message] = []
current_agent_card = agent_cards.get(agent_id) if agent_cards else None
current_agent_card_dict = (
current_agent_card.model_dump() if current_agent_card else None
)
current_a2a_agent_name = current_agent_card.name if current_agent_card else None
try:
for turn_num in range(max_turns):
console_formatter = getattr(crewai_event_bus, "_console", None)
@@ -1206,8 +1093,6 @@ async def _adelegate_to_a2a(
turn_number=turn_num + 1,
transport_protocol=agent_config.transport_protocol,
updates=agent_config.updates,
from_task=task,
from_agent=self,
)
conversation_history = a2a_result.get("history", [])
@@ -1228,11 +1113,6 @@ async def _adelegate_to_a2a(
reference_task_ids,
agent_config,
turn_num,
from_task=task,
from_agent=self,
endpoint=agent_config.endpoint,
a2a_agent_name=current_a2a_agent_name,
agent_card=current_agent_card_dict,
)
)
if trusted_result is not None:
@@ -1254,9 +1134,6 @@ async def _adelegate_to_a2a(
tools=tools,
agent_response_model=agent_response_model,
remote_task_completed=(a2a_result["status"] == TaskState.completed),
endpoint=agent_config.endpoint,
a2a_agent_name=current_a2a_agent_name,
agent_card=current_agent_card_dict,
)
if final_result is not None:
@@ -1284,9 +1161,6 @@ async def _adelegate_to_a2a(
context=context,
tools=tools,
agent_response_model=agent_response_model,
endpoint=agent_config.endpoint,
a2a_agent_name=current_a2a_agent_name,
agent_card=current_agent_card_dict,
)
if final_result is not None:
@@ -1303,24 +1177,11 @@ async def _adelegate_to_a2a(
final_result=None,
error=error_msg,
total_turns=turn_num + 1,
from_task=task,
from_agent=self,
endpoint=agent_config.endpoint,
a2a_agent_name=current_a2a_agent_name,
agent_card=current_agent_card_dict,
),
)
return f"A2A delegation failed: {error_msg}"
return _handle_max_turns_exceeded(
conversation_history,
max_turns,
from_task=task,
from_agent=self,
endpoint=agent_config.endpoint,
a2a_agent_name=current_a2a_agent_name,
agent_card=current_agent_card_dict,
)
return _handle_max_turns_exceeded(conversation_history, max_turns)
finally:
task.description = original_task_description

View File

@@ -1,28 +1,19 @@
from crewai.events.types.a2a_events import (
A2AAgentCardFetchedEvent,
A2AArtifactReceivedEvent,
A2AAuthenticationFailedEvent,
A2AConnectionErrorEvent,
A2AConversationCompletedEvent,
A2AConversationStartedEvent,
A2ADelegationCompletedEvent,
A2ADelegationStartedEvent,
A2AMessageSentEvent,
A2AParallelDelegationCompletedEvent,
A2AParallelDelegationStartedEvent,
A2APollingStartedEvent,
A2APollingStatusEvent,
A2APushNotificationReceivedEvent,
A2APushNotificationRegisteredEvent,
A2APushNotificationSentEvent,
A2APushNotificationTimeoutEvent,
A2AResponseReceivedEvent,
A2AServerTaskCanceledEvent,
A2AServerTaskCompletedEvent,
A2AServerTaskFailedEvent,
A2AServerTaskStartedEvent,
A2AStreamingChunkEvent,
A2AStreamingStartedEvent,
)
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
@@ -102,11 +93,7 @@ from crewai.events.types.tool_usage_events import (
EventTypes = (
A2AAgentCardFetchedEvent
| A2AArtifactReceivedEvent
| A2AAuthenticationFailedEvent
| A2AConnectionErrorEvent
| A2AConversationCompletedEvent
A2AConversationCompletedEvent
| A2AConversationStartedEvent
| A2ADelegationCompletedEvent
| A2ADelegationStartedEvent
@@ -115,17 +102,12 @@ EventTypes = (
| A2APollingStatusEvent
| A2APushNotificationReceivedEvent
| A2APushNotificationRegisteredEvent
| A2APushNotificationSentEvent
| A2APushNotificationTimeoutEvent
| A2AResponseReceivedEvent
| A2AServerTaskCanceledEvent
| A2AServerTaskCompletedEvent
| A2AServerTaskFailedEvent
| A2AServerTaskStartedEvent
| A2AStreamingChunkEvent
| A2AStreamingStartedEvent
| A2AParallelDelegationStartedEvent
| A2AParallelDelegationCompletedEvent
| CrewKickoffStartedEvent
| CrewKickoffCompletedEvent
| CrewKickoffFailedEvent

View File

@@ -1,7 +1,7 @@
"""Trace collection listener for orchestrating trace collection."""
import os
from typing import Any, ClassVar
from typing import Any, ClassVar, cast
import uuid
from typing_extensions import Self
@@ -18,32 +18,6 @@ from crewai.events.listeners.tracing.types import TraceEvent
from crewai.events.listeners.tracing.utils import (
safe_serialize_to_dict,
)
from crewai.events.types.a2a_events import (
A2AAgentCardFetchedEvent,
A2AArtifactReceivedEvent,
A2AAuthenticationFailedEvent,
A2AConnectionErrorEvent,
A2AConversationCompletedEvent,
A2AConversationStartedEvent,
A2ADelegationCompletedEvent,
A2ADelegationStartedEvent,
A2AMessageSentEvent,
A2AParallelDelegationCompletedEvent,
A2AParallelDelegationStartedEvent,
A2APollingStartedEvent,
A2APollingStatusEvent,
A2APushNotificationReceivedEvent,
A2APushNotificationRegisteredEvent,
A2APushNotificationSentEvent,
A2APushNotificationTimeoutEvent,
A2AResponseReceivedEvent,
A2AServerTaskCanceledEvent,
A2AServerTaskCompletedEvent,
A2AServerTaskFailedEvent,
A2AServerTaskStartedEvent,
A2AStreamingChunkEvent,
A2AStreamingStartedEvent,
)
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
@@ -131,7 +105,7 @@ class TraceCollectionListener(BaseEventListener):
"""Create or return singleton instance."""
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
return cast(Self, cls._instance)
def __init__(
self,
@@ -186,7 +160,6 @@ class TraceCollectionListener(BaseEventListener):
self._register_flow_event_handlers(crewai_event_bus)
self._register_context_event_handlers(crewai_event_bus)
self._register_action_event_handlers(crewai_event_bus)
self._register_a2a_event_handlers(crewai_event_bus)
self._register_system_event_handlers(crewai_event_bus)
self._listeners_setup = True
@@ -466,147 +439,6 @@ class TraceCollectionListener(BaseEventListener):
) -> None:
self._handle_action_event("knowledge_query_failed", source, event)
def _register_a2a_event_handlers(self, event_bus: CrewAIEventsBus) -> None:
"""Register handlers for A2A (Agent-to-Agent) events."""
@event_bus.on(A2ADelegationStartedEvent)
def on_a2a_delegation_started(
source: Any, event: A2ADelegationStartedEvent
) -> None:
self._handle_action_event("a2a_delegation_started", source, event)
@event_bus.on(A2ADelegationCompletedEvent)
def on_a2a_delegation_completed(
source: Any, event: A2ADelegationCompletedEvent
) -> None:
self._handle_action_event("a2a_delegation_completed", source, event)
@event_bus.on(A2AConversationStartedEvent)
def on_a2a_conversation_started(
source: Any, event: A2AConversationStartedEvent
) -> None:
self._handle_action_event("a2a_conversation_started", source, event)
@event_bus.on(A2AMessageSentEvent)
def on_a2a_message_sent(source: Any, event: A2AMessageSentEvent) -> None:
self._handle_action_event("a2a_message_sent", source, event)
@event_bus.on(A2AResponseReceivedEvent)
def on_a2a_response_received(
source: Any, event: A2AResponseReceivedEvent
) -> None:
self._handle_action_event("a2a_response_received", source, event)
@event_bus.on(A2AConversationCompletedEvent)
def on_a2a_conversation_completed(
source: Any, event: A2AConversationCompletedEvent
) -> None:
self._handle_action_event("a2a_conversation_completed", source, event)
@event_bus.on(A2APollingStartedEvent)
def on_a2a_polling_started(source: Any, event: A2APollingStartedEvent) -> None:
self._handle_action_event("a2a_polling_started", source, event)
@event_bus.on(A2APollingStatusEvent)
def on_a2a_polling_status(source: Any, event: A2APollingStatusEvent) -> None:
self._handle_action_event("a2a_polling_status", source, event)
@event_bus.on(A2APushNotificationRegisteredEvent)
def on_a2a_push_notification_registered(
source: Any, event: A2APushNotificationRegisteredEvent
) -> None:
self._handle_action_event("a2a_push_notification_registered", source, event)
@event_bus.on(A2APushNotificationReceivedEvent)
def on_a2a_push_notification_received(
source: Any, event: A2APushNotificationReceivedEvent
) -> None:
self._handle_action_event("a2a_push_notification_received", source, event)
@event_bus.on(A2APushNotificationSentEvent)
def on_a2a_push_notification_sent(
source: Any, event: A2APushNotificationSentEvent
) -> None:
self._handle_action_event("a2a_push_notification_sent", source, event)
@event_bus.on(A2APushNotificationTimeoutEvent)
def on_a2a_push_notification_timeout(
source: Any, event: A2APushNotificationTimeoutEvent
) -> None:
self._handle_action_event("a2a_push_notification_timeout", source, event)
@event_bus.on(A2AStreamingStartedEvent)
def on_a2a_streaming_started(
source: Any, event: A2AStreamingStartedEvent
) -> None:
self._handle_action_event("a2a_streaming_started", source, event)
@event_bus.on(A2AStreamingChunkEvent)
def on_a2a_streaming_chunk(source: Any, event: A2AStreamingChunkEvent) -> None:
self._handle_action_event("a2a_streaming_chunk", source, event)
@event_bus.on(A2AAgentCardFetchedEvent)
def on_a2a_agent_card_fetched(
source: Any, event: A2AAgentCardFetchedEvent
) -> None:
self._handle_action_event("a2a_agent_card_fetched", source, event)
@event_bus.on(A2AAuthenticationFailedEvent)
def on_a2a_authentication_failed(
source: Any, event: A2AAuthenticationFailedEvent
) -> None:
self._handle_action_event("a2a_authentication_failed", source, event)
@event_bus.on(A2AArtifactReceivedEvent)
def on_a2a_artifact_received(
source: Any, event: A2AArtifactReceivedEvent
) -> None:
self._handle_action_event("a2a_artifact_received", source, event)
@event_bus.on(A2AConnectionErrorEvent)
def on_a2a_connection_error(
source: Any, event: A2AConnectionErrorEvent
) -> None:
self._handle_action_event("a2a_connection_error", source, event)
@event_bus.on(A2AServerTaskStartedEvent)
def on_a2a_server_task_started(
source: Any, event: A2AServerTaskStartedEvent
) -> None:
self._handle_action_event("a2a_server_task_started", source, event)
@event_bus.on(A2AServerTaskCompletedEvent)
def on_a2a_server_task_completed(
source: Any, event: A2AServerTaskCompletedEvent
) -> None:
self._handle_action_event("a2a_server_task_completed", source, event)
@event_bus.on(A2AServerTaskCanceledEvent)
def on_a2a_server_task_canceled(
source: Any, event: A2AServerTaskCanceledEvent
) -> None:
self._handle_action_event("a2a_server_task_canceled", source, event)
@event_bus.on(A2AServerTaskFailedEvent)
def on_a2a_server_task_failed(
source: Any, event: A2AServerTaskFailedEvent
) -> None:
self._handle_action_event("a2a_server_task_failed", source, event)
@event_bus.on(A2AParallelDelegationStartedEvent)
def on_a2a_parallel_delegation_started(
source: Any, event: A2AParallelDelegationStartedEvent
) -> None:
self._handle_action_event("a2a_parallel_delegation_started", source, event)
@event_bus.on(A2AParallelDelegationCompletedEvent)
def on_a2a_parallel_delegation_completed(
source: Any, event: A2AParallelDelegationCompletedEvent
) -> None:
self._handle_action_event(
"a2a_parallel_delegation_completed", source, event
)
def _register_system_event_handlers(self, event_bus: CrewAIEventsBus) -> None:
"""Register handlers for system signal events (SIGTERM, SIGINT, etc.)."""
@@ -738,15 +570,10 @@ class TraceCollectionListener(BaseEventListener):
if event_type not in self.complex_events:
return safe_serialize_to_dict(event)
if event_type == "task_started":
task_name = event.task.name or event.task.description
task_display_name = (
task_name[:80] + "..." if len(task_name) > 80 else task_name
)
return {
"task_description": event.task.description,
"expected_output": event.task.expected_output,
"task_name": task_name,
"task_display_name": task_display_name,
"task_name": event.task.name or event.task.description,
"context": event.context,
"agent_role": source.agent.role,
"task_id": str(event.task.id),

View File

@@ -4,120 +4,68 @@ This module defines events emitted during A2A protocol delegation,
including both single-turn and multiturn conversation flows.
"""
from __future__ import annotations
from typing import Any, Literal
from pydantic import model_validator
from crewai.events.base_events import BaseEvent
class A2AEventBase(BaseEvent):
"""Base class for A2A events with task/agent context."""
from_task: Any = None
from_agent: Any = None
from_task: Any | None = None
from_agent: Any | None = None
@model_validator(mode="before")
@classmethod
def extract_task_and_agent_metadata(cls, data: dict[str, Any]) -> dict[str, Any]:
"""Extract task and agent metadata before validation."""
if task := data.get("from_task"):
def __init__(self, **data: Any) -> None:
"""Initialize A2A event, extracting task and agent metadata."""
if data.get("from_task"):
task = data["from_task"]
data["task_id"] = str(task.id)
data["task_name"] = task.name or task.description
data.setdefault("source_fingerprint", str(task.id))
data.setdefault("source_type", "task")
data.setdefault(
"fingerprint_metadata",
{
"task_id": str(task.id),
"task_name": task.name or task.description,
},
)
data["from_task"] = None
if agent := data.get("from_agent"):
if data.get("from_agent"):
agent = data["from_agent"]
data["agent_id"] = str(agent.id)
data["agent_role"] = agent.role
data.setdefault("source_fingerprint", str(agent.id))
data.setdefault("source_type", "agent")
data.setdefault(
"fingerprint_metadata",
{
"agent_id": str(agent.id),
"agent_role": agent.role,
},
)
data["from_agent"] = None
return data
super().__init__(**data)
class A2ADelegationStartedEvent(A2AEventBase):
"""Event emitted when A2A delegation starts.
Attributes:
endpoint: A2A agent endpoint URL (AgentCard URL).
task_description: Task being delegated to the A2A agent.
agent_id: A2A agent identifier.
context_id: A2A context ID grouping related tasks.
is_multiturn: Whether this is part of a multiturn conversation.
turn_number: Current turn number (1-indexed, 1 for single-turn).
a2a_agent_name: Name of the A2A agent from agent card.
agent_card: Full A2A agent card metadata.
protocol_version: A2A protocol version being used.
provider: Agent provider/organization info from agent card.
skill_id: ID of the specific skill being invoked.
metadata: Custom A2A metadata key-value pairs.
extensions: List of A2A extension URIs in use.
endpoint: A2A agent endpoint URL (AgentCard URL)
task_description: Task being delegated to the A2A agent
agent_id: A2A agent identifier
is_multiturn: Whether this is part of a multiturn conversation
turn_number: Current turn number (1-indexed, 1 for single-turn)
"""
type: str = "a2a_delegation_started"
endpoint: str
task_description: str
agent_id: str
context_id: str | None = None
is_multiturn: bool = False
turn_number: int = 1
a2a_agent_name: str | None = None
agent_card: dict[str, Any] | None = None
protocol_version: str | None = None
provider: dict[str, Any] | None = None
skill_id: str | None = None
metadata: dict[str, Any] | None = None
extensions: list[str] | None = None
class A2ADelegationCompletedEvent(A2AEventBase):
"""Event emitted when A2A delegation completes.
Attributes:
status: Completion status (completed, input_required, failed, etc.).
result: Result message if status is completed.
error: Error/response message (error for failed, response for input_required).
context_id: A2A context ID grouping related tasks.
is_multiturn: Whether this is part of a multiturn conversation.
endpoint: A2A agent endpoint URL.
a2a_agent_name: Name of the A2A agent from agent card.
agent_card: Full A2A agent card metadata.
provider: Agent provider/organization info from agent card.
metadata: Custom A2A metadata key-value pairs.
extensions: List of A2A extension URIs in use.
status: Completion status (completed, input_required, failed, etc.)
result: Result message if status is completed
error: Error/response message (error for failed, response for input_required)
is_multiturn: Whether this is part of a multiturn conversation
"""
type: str = "a2a_delegation_completed"
status: str
result: str | None = None
error: str | None = None
context_id: str | None = None
is_multiturn: bool = False
endpoint: str | None = None
a2a_agent_name: str | None = None
agent_card: dict[str, Any] | None = None
provider: dict[str, Any] | None = None
metadata: dict[str, Any] | None = None
extensions: list[str] | None = None
class A2AConversationStartedEvent(A2AEventBase):
@@ -127,95 +75,51 @@ class A2AConversationStartedEvent(A2AEventBase):
before the first message exchange.
Attributes:
agent_id: A2A agent identifier.
endpoint: A2A agent endpoint URL.
context_id: A2A context ID grouping related tasks.
a2a_agent_name: Name of the A2A agent from agent card.
agent_card: Full A2A agent card metadata.
protocol_version: A2A protocol version being used.
provider: Agent provider/organization info from agent card.
skill_id: ID of the specific skill being invoked.
reference_task_ids: Related task IDs for context.
metadata: Custom A2A metadata key-value pairs.
extensions: List of A2A extension URIs in use.
agent_id: A2A agent identifier
endpoint: A2A agent endpoint URL
a2a_agent_name: Name of the A2A agent from agent card
"""
type: str = "a2a_conversation_started"
agent_id: str
endpoint: str
context_id: str | None = None
a2a_agent_name: str | None = None
agent_card: dict[str, Any] | None = None
protocol_version: str | None = None
provider: dict[str, Any] | None = None
skill_id: str | None = None
reference_task_ids: list[str] | None = None
metadata: dict[str, Any] | None = None
extensions: list[str] | None = None
class A2AMessageSentEvent(A2AEventBase):
"""Event emitted when a message is sent to the A2A agent.
Attributes:
message: Message content sent to the A2A agent.
turn_number: Current turn number (1-indexed).
context_id: A2A context ID grouping related tasks.
message_id: Unique A2A message identifier.
is_multiturn: Whether this is part of a multiturn conversation.
agent_role: Role of the CrewAI agent sending the message.
endpoint: A2A agent endpoint URL.
a2a_agent_name: Name of the A2A agent from agent card.
skill_id: ID of the specific skill being invoked.
metadata: Custom A2A metadata key-value pairs.
extensions: List of A2A extension URIs in use.
message: Message content sent to the A2A agent
turn_number: Current turn number (1-indexed)
is_multiturn: Whether this is part of a multiturn conversation
agent_role: Role of the CrewAI agent sending the message
"""
type: str = "a2a_message_sent"
message: str
turn_number: int
context_id: str | None = None
message_id: str | None = None
is_multiturn: bool = False
agent_role: str | None = None
endpoint: str | None = None
a2a_agent_name: str | None = None
skill_id: str | None = None
metadata: dict[str, Any] | None = None
extensions: list[str] | None = None
class A2AResponseReceivedEvent(A2AEventBase):
"""Event emitted when a response is received from the A2A agent.
Attributes:
response: Response content from the A2A agent.
turn_number: Current turn number (1-indexed).
context_id: A2A context ID grouping related tasks.
message_id: Unique A2A message identifier.
is_multiturn: Whether this is part of a multiturn conversation.
status: Response status (input_required, completed, etc.).
final: Whether this is the final response in the stream.
agent_role: Role of the CrewAI agent (for display).
endpoint: A2A agent endpoint URL.
a2a_agent_name: Name of the A2A agent from agent card.
metadata: Custom A2A metadata key-value pairs.
extensions: List of A2A extension URIs in use.
response: Response content from the A2A agent
turn_number: Current turn number (1-indexed)
is_multiturn: Whether this is part of a multiturn conversation
status: Response status (input_required, completed, etc.)
agent_role: Role of the CrewAI agent (for display)
"""
type: str = "a2a_response_received"
response: str
turn_number: int
context_id: str | None = None
message_id: str | None = None
is_multiturn: bool = False
status: str
final: bool = False
agent_role: str | None = None
endpoint: str | None = None
a2a_agent_name: str | None = None
metadata: dict[str, Any] | None = None
extensions: list[str] | None = None
class A2AConversationCompletedEvent(A2AEventBase):
@@ -224,433 +128,119 @@ class A2AConversationCompletedEvent(A2AEventBase):
This is emitted once at the end of a multiturn conversation.
Attributes:
status: Final status (completed, failed, etc.).
final_result: Final result if completed successfully.
error: Error message if failed.
context_id: A2A context ID grouping related tasks.
total_turns: Total number of turns in the conversation.
endpoint: A2A agent endpoint URL.
a2a_agent_name: Name of the A2A agent from agent card.
agent_card: Full A2A agent card metadata.
reference_task_ids: Related task IDs for context.
metadata: Custom A2A metadata key-value pairs.
extensions: List of A2A extension URIs in use.
status: Final status (completed, failed, etc.)
final_result: Final result if completed successfully
error: Error message if failed
total_turns: Total number of turns in the conversation
"""
type: str = "a2a_conversation_completed"
status: Literal["completed", "failed"]
final_result: str | None = None
error: str | None = None
context_id: str | None = None
total_turns: int
endpoint: str | None = None
a2a_agent_name: str | None = None
agent_card: dict[str, Any] | None = None
reference_task_ids: list[str] | None = None
metadata: dict[str, Any] | None = None
extensions: list[str] | None = None
class A2APollingStartedEvent(A2AEventBase):
"""Event emitted when polling mode begins for A2A delegation.
Attributes:
task_id: A2A task ID being polled.
context_id: A2A context ID grouping related tasks.
polling_interval: Seconds between poll attempts.
endpoint: A2A agent endpoint URL.
a2a_agent_name: Name of the A2A agent from agent card.
metadata: Custom A2A metadata key-value pairs.
task_id: A2A task ID being polled
polling_interval: Seconds between poll attempts
endpoint: A2A agent endpoint URL
"""
type: str = "a2a_polling_started"
task_id: str
context_id: str | None = None
polling_interval: float
endpoint: str
a2a_agent_name: str | None = None
metadata: dict[str, Any] | None = None
class A2APollingStatusEvent(A2AEventBase):
"""Event emitted on each polling iteration.
Attributes:
task_id: A2A task ID being polled.
context_id: A2A context ID grouping related tasks.
state: Current task state from remote agent.
elapsed_seconds: Time since polling started.
poll_count: Number of polls completed.
endpoint: A2A agent endpoint URL.
a2a_agent_name: Name of the A2A agent from agent card.
metadata: Custom A2A metadata key-value pairs.
task_id: A2A task ID being polled
state: Current task state from remote agent
elapsed_seconds: Time since polling started
poll_count: Number of polls completed
"""
type: str = "a2a_polling_status"
task_id: str
context_id: str | None = None
state: str
elapsed_seconds: float
poll_count: int
endpoint: str | None = None
a2a_agent_name: str | None = None
metadata: dict[str, Any] | None = None
class A2APushNotificationRegisteredEvent(A2AEventBase):
"""Event emitted when push notification callback is registered.
Attributes:
task_id: A2A task ID for which callback is registered.
context_id: A2A context ID grouping related tasks.
callback_url: URL where agent will send push notifications.
endpoint: A2A agent endpoint URL.
a2a_agent_name: Name of the A2A agent from agent card.
metadata: Custom A2A metadata key-value pairs.
task_id: A2A task ID for which callback is registered
callback_url: URL where agent will send push notifications
"""
type: str = "a2a_push_notification_registered"
task_id: str
context_id: str | None = None
callback_url: str
endpoint: str | None = None
a2a_agent_name: str | None = None
metadata: dict[str, Any] | None = None
class A2APushNotificationReceivedEvent(A2AEventBase):
"""Event emitted when a push notification is received.
This event should be emitted by the user's webhook handler when it receives
a push notification from the remote A2A agent, before calling
`result_store.store_result()`.
Attributes:
task_id: A2A task ID from the notification.
context_id: A2A context ID grouping related tasks.
state: Current task state from the notification.
endpoint: A2A agent endpoint URL.
a2a_agent_name: Name of the A2A agent from agent card.
metadata: Custom A2A metadata key-value pairs.
task_id: A2A task ID from the notification
state: Current task state from the notification
"""
type: str = "a2a_push_notification_received"
task_id: str
context_id: str | None = None
state: str
endpoint: str | None = None
a2a_agent_name: str | None = None
metadata: dict[str, Any] | None = None
class A2APushNotificationSentEvent(A2AEventBase):
"""Event emitted when a push notification is sent to a callback URL.
Emitted by the A2A server when it sends a task status update to the
client's registered push notification callback URL.
Attributes:
task_id: A2A task ID being notified.
context_id: A2A context ID grouping related tasks.
callback_url: URL the notification was sent to.
state: Task state being reported.
success: Whether the notification was successfully delivered.
error: Error message if delivery failed.
metadata: Custom A2A metadata key-value pairs.
"""
type: str = "a2a_push_notification_sent"
task_id: str
context_id: str | None = None
callback_url: str
state: str
success: bool = True
error: str | None = None
metadata: dict[str, Any] | None = None
class A2APushNotificationTimeoutEvent(A2AEventBase):
"""Event emitted when push notification wait times out.
Attributes:
task_id: A2A task ID that timed out.
context_id: A2A context ID grouping related tasks.
timeout_seconds: Timeout duration in seconds.
endpoint: A2A agent endpoint URL.
a2a_agent_name: Name of the A2A agent from agent card.
metadata: Custom A2A metadata key-value pairs.
task_id: A2A task ID that timed out
timeout_seconds: Timeout duration in seconds
"""
type: str = "a2a_push_notification_timeout"
task_id: str
context_id: str | None = None
timeout_seconds: float
endpoint: str | None = None
a2a_agent_name: str | None = None
metadata: dict[str, Any] | None = None
class A2AStreamingStartedEvent(A2AEventBase):
"""Event emitted when streaming mode begins for A2A delegation.
Attributes:
task_id: A2A task ID for the streaming session.
context_id: A2A context ID grouping related tasks.
endpoint: A2A agent endpoint URL.
a2a_agent_name: Name of the A2A agent from agent card.
turn_number: Current turn number (1-indexed).
is_multiturn: Whether this is part of a multiturn conversation.
agent_role: Role of the CrewAI agent.
metadata: Custom A2A metadata key-value pairs.
extensions: List of A2A extension URIs in use.
"""
type: str = "a2a_streaming_started"
task_id: str | None = None
context_id: str | None = None
endpoint: str
a2a_agent_name: str | None = None
turn_number: int = 1
is_multiturn: bool = False
agent_role: str | None = None
metadata: dict[str, Any] | None = None
extensions: list[str] | None = None
class A2AStreamingChunkEvent(A2AEventBase):
"""Event emitted when a streaming chunk is received.
Attributes:
task_id: A2A task ID for the streaming session.
context_id: A2A context ID grouping related tasks.
chunk: The text content of the chunk.
chunk_index: Index of this chunk in the stream (0-indexed).
final: Whether this is the final chunk in the stream.
endpoint: A2A agent endpoint URL.
a2a_agent_name: Name of the A2A agent from agent card.
turn_number: Current turn number (1-indexed).
is_multiturn: Whether this is part of a multiturn conversation.
metadata: Custom A2A metadata key-value pairs.
extensions: List of A2A extension URIs in use.
"""
type: str = "a2a_streaming_chunk"
task_id: str | None = None
context_id: str | None = None
chunk: str
chunk_index: int
final: bool = False
endpoint: str | None = None
a2a_agent_name: str | None = None
turn_number: int = 1
is_multiturn: bool = False
metadata: dict[str, Any] | None = None
extensions: list[str] | None = None
class A2AAgentCardFetchedEvent(A2AEventBase):
"""Event emitted when an agent card is successfully fetched.
Attributes:
endpoint: A2A agent endpoint URL.
a2a_agent_name: Name of the A2A agent from agent card.
agent_card: Full A2A agent card metadata.
protocol_version: A2A protocol version from agent card.
provider: Agent provider/organization info from agent card.
cached: Whether the agent card was retrieved from cache.
fetch_time_ms: Time taken to fetch the agent card in milliseconds.
metadata: Custom A2A metadata key-value pairs.
"""
type: str = "a2a_agent_card_fetched"
endpoint: str
a2a_agent_name: str | None = None
agent_card: dict[str, Any] | None = None
protocol_version: str | None = None
provider: dict[str, Any] | None = None
cached: bool = False
fetch_time_ms: float | None = None
metadata: dict[str, Any] | None = None
class A2AAuthenticationFailedEvent(A2AEventBase):
"""Event emitted when authentication to an A2A agent fails.
Attributes:
endpoint: A2A agent endpoint URL.
auth_type: Type of authentication attempted (e.g., bearer, oauth2, api_key).
error: Error message describing the failure.
status_code: HTTP status code if applicable.
a2a_agent_name: Name of the A2A agent if known.
protocol_version: A2A protocol version being used.
metadata: Custom A2A metadata key-value pairs.
"""
type: str = "a2a_authentication_failed"
endpoint: str
auth_type: str | None = None
error: str
status_code: int | None = None
a2a_agent_name: str | None = None
protocol_version: str | None = None
metadata: dict[str, Any] | None = None
class A2AArtifactReceivedEvent(A2AEventBase):
"""Event emitted when an artifact is received from a remote A2A agent.
Attributes:
task_id: A2A task ID the artifact belongs to.
artifact_id: Unique identifier for the artifact.
artifact_name: Name of the artifact.
artifact_description: Purpose description of the artifact.
mime_type: MIME type of the artifact content.
size_bytes: Size of the artifact in bytes.
append: Whether content should be appended to existing artifact.
last_chunk: Whether this is the final chunk of the artifact.
endpoint: A2A agent endpoint URL.
a2a_agent_name: Name of the A2A agent from agent card.
context_id: Context ID for correlation.
turn_number: Current turn number (1-indexed).
is_multiturn: Whether this is part of a multiturn conversation.
metadata: Custom A2A metadata key-value pairs.
extensions: List of A2A extension URIs in use.
"""
type: str = "a2a_artifact_received"
task_id: str
artifact_id: str
artifact_name: str | None = None
artifact_description: str | None = None
mime_type: str | None = None
size_bytes: int | None = None
append: bool = False
last_chunk: bool = False
endpoint: str | None = None
a2a_agent_name: str | None = None
context_id: str | None = None
turn_number: int = 1
is_multiturn: bool = False
metadata: dict[str, Any] | None = None
extensions: list[str] | None = None
class A2AConnectionErrorEvent(A2AEventBase):
"""Event emitted when a connection error occurs during A2A communication.
Attributes:
endpoint: A2A agent endpoint URL.
error: Error message describing the connection failure.
error_type: Type of error (e.g., timeout, connection_refused, dns_error).
status_code: HTTP status code if applicable.
a2a_agent_name: Name of the A2A agent from agent card.
operation: The operation being attempted when error occurred.
context_id: A2A context ID grouping related tasks.
task_id: A2A task ID if applicable.
metadata: Custom A2A metadata key-value pairs.
"""
type: str = "a2a_connection_error"
endpoint: str
error: str
error_type: str | None = None
status_code: int | None = None
a2a_agent_name: str | None = None
operation: str | None = None
context_id: str | None = None
task_id: str | None = None
metadata: dict[str, Any] | None = None
class A2AServerTaskStartedEvent(A2AEventBase):
"""Event emitted when an A2A server task execution starts.
Attributes:
task_id: A2A task ID for this execution.
context_id: A2A context ID grouping related tasks.
metadata: Custom A2A metadata key-value pairs.
"""
"""Event emitted when an A2A server task execution starts."""
type: str = "a2a_server_task_started"
task_id: str
context_id: str
metadata: dict[str, Any] | None = None
a2a_task_id: str
a2a_context_id: str
class A2AServerTaskCompletedEvent(A2AEventBase):
"""Event emitted when an A2A server task execution completes.
Attributes:
task_id: A2A task ID for this execution.
context_id: A2A context ID grouping related tasks.
result: The task result.
metadata: Custom A2A metadata key-value pairs.
"""
"""Event emitted when an A2A server task execution completes."""
type: str = "a2a_server_task_completed"
task_id: str
context_id: str
a2a_task_id: str
a2a_context_id: str
result: str
metadata: dict[str, Any] | None = None
class A2AServerTaskCanceledEvent(A2AEventBase):
"""Event emitted when an A2A server task execution is canceled.
Attributes:
task_id: A2A task ID for this execution.
context_id: A2A context ID grouping related tasks.
metadata: Custom A2A metadata key-value pairs.
"""
"""Event emitted when an A2A server task execution is canceled."""
type: str = "a2a_server_task_canceled"
task_id: str
context_id: str
metadata: dict[str, Any] | None = None
a2a_task_id: str
a2a_context_id: str
class A2AServerTaskFailedEvent(A2AEventBase):
"""Event emitted when an A2A server task execution fails.
Attributes:
task_id: A2A task ID for this execution.
context_id: A2A context ID grouping related tasks.
error: Error message describing the failure.
metadata: Custom A2A metadata key-value pairs.
"""
"""Event emitted when an A2A server task execution fails."""
type: str = "a2a_server_task_failed"
task_id: str
context_id: str
a2a_task_id: str
a2a_context_id: str
error: str
metadata: dict[str, Any] | None = None
class A2AParallelDelegationStartedEvent(A2AEventBase):
"""Event emitted when parallel delegation to multiple A2A agents begins.
Attributes:
endpoints: List of A2A agent endpoints being delegated to.
task_description: Description of the task being delegated.
"""
type: str = "a2a_parallel_delegation_started"
endpoints: list[str]
task_description: str
class A2AParallelDelegationCompletedEvent(A2AEventBase):
"""Event emitted when parallel delegation to multiple A2A agents completes.
Attributes:
endpoints: List of A2A agent endpoints that were delegated to.
success_count: Number of successful delegations.
failure_count: Number of failed delegations.
results: Summary of results from each agent.
"""
type: str = "a2a_parallel_delegation_completed"
endpoints: list[str]
success_count: int
failure_count: int
results: dict[str, str] | None = None

View File

@@ -54,15 +54,21 @@ class GeminiCompletion(BaseLLM):
safety_settings: dict[str, Any] | None = None,
client_params: dict[str, Any] | None = None,
interceptor: BaseInterceptor[Any, Any] | None = None,
use_vertexai: bool | None = None,
**kwargs: Any,
):
"""Initialize Google Gemini chat completion client.
Args:
model: Gemini model name (e.g., 'gemini-2.0-flash-001', 'gemini-1.5-pro')
api_key: Google API key (defaults to GOOGLE_API_KEY or GEMINI_API_KEY env var)
project: Google Cloud project ID (for Vertex AI)
location: Google Cloud location (for Vertex AI, defaults to 'us-central1')
api_key: Google API key for Gemini API authentication.
Defaults to GOOGLE_API_KEY or GEMINI_API_KEY env var.
NOTE: Cannot be used with Vertex AI (project parameter). Use Gemini API instead.
project: Google Cloud project ID for Vertex AI with ADC authentication.
Requires Application Default Credentials (gcloud auth application-default login).
NOTE: Vertex AI does NOT support API keys, only OAuth2/ADC.
If both api_key and project are set, api_key takes precedence.
location: Google Cloud location (for Vertex AI with ADC, defaults to 'us-central1')
temperature: Sampling temperature (0-2)
top_p: Nucleus sampling parameter
top_k: Top-k sampling parameter
@@ -73,6 +79,12 @@ class GeminiCompletion(BaseLLM):
client_params: Additional parameters to pass to the Google Gen AI Client constructor.
Supports parameters like http_options, credentials, debug_config, etc.
interceptor: HTTP interceptor (not yet supported for Gemini).
use_vertexai: Whether to use Vertex AI instead of Gemini API.
- True: Use Vertex AI (with ADC or Express mode with API key)
- False: Use Gemini API (explicitly override env var)
- None (default): Check GOOGLE_GENAI_USE_VERTEXAI env var
When using Vertex AI with API key (Express mode), http_options with
api_version="v1" is automatically configured.
**kwargs: Additional parameters
"""
if interceptor is not None:
@@ -95,7 +107,8 @@ class GeminiCompletion(BaseLLM):
self.project = project or os.getenv("GOOGLE_CLOUD_PROJECT")
self.location = location or os.getenv("GOOGLE_CLOUD_LOCATION") or "us-central1"
use_vertexai = os.getenv("GOOGLE_GENAI_USE_VERTEXAI", "").lower() == "true"
if use_vertexai is None:
use_vertexai = os.getenv("GOOGLE_GENAI_USE_VERTEXAI", "").lower() == "true"
self.client = self._initialize_client(use_vertexai)
@@ -146,13 +159,34 @@ class GeminiCompletion(BaseLLM):
Returns:
Initialized Google Gen AI Client
Note:
Google Gen AI SDK has two distinct endpoints with different auth requirements:
- Gemini API (generativelanguage.googleapis.com): Supports API key authentication
- Vertex AI (aiplatform.googleapis.com): Only supports OAuth2/ADC, NO API keys
When vertexai=True is set, it routes to aiplatform.googleapis.com which rejects
API keys. Use Gemini API endpoint for API key authentication instead.
"""
client_params = {}
if self.client_params:
client_params.update(self.client_params)
if use_vertexai or self.project:
# Determine authentication mode based on available credentials
has_api_key = bool(self.api_key)
has_project = bool(self.project)
if has_api_key and has_project:
logging.warning(
"Both API key and project provided. Using API key authentication. "
"Project/location parameters are ignored when using API keys. "
"To use Vertex AI with ADC, remove the api_key parameter."
)
has_project = False
# Vertex AI with ADC (project without API key)
if (use_vertexai or has_project) and not has_api_key:
client_params.update(
{
"vertexai": True,
@@ -161,12 +195,20 @@ class GeminiCompletion(BaseLLM):
}
)
client_params.pop("api_key", None)
elif self.api_key:
# API key authentication (works with both Gemini API and Vertex AI Express)
elif has_api_key:
client_params["api_key"] = self.api_key
client_params.pop("vertexai", None)
# Vertex AI Express mode: API key + vertexai=True + http_options with api_version="v1"
# See: https://cloud.google.com/vertex-ai/generative-ai/docs/start/quickstart?usertype=apikey
if use_vertexai:
client_params["vertexai"] = True
client_params["http_options"] = types.HttpOptions(api_version="v1")
else:
# This ensures we use the Gemini API (generativelanguage.googleapis.com)
client_params["vertexai"] = False
# Clean up project/location (not allowed with API key)
client_params.pop("project", None)
client_params.pop("location", None)
@@ -175,10 +217,13 @@ class GeminiCompletion(BaseLLM):
return genai.Client(**client_params)
except Exception as e:
raise ValueError(
"Either GOOGLE_API_KEY/GEMINI_API_KEY (for Gemini API) or "
"GOOGLE_CLOUD_PROJECT (for Vertex AI) must be set"
"Authentication required. Provide one of:\n"
" 1. API key via GOOGLE_API_KEY or GEMINI_API_KEY environment variable\n"
" (use_vertexai=True is optional for Vertex AI with API key)\n"
" 2. For Vertex AI with ADC: Set GOOGLE_CLOUD_PROJECT and run:\n"
" gcloud auth application-default login\n"
" 3. Pass api_key parameter directly to LLM constructor\n"
) from e
return genai.Client(**client_params)
def _get_client_params(self) -> dict[str, Any]:
@@ -202,6 +247,8 @@ class GeminiCompletion(BaseLLM):
"location": self.location,
}
)
if self.api_key:
params["api_key"] = self.api_key
elif self.api_key:
params["api_key"] = self.api_key

View File

@@ -26,13 +26,9 @@ def mock_agent() -> MagicMock:
@pytest.fixture
def mock_task(mock_context: MagicMock) -> MagicMock:
def mock_task() -> MagicMock:
"""Create a mock Task."""
task = MagicMock()
task.id = mock_context.task_id
task.name = "Mock Task"
task.description = "Mock task description"
return task
return MagicMock()
@pytest.fixture
@@ -183,8 +179,8 @@ class TestExecute:
event = first_call[0][1]
assert event.type == "a2a_server_task_started"
assert event.task_id == mock_context.task_id
assert event.context_id == mock_context.context_id
assert event.a2a_task_id == mock_context.task_id
assert event.a2a_context_id == mock_context.context_id
@pytest.mark.asyncio
async def test_emits_completed_event(
@@ -205,7 +201,7 @@ class TestExecute:
event = second_call[0][1]
assert event.type == "a2a_server_task_completed"
assert event.task_id == mock_context.task_id
assert event.a2a_task_id == mock_context.task_id
assert event.result == "Task completed successfully"
@pytest.mark.asyncio
@@ -254,7 +250,7 @@ class TestExecute:
event = canceled_call[0][1]
assert event.type == "a2a_server_task_canceled"
assert event.task_id == mock_context.task_id
assert event.a2a_task_id == mock_context.task_id
class TestCancel:

View File

@@ -14,16 +14,6 @@ except ImportError:
A2A_SDK_INSTALLED = False
def _create_mock_agent_card(name: str = "Test", url: str = "http://test-endpoint.com/"):
"""Create a mock agent card with proper model_dump behavior."""
mock_card = MagicMock()
mock_card.name = name
mock_card.url = url
mock_card.model_dump.return_value = {"name": name, "url": url}
mock_card.model_dump_json.return_value = f'{{"name": "{name}", "url": "{url}"}}'
return mock_card
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_trust_remote_completion_status_true_returns_directly():
"""When trust_remote_completion_status=True and A2A returns completed, return result directly."""
@@ -54,7 +44,8 @@ def test_trust_remote_completion_status_true_returns_directly():
patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute,
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
):
mock_card = _create_mock_agent_card()
mock_card = MagicMock()
mock_card.name = "Test"
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
# A2A returns completed
@@ -119,7 +110,8 @@ def test_trust_remote_completion_status_false_continues_conversation():
patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute,
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
):
mock_card = _create_mock_agent_card()
mock_card = MagicMock()
mock_card.name = "Test"
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
# A2A returns completed

View File

@@ -0,0 +1,75 @@
interactions:
- request:
body: '{"contents": [{"parts": [{"text": "\nCurrent Task: What is the capital
of Japan?\n\nThis is the expected criteria for your final answer: The capital
of Japan\nyou MUST return the actual complete content as the final answer, not
a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}], "role":
"user"}], "systemInstruction": {"parts": [{"text": "You are Research Assistant.
You are a helpful research assistant.\nYour personal goal is: Find information
about the capital of Japan\nTo give my best complete final answer to the task
respond using the exact following format:\n\nThought: I now can give a great
answer\nFinal Answer: Your final answer must be the great and the most complete
as possible, it must be outcome described.\n\nI MUST use these formats, my job
depends on it!"}], "role": "user"}, "generationConfig": {"stopSequences": ["\nObservation:"]}}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- '*/*'
accept-encoding:
- ACCEPT-ENCODING-XXX
connection:
- keep-alive
content-length:
- '952'
content-type:
- application/json
host:
- aiplatform.googleapis.com
x-goog-api-client:
- google-genai-sdk/1.59.0 gl-python/3.13.3
x-goog-api-key:
- X-GOOG-API-KEY-XXX
method: POST
uri: https://aiplatform.googleapis.com/v1/publishers/google/models/gemini-2.0-flash-exp:generateContent
response:
body:
string: "{\n \"candidates\": [\n {\n \"content\": {\n \"role\":
\"model\",\n \"parts\": [\n {\n \"text\": \"The
capital of Japan is Tokyo.\\nFinal Answer: Tokyo\\n\"\n }\n ]\n
\ },\n \"finishReason\": \"STOP\",\n \"avgLogprobs\": -0.017845841554495003\n
\ }\n ],\n \"usageMetadata\": {\n \"promptTokenCount\": 163,\n \"candidatesTokenCount\":
13,\n \"totalTokenCount\": 176,\n \"trafficType\": \"ON_DEMAND\",\n
\ \"promptTokensDetails\": [\n {\n \"modality\": \"TEXT\",\n
\ \"tokenCount\": 163\n }\n ],\n \"candidatesTokensDetails\":
[\n {\n \"modality\": \"TEXT\",\n \"tokenCount\": 13\n
\ }\n ]\n },\n \"modelVersion\": \"gemini-2.0-flash-exp\",\n \"createTime\":
\"2026-01-15T22:27:38.066749Z\",\n \"responseId\": \"2mlpab2JBNOFidsPh5GigQs\"\n}\n"
headers:
Alt-Svc:
- h3=":443"; ma=2592000,h3-29=":443"; ma=2592000
Content-Type:
- application/json; charset=UTF-8
Date:
- Thu, 15 Jan 2026 22:27:38 GMT
Server:
- scaffolding on HTTPServer2
Transfer-Encoding:
- chunked
Vary:
- Origin
- X-Origin
- Referer
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
X-Frame-Options:
- X-FRAME-OPTIONS-XXX
X-XSS-Protection:
- '0'
content-length:
- '786'
status:
code: 200
message: OK
version: 1

View File

@@ -728,3 +728,39 @@ def test_google_streaming_returns_usage_metrics():
assert result.token_usage.prompt_tokens > 0
assert result.token_usage.completion_tokens > 0
assert result.token_usage.successful_requests >= 1
@pytest.mark.vcr()
def test_google_express_mode_works() -> None:
"""
Test Google Vertex AI Express mode with API key authentication.
This tests Vertex AI Express mode (aiplatform.googleapis.com) with API key
authentication.
"""
with patch.dict(os.environ, {"GOOGLE_GENAI_USE_VERTEXAI": "true"}):
agent = Agent(
role="Research Assistant",
goal="Find information about the capital of Japan",
backstory="You are a helpful research assistant.",
llm=LLM(
model="gemini/gemini-2.0-flash-exp",
),
verbose=True,
)
task = Task(
description="What is the capital of Japan?",
expected_output="The capital of Japan",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result.token_usage is not None
assert result.token_usage.total_tokens > 0
assert result.token_usage.prompt_tokens > 0
assert result.token_usage.completion_tokens > 0
assert result.token_usage.successful_requests >= 1