mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-26 16:48:13 +00:00
Compare commits
10 Commits
devin/1768
...
1.8.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8f022be106 | ||
|
|
6a19b0a279 | ||
|
|
641c336b2c | ||
|
|
22f1812824 | ||
|
|
9edbf89b68 | ||
|
|
685f7b9af1 | ||
|
|
595fdfb6e7 | ||
|
|
8f99fa76ed | ||
|
|
17e3fcbe1f | ||
|
|
b858d705a8 |
@@ -291,6 +291,7 @@
|
|||||||
"en/observability/arize-phoenix",
|
"en/observability/arize-phoenix",
|
||||||
"en/observability/braintrust",
|
"en/observability/braintrust",
|
||||||
"en/observability/datadog",
|
"en/observability/datadog",
|
||||||
|
"en/observability/galileo",
|
||||||
"en/observability/langdb",
|
"en/observability/langdb",
|
||||||
"en/observability/langfuse",
|
"en/observability/langfuse",
|
||||||
"en/observability/langtrace",
|
"en/observability/langtrace",
|
||||||
@@ -742,6 +743,7 @@
|
|||||||
"pt-BR/observability/arize-phoenix",
|
"pt-BR/observability/arize-phoenix",
|
||||||
"pt-BR/observability/braintrust",
|
"pt-BR/observability/braintrust",
|
||||||
"pt-BR/observability/datadog",
|
"pt-BR/observability/datadog",
|
||||||
|
"pt-BR/observability/galileo",
|
||||||
"pt-BR/observability/langdb",
|
"pt-BR/observability/langdb",
|
||||||
"pt-BR/observability/langfuse",
|
"pt-BR/observability/langfuse",
|
||||||
"pt-BR/observability/langtrace",
|
"pt-BR/observability/langtrace",
|
||||||
@@ -1203,6 +1205,7 @@
|
|||||||
"ko/observability/arize-phoenix",
|
"ko/observability/arize-phoenix",
|
||||||
"ko/observability/braintrust",
|
"ko/observability/braintrust",
|
||||||
"ko/observability/datadog",
|
"ko/observability/datadog",
|
||||||
|
"ko/observability/galileo",
|
||||||
"ko/observability/langdb",
|
"ko/observability/langdb",
|
||||||
"ko/observability/langfuse",
|
"ko/observability/langfuse",
|
||||||
"ko/observability/langtrace",
|
"ko/observability/langtrace",
|
||||||
|
|||||||
@@ -574,6 +574,10 @@ When you run this Flow, the output will change based on the random boolean value
|
|||||||
|
|
||||||
### Human in the Loop (human feedback)
|
### Human in the Loop (human feedback)
|
||||||
|
|
||||||
|
<Note>
|
||||||
|
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**.
|
||||||
|
</Note>
|
||||||
|
|
||||||
The `@human_feedback` decorator enables human-in-the-loop workflows by pausing flow execution to collect feedback from a human. This is useful for approval gates, quality review, and decision points that require human judgment.
|
The `@human_feedback` decorator enables human-in-the-loop workflows by pausing flow execution to collect feedback from a human. This is useful for approval gates, quality review, and decision points that require human judgment.
|
||||||
|
|
||||||
```python Code
|
```python Code
|
||||||
|
|||||||
@@ -1,43 +1,48 @@
|
|||||||
---
|
---
|
||||||
title: Agent-to-Agent (A2A) Protocol
|
title: Agent-to-Agent (A2A) Protocol
|
||||||
description: Enable CrewAI agents to delegate tasks to remote A2A-compliant agents for specialized handling
|
description: Agents delegate tasks to remote A2A agents and/or operate as A2A-compliant server agents.
|
||||||
icon: network-wired
|
icon: network-wired
|
||||||
mode: "wide"
|
mode: "wide"
|
||||||
---
|
---
|
||||||
|
|
||||||
## A2A Agent Delegation
|
## A2A Agent Delegation
|
||||||
|
|
||||||
CrewAI supports the Agent-to-Agent (A2A) protocol, allowing agents to delegate tasks to remote specialized agents. The agent's LLM automatically decides whether to handle a task directly or delegate to an A2A agent based on the task requirements.
|
CrewAI treats [A2A protocol](https://a2a-protocol.org/latest/) as a first-class delegation primitive, enabling agents to delegate tasks, request information, and collaborate with remote agents, as well as act as A2A-compliant server agents.
|
||||||
|
In client mode, agents autonomously choose between local execution and remote delegation based on task requirements.
|
||||||
<Note>
|
|
||||||
A2A delegation requires the `a2a-sdk` package. Install with: `uv add 'crewai[a2a]'` or `pip install 'crewai[a2a]'`
|
|
||||||
</Note>
|
|
||||||
|
|
||||||
## How It Works
|
## How It Works
|
||||||
|
|
||||||
When an agent is configured with A2A capabilities:
|
When an agent is configured with A2A capabilities:
|
||||||
|
|
||||||
1. The LLM analyzes each task
|
1. The Agent analyzes each task
|
||||||
2. It decides to either:
|
2. It decides to either:
|
||||||
- Handle the task directly using its own capabilities
|
- Handle the task directly using its own capabilities
|
||||||
- Delegate to a remote A2A agent for specialized handling
|
- Delegate to a remote A2A agent for specialized handling
|
||||||
3. If delegating, the agent communicates with the remote A2A agent through the protocol
|
3. If delegating, the agent communicates with the remote A2A agent through the protocol
|
||||||
4. Results are returned to the CrewAI workflow
|
4. Results are returned to the CrewAI workflow
|
||||||
|
|
||||||
|
<Note>
|
||||||
|
A2A delegation requires the `a2a-sdk` package. Install with: `uv add 'crewai[a2a]'` or `pip install 'crewai[a2a]'`
|
||||||
|
</Note>
|
||||||
|
|
||||||
## Basic Configuration
|
## Basic Configuration
|
||||||
|
|
||||||
|
<Warning>
|
||||||
|
`crewai.a2a.config.A2AConfig` is deprecated and will be removed in v2.0.0. Use `A2AClientConfig` for connecting to remote agents and/or `A2AServerConfig` for exposing agents as servers.
|
||||||
|
</Warning>
|
||||||
|
|
||||||
Configure an agent for A2A delegation by setting the `a2a` parameter:
|
Configure an agent for A2A delegation by setting the `a2a` parameter:
|
||||||
|
|
||||||
```python Code
|
```python Code
|
||||||
from crewai import Agent, Crew, Task
|
from crewai import Agent, Crew, Task
|
||||||
from crewai.a2a import A2AConfig
|
from crewai.a2a import A2AClientConfig
|
||||||
|
|
||||||
agent = Agent(
|
agent = Agent(
|
||||||
role="Research Coordinator",
|
role="Research Coordinator",
|
||||||
goal="Coordinate research tasks efficiently",
|
goal="Coordinate research tasks efficiently",
|
||||||
backstory="Expert at delegating to specialized research agents",
|
backstory="Expert at delegating to specialized research agents",
|
||||||
llm="gpt-4o",
|
llm="gpt-4o",
|
||||||
a2a=A2AConfig(
|
a2a=A2AClientConfig(
|
||||||
endpoint="https://example.com/.well-known/agent-card.json",
|
endpoint="https://example.com/.well-known/agent-card.json",
|
||||||
timeout=120,
|
timeout=120,
|
||||||
max_turns=10
|
max_turns=10
|
||||||
@@ -54,9 +59,9 @@ crew = Crew(agents=[agent], tasks=[task], verbose=True)
|
|||||||
result = crew.kickoff()
|
result = crew.kickoff()
|
||||||
```
|
```
|
||||||
|
|
||||||
## Configuration Options
|
## Client Configuration Options
|
||||||
|
|
||||||
The `A2AConfig` class accepts the following parameters:
|
The `A2AClientConfig` class accepts the following parameters:
|
||||||
|
|
||||||
<ParamField path="endpoint" type="str" required>
|
<ParamField path="endpoint" type="str" required>
|
||||||
The A2A agent endpoint URL (typically points to `.well-known/agent-card.json`)
|
The A2A agent endpoint URL (typically points to `.well-known/agent-card.json`)
|
||||||
@@ -91,14 +96,34 @@ The `A2AConfig` class accepts the following parameters:
|
|||||||
Update mechanism for receiving task status. Options: `StreamingConfig`, `PollingConfig`, or `PushNotificationConfig`.
|
Update mechanism for receiving task status. Options: `StreamingConfig`, `PollingConfig`, or `PushNotificationConfig`.
|
||||||
</ParamField>
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="transport_protocol" type="Literal['JSONRPC', 'GRPC', 'HTTP+JSON']" default="JSONRPC">
|
||||||
|
Transport protocol for A2A communication. Options: `JSONRPC` (default), `GRPC`, or `HTTP+JSON`.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="accepted_output_modes" type="list[str]" default='["application/json"]'>
|
||||||
|
Media types the client can accept in responses.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="supported_transports" type="list[str]" default='["JSONRPC"]'>
|
||||||
|
Ordered list of transport protocols the client supports.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="use_client_preference" type="bool" default="False">
|
||||||
|
Whether to prioritize client transport preferences over server.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="extensions" type="list[str]" default="[]">
|
||||||
|
Extension URIs the client supports.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
## Authentication
|
## Authentication
|
||||||
|
|
||||||
For A2A agents that require authentication, use one of the provided auth schemes:
|
For A2A agents that require authentication, use one of the provided auth schemes:
|
||||||
|
|
||||||
<Tabs>
|
<Tabs>
|
||||||
<Tab title="Bearer Token">
|
<Tab title="Bearer Token">
|
||||||
```python Code
|
```python bearer_token_auth.py lines
|
||||||
from crewai.a2a import A2AConfig
|
from crewai.a2a import A2AClientConfig
|
||||||
from crewai.a2a.auth import BearerTokenAuth
|
from crewai.a2a.auth import BearerTokenAuth
|
||||||
|
|
||||||
agent = Agent(
|
agent = Agent(
|
||||||
@@ -106,18 +131,18 @@ agent = Agent(
|
|||||||
goal="Coordinate tasks with secured agents",
|
goal="Coordinate tasks with secured agents",
|
||||||
backstory="Manages secure agent communications",
|
backstory="Manages secure agent communications",
|
||||||
llm="gpt-4o",
|
llm="gpt-4o",
|
||||||
a2a=A2AConfig(
|
a2a=A2AClientConfig(
|
||||||
endpoint="https://secure-agent.example.com/.well-known/agent-card.json",
|
endpoint="https://secure-agent.example.com/.well-known/agent-card.json",
|
||||||
auth=BearerTokenAuth(token="your-bearer-token"),
|
auth=BearerTokenAuth(token="your-bearer-token"),
|
||||||
timeout=120
|
timeout=120
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
```
|
```
|
||||||
</Tab>
|
</Tab>
|
||||||
|
|
||||||
<Tab title="API Key">
|
<Tab title="API Key">
|
||||||
```python Code
|
```python api_key_auth.py lines
|
||||||
from crewai.a2a import A2AConfig
|
from crewai.a2a import A2AClientConfig
|
||||||
from crewai.a2a.auth import APIKeyAuth
|
from crewai.a2a.auth import APIKeyAuth
|
||||||
|
|
||||||
agent = Agent(
|
agent = Agent(
|
||||||
@@ -125,7 +150,7 @@ agent = Agent(
|
|||||||
goal="Coordinate with API-based agents",
|
goal="Coordinate with API-based agents",
|
||||||
backstory="Manages API-authenticated communications",
|
backstory="Manages API-authenticated communications",
|
||||||
llm="gpt-4o",
|
llm="gpt-4o",
|
||||||
a2a=A2AConfig(
|
a2a=A2AClientConfig(
|
||||||
endpoint="https://api-agent.example.com/.well-known/agent-card.json",
|
endpoint="https://api-agent.example.com/.well-known/agent-card.json",
|
||||||
auth=APIKeyAuth(
|
auth=APIKeyAuth(
|
||||||
api_key="your-api-key",
|
api_key="your-api-key",
|
||||||
@@ -135,12 +160,12 @@ agent = Agent(
|
|||||||
timeout=120
|
timeout=120
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
```
|
```
|
||||||
</Tab>
|
</Tab>
|
||||||
|
|
||||||
<Tab title="OAuth2">
|
<Tab title="OAuth2">
|
||||||
```python Code
|
```python oauth2_auth.py lines
|
||||||
from crewai.a2a import A2AConfig
|
from crewai.a2a import A2AClientConfig
|
||||||
from crewai.a2a.auth import OAuth2ClientCredentials
|
from crewai.a2a.auth import OAuth2ClientCredentials
|
||||||
|
|
||||||
agent = Agent(
|
agent = Agent(
|
||||||
@@ -148,7 +173,7 @@ agent = Agent(
|
|||||||
goal="Coordinate with OAuth-secured agents",
|
goal="Coordinate with OAuth-secured agents",
|
||||||
backstory="Manages OAuth-authenticated communications",
|
backstory="Manages OAuth-authenticated communications",
|
||||||
llm="gpt-4o",
|
llm="gpt-4o",
|
||||||
a2a=A2AConfig(
|
a2a=A2AClientConfig(
|
||||||
endpoint="https://oauth-agent.example.com/.well-known/agent-card.json",
|
endpoint="https://oauth-agent.example.com/.well-known/agent-card.json",
|
||||||
auth=OAuth2ClientCredentials(
|
auth=OAuth2ClientCredentials(
|
||||||
token_url="https://auth.example.com/oauth/token",
|
token_url="https://auth.example.com/oauth/token",
|
||||||
@@ -159,12 +184,12 @@ agent = Agent(
|
|||||||
timeout=120
|
timeout=120
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
```
|
```
|
||||||
</Tab>
|
</Tab>
|
||||||
|
|
||||||
<Tab title="HTTP Basic">
|
<Tab title="HTTP Basic">
|
||||||
```python Code
|
```python http_basic_auth.py lines
|
||||||
from crewai.a2a import A2AConfig
|
from crewai.a2a import A2AClientConfig
|
||||||
from crewai.a2a.auth import HTTPBasicAuth
|
from crewai.a2a.auth import HTTPBasicAuth
|
||||||
|
|
||||||
agent = Agent(
|
agent = Agent(
|
||||||
@@ -172,7 +197,7 @@ agent = Agent(
|
|||||||
goal="Coordinate with basic auth agents",
|
goal="Coordinate with basic auth agents",
|
||||||
backstory="Manages basic authentication communications",
|
backstory="Manages basic authentication communications",
|
||||||
llm="gpt-4o",
|
llm="gpt-4o",
|
||||||
a2a=A2AConfig(
|
a2a=A2AClientConfig(
|
||||||
endpoint="https://basic-agent.example.com/.well-known/agent-card.json",
|
endpoint="https://basic-agent.example.com/.well-known/agent-card.json",
|
||||||
auth=HTTPBasicAuth(
|
auth=HTTPBasicAuth(
|
||||||
username="your-username",
|
username="your-username",
|
||||||
@@ -181,7 +206,7 @@ agent = Agent(
|
|||||||
timeout=120
|
timeout=120
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
```
|
```
|
||||||
</Tab>
|
</Tab>
|
||||||
</Tabs>
|
</Tabs>
|
||||||
|
|
||||||
@@ -190,7 +215,7 @@ agent = Agent(
|
|||||||
Configure multiple A2A agents for delegation by passing a list:
|
Configure multiple A2A agents for delegation by passing a list:
|
||||||
|
|
||||||
```python Code
|
```python Code
|
||||||
from crewai.a2a import A2AConfig
|
from crewai.a2a import A2AClientConfig
|
||||||
from crewai.a2a.auth import BearerTokenAuth
|
from crewai.a2a.auth import BearerTokenAuth
|
||||||
|
|
||||||
agent = Agent(
|
agent = Agent(
|
||||||
@@ -199,11 +224,11 @@ agent = Agent(
|
|||||||
backstory="Expert at delegating to the right specialist",
|
backstory="Expert at delegating to the right specialist",
|
||||||
llm="gpt-4o",
|
llm="gpt-4o",
|
||||||
a2a=[
|
a2a=[
|
||||||
A2AConfig(
|
A2AClientConfig(
|
||||||
endpoint="https://research.example.com/.well-known/agent-card.json",
|
endpoint="https://research.example.com/.well-known/agent-card.json",
|
||||||
timeout=120
|
timeout=120
|
||||||
),
|
),
|
||||||
A2AConfig(
|
A2AClientConfig(
|
||||||
endpoint="https://data.example.com/.well-known/agent-card.json",
|
endpoint="https://data.example.com/.well-known/agent-card.json",
|
||||||
auth=BearerTokenAuth(token="data-token"),
|
auth=BearerTokenAuth(token="data-token"),
|
||||||
timeout=90
|
timeout=90
|
||||||
@@ -219,7 +244,7 @@ The LLM will automatically choose which A2A agent to delegate to based on the ta
|
|||||||
Control how agent connection failures are handled using the `fail_fast` parameter:
|
Control how agent connection failures are handled using the `fail_fast` parameter:
|
||||||
|
|
||||||
```python Code
|
```python Code
|
||||||
from crewai.a2a import A2AConfig
|
from crewai.a2a import A2AClientConfig
|
||||||
|
|
||||||
# Fail immediately on connection errors (default)
|
# Fail immediately on connection errors (default)
|
||||||
agent = Agent(
|
agent = Agent(
|
||||||
@@ -227,7 +252,7 @@ agent = Agent(
|
|||||||
goal="Coordinate research tasks",
|
goal="Coordinate research tasks",
|
||||||
backstory="Expert at delegation",
|
backstory="Expert at delegation",
|
||||||
llm="gpt-4o",
|
llm="gpt-4o",
|
||||||
a2a=A2AConfig(
|
a2a=A2AClientConfig(
|
||||||
endpoint="https://research.example.com/.well-known/agent-card.json",
|
endpoint="https://research.example.com/.well-known/agent-card.json",
|
||||||
fail_fast=True
|
fail_fast=True
|
||||||
)
|
)
|
||||||
@@ -240,11 +265,11 @@ agent = Agent(
|
|||||||
backstory="Expert at working with available resources",
|
backstory="Expert at working with available resources",
|
||||||
llm="gpt-4o",
|
llm="gpt-4o",
|
||||||
a2a=[
|
a2a=[
|
||||||
A2AConfig(
|
A2AClientConfig(
|
||||||
endpoint="https://primary.example.com/.well-known/agent-card.json",
|
endpoint="https://primary.example.com/.well-known/agent-card.json",
|
||||||
fail_fast=False
|
fail_fast=False
|
||||||
),
|
),
|
||||||
A2AConfig(
|
A2AClientConfig(
|
||||||
endpoint="https://backup.example.com/.well-known/agent-card.json",
|
endpoint="https://backup.example.com/.well-known/agent-card.json",
|
||||||
fail_fast=False
|
fail_fast=False
|
||||||
)
|
)
|
||||||
@@ -263,8 +288,8 @@ Control how your agent receives task status updates from remote A2A agents:
|
|||||||
|
|
||||||
<Tabs>
|
<Tabs>
|
||||||
<Tab title="Streaming (Default)">
|
<Tab title="Streaming (Default)">
|
||||||
```python Code
|
```python streaming_config.py lines
|
||||||
from crewai.a2a import A2AConfig
|
from crewai.a2a import A2AClientConfig
|
||||||
from crewai.a2a.updates import StreamingConfig
|
from crewai.a2a.updates import StreamingConfig
|
||||||
|
|
||||||
agent = Agent(
|
agent = Agent(
|
||||||
@@ -272,17 +297,17 @@ agent = Agent(
|
|||||||
goal="Coordinate research tasks",
|
goal="Coordinate research tasks",
|
||||||
backstory="Expert at delegation",
|
backstory="Expert at delegation",
|
||||||
llm="gpt-4o",
|
llm="gpt-4o",
|
||||||
a2a=A2AConfig(
|
a2a=A2AClientConfig(
|
||||||
endpoint="https://research.example.com/.well-known/agent-card.json",
|
endpoint="https://research.example.com/.well-known/agent-card.json",
|
||||||
updates=StreamingConfig()
|
updates=StreamingConfig()
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
```
|
```
|
||||||
</Tab>
|
</Tab>
|
||||||
|
|
||||||
<Tab title="Polling">
|
<Tab title="Polling">
|
||||||
```python Code
|
```python polling_config.py lines
|
||||||
from crewai.a2a import A2AConfig
|
from crewai.a2a import A2AClientConfig
|
||||||
from crewai.a2a.updates import PollingConfig
|
from crewai.a2a.updates import PollingConfig
|
||||||
|
|
||||||
agent = Agent(
|
agent = Agent(
|
||||||
@@ -290,7 +315,7 @@ agent = Agent(
|
|||||||
goal="Coordinate research tasks",
|
goal="Coordinate research tasks",
|
||||||
backstory="Expert at delegation",
|
backstory="Expert at delegation",
|
||||||
llm="gpt-4o",
|
llm="gpt-4o",
|
||||||
a2a=A2AConfig(
|
a2a=A2AClientConfig(
|
||||||
endpoint="https://research.example.com/.well-known/agent-card.json",
|
endpoint="https://research.example.com/.well-known/agent-card.json",
|
||||||
updates=PollingConfig(
|
updates=PollingConfig(
|
||||||
interval=2.0,
|
interval=2.0,
|
||||||
@@ -299,12 +324,12 @@ agent = Agent(
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
```
|
```
|
||||||
</Tab>
|
</Tab>
|
||||||
|
|
||||||
<Tab title="Push Notifications">
|
<Tab title="Push Notifications">
|
||||||
```python Code
|
```python push_notifications_config.py lines
|
||||||
from crewai.a2a import A2AConfig
|
from crewai.a2a import A2AClientConfig
|
||||||
from crewai.a2a.updates import PushNotificationConfig
|
from crewai.a2a.updates import PushNotificationConfig
|
||||||
|
|
||||||
agent = Agent(
|
agent = Agent(
|
||||||
@@ -312,19 +337,137 @@ agent = Agent(
|
|||||||
goal="Coordinate research tasks",
|
goal="Coordinate research tasks",
|
||||||
backstory="Expert at delegation",
|
backstory="Expert at delegation",
|
||||||
llm="gpt-4o",
|
llm="gpt-4o",
|
||||||
a2a=A2AConfig(
|
a2a=A2AClientConfig(
|
||||||
endpoint="https://research.example.com/.well-known/agent-card.json",
|
endpoint="https://research.example.com/.well-known/agent-card.json",
|
||||||
updates=PushNotificationConfig(
|
updates=PushNotificationConfig(
|
||||||
url={base_url}/a2a/callback",
|
url="{base_url}/a2a/callback",
|
||||||
token="your-validation-token",
|
token="your-validation-token",
|
||||||
timeout=300.0
|
timeout=300.0
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
```
|
```
|
||||||
</Tab>
|
</Tab>
|
||||||
</Tabs>
|
</Tabs>
|
||||||
|
|
||||||
|
## Exposing Agents as A2A Servers
|
||||||
|
|
||||||
|
You can expose your CrewAI agents as A2A-compliant servers, allowing other A2A clients to delegate tasks to them.
|
||||||
|
|
||||||
|
### Server Configuration
|
||||||
|
|
||||||
|
Add an `A2AServerConfig` to your agent to enable server capabilities:
|
||||||
|
|
||||||
|
```python a2a_server_agent.py lines
|
||||||
|
from crewai import Agent
|
||||||
|
from crewai.a2a import A2AServerConfig
|
||||||
|
|
||||||
|
agent = Agent(
|
||||||
|
role="Data Analyst",
|
||||||
|
goal="Analyze datasets and provide insights",
|
||||||
|
backstory="Expert data scientist with statistical analysis skills",
|
||||||
|
llm="gpt-4o",
|
||||||
|
a2a=A2AServerConfig(url="https://your-server.com")
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Server Configuration Options
|
||||||
|
|
||||||
|
<ParamField path="name" type="str" default="None">
|
||||||
|
Human-readable name for the agent. Defaults to the agent's role if not provided.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="description" type="str" default="None">
|
||||||
|
Human-readable description. Defaults to the agent's goal and backstory if not provided.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="version" type="str" default="1.0.0">
|
||||||
|
Version string for the agent card.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="skills" type="list[AgentSkill]" default="[]">
|
||||||
|
List of agent skills. Auto-generated from agent tools if not provided.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="capabilities" type="AgentCapabilities" default="AgentCapabilities(streaming=True, push_notifications=False)">
|
||||||
|
Declaration of optional capabilities supported by the agent.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="default_input_modes" type="list[str]" default='["text/plain", "application/json"]'>
|
||||||
|
Supported input MIME types.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="default_output_modes" type="list[str]" default='["text/plain", "application/json"]'>
|
||||||
|
Supported output MIME types.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="url" type="str" default="None">
|
||||||
|
Preferred endpoint URL. If set, overrides the URL passed to `to_agent_card()`.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="preferred_transport" type="Literal['JSONRPC', 'GRPC', 'HTTP+JSON']" default="JSONRPC">
|
||||||
|
Transport protocol for the preferred endpoint.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="protocol_version" type="str" default="0.3">
|
||||||
|
A2A protocol version this agent supports.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="provider" type="AgentProvider" default="None">
|
||||||
|
Information about the agent's service provider.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="documentation_url" type="str" default="None">
|
||||||
|
URL to the agent's documentation.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="icon_url" type="str" default="None">
|
||||||
|
URL to an icon for the agent.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="additional_interfaces" type="list[AgentInterface]" default="[]">
|
||||||
|
Additional supported interfaces (transport and URL combinations).
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="security" type="list[dict[str, list[str]]]" default="[]">
|
||||||
|
Security requirement objects for all agent interactions.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="security_schemes" type="dict[str, SecurityScheme]" default="{}">
|
||||||
|
Security schemes available to authorize requests.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="supports_authenticated_extended_card" type="bool" default="False">
|
||||||
|
Whether agent provides extended card to authenticated users.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
<ParamField path="signatures" type="list[AgentCardSignature]" default="[]">
|
||||||
|
JSON Web Signatures for the AgentCard.
|
||||||
|
</ParamField>
|
||||||
|
|
||||||
|
### Combined Client and Server
|
||||||
|
|
||||||
|
An agent can act as both client and server by providing both configurations:
|
||||||
|
|
||||||
|
```python Code
|
||||||
|
from crewai import Agent
|
||||||
|
from crewai.a2a import A2AClientConfig, A2AServerConfig
|
||||||
|
|
||||||
|
agent = Agent(
|
||||||
|
role="Research Coordinator",
|
||||||
|
goal="Coordinate research and serve analysis requests",
|
||||||
|
backstory="Expert at delegation and analysis",
|
||||||
|
llm="gpt-4o",
|
||||||
|
a2a=[
|
||||||
|
A2AClientConfig(
|
||||||
|
endpoint="https://specialist.example.com/.well-known/agent-card.json",
|
||||||
|
timeout=120
|
||||||
|
),
|
||||||
|
A2AServerConfig(url="https://your-server.com")
|
||||||
|
]
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
## Best Practices
|
## Best Practices
|
||||||
|
|
||||||
<CardGroup cols={2}>
|
<CardGroup cols={2}>
|
||||||
|
|||||||
@@ -7,6 +7,10 @@ mode: "wide"
|
|||||||
|
|
||||||
## Overview
|
## Overview
|
||||||
|
|
||||||
|
<Note>
|
||||||
|
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**. Make sure to update your installation before using this feature.
|
||||||
|
</Note>
|
||||||
|
|
||||||
The `@human_feedback` decorator enables human-in-the-loop (HITL) workflows directly within CrewAI Flows. It allows you to pause flow execution, present output to a human for review, collect their feedback, and optionally route to different listeners based on the feedback outcome.
|
The `@human_feedback` decorator enables human-in-the-loop (HITL) workflows directly within CrewAI Flows. It allows you to pause flow execution, present output to a human for review, collect their feedback, and optionally route to different listeners based on the feedback outcome.
|
||||||
|
|
||||||
This is particularly valuable for:
|
This is particularly valuable for:
|
||||||
|
|||||||
@@ -11,10 +11,10 @@ Human-in-the-Loop (HITL) is a powerful approach that combines artificial intelli
|
|||||||
|
|
||||||
CrewAI offers two main approaches for implementing human-in-the-loop workflows:
|
CrewAI offers two main approaches for implementing human-in-the-loop workflows:
|
||||||
|
|
||||||
| Approach | Best For | Integration |
|
| Approach | Best For | Integration | Version |
|
||||||
|----------|----------|-------------|
|
|----------|----------|-------------|---------|
|
||||||
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/en/learn/human-feedback-in-flows) |
|
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/en/learn/human-feedback-in-flows) | **1.8.0+** |
|
||||||
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide |
|
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide | - |
|
||||||
|
|
||||||
<Tip>
|
<Tip>
|
||||||
If you're building flows and want to add human review steps with routing based on feedback, check out the [Human Feedback in Flows](/en/learn/human-feedback-in-flows) guide for the `@human_feedback` decorator.
|
If you're building flows and want to add human review steps with routing based on feedback, check out the [Human Feedback in Flows](/en/learn/human-feedback-in-flows) guide for the `@human_feedback` decorator.
|
||||||
|
|||||||
115
docs/en/observability/galileo.mdx
Normal file
115
docs/en/observability/galileo.mdx
Normal file
@@ -0,0 +1,115 @@
|
|||||||
|
---
|
||||||
|
title: Galileo
|
||||||
|
description: Galileo integration for CrewAI tracing and evaluation
|
||||||
|
icon: telescope
|
||||||
|
mode: "wide"
|
||||||
|
---
|
||||||
|
|
||||||
|
## Overview
|
||||||
|
|
||||||
|
This guide demonstrates how to integrate **Galileo** with **CrewAI**
|
||||||
|
for comprehensive tracing and Evaluation Engineering.
|
||||||
|
By the end of this guide, you will be able to trace your CrewAI agents,
|
||||||
|
monitor their performance, and evaluate their behaviour with
|
||||||
|
Galileo's powerful observability platform.
|
||||||
|
|
||||||
|
> **What is Galileo?** [Galileo](https://galileo.ai) is AI evaluation and observability
|
||||||
|
platform that delivers end-to-end tracing, evaluation,
|
||||||
|
and monitoring for AI applications. It enables teams to capture ground truth,
|
||||||
|
create robust guardrails, and run systematic experiments with
|
||||||
|
built-in experiment tracking and performance analytics—ensuring reliability,
|
||||||
|
transparency, and continuous improvement across the AI lifecycle.
|
||||||
|
|
||||||
|
## Getting started
|
||||||
|
|
||||||
|
This tutorial follows the [CrewAI quickstart](/en/quickstart) and shows how to add
|
||||||
|
Galileo's [CrewAIEventListener](https://v2docs.galileo.ai/sdk-api/python/reference/handlers/crewai/handler),
|
||||||
|
an event handler.
|
||||||
|
For more information, see Galileo’s
|
||||||
|
[Add Galileo to a CrewAI Application](https://v2docs.galileo.ai/how-to-guides/third-party-integrations/add-galileo-to-crewai/add-galileo-to-crewai)
|
||||||
|
how-to guide.
|
||||||
|
|
||||||
|
> **Note** This tutorial assumes you have completed the [CrewAI quickstart](/en/quickstart).
|
||||||
|
If you want a completed comprehensive example, see the Galileo
|
||||||
|
[CrewAI sdk-example repo](https://github.com/rungalileo/sdk-examples/tree/main/python/agent/crew-ai).
|
||||||
|
|
||||||
|
### Step 1: Install dependencies
|
||||||
|
|
||||||
|
Install the required dependencies for your app.
|
||||||
|
Create a virtual environment using your preferred method,
|
||||||
|
then install dependencies inside that environment using your
|
||||||
|
preferred tool:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
uv add galileo
|
||||||
|
```
|
||||||
|
|
||||||
|
### Step 2: Add to the .env file from the [CrewAI quickstart](/en/quickstart)
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Your Galileo API key
|
||||||
|
GALILEO_API_KEY="your-galileo-api-key"
|
||||||
|
|
||||||
|
# Your Galileo project name
|
||||||
|
GALILEO_PROJECT="your-galileo-project-name"
|
||||||
|
|
||||||
|
# The name of the Log stream you want to use for logging
|
||||||
|
GALILEO_LOG_STREAM="your-galileo-log-stream "
|
||||||
|
```
|
||||||
|
|
||||||
|
### Step 3: Add the Galileo event listener
|
||||||
|
|
||||||
|
To enable logging with Galileo, you need to create an instance of the `CrewAIEventListener`.
|
||||||
|
Import the Galileo CrewAI handler package by
|
||||||
|
adding the following code at the top of your main.py file:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from galileo.handlers.crewai.handler import CrewAIEventListener
|
||||||
|
```
|
||||||
|
|
||||||
|
At the start of your run function, create the event listener:
|
||||||
|
|
||||||
|
```python
|
||||||
|
def run():
|
||||||
|
# Create the event listener
|
||||||
|
CrewAIEventListener()
|
||||||
|
# The rest of your existing code goes here
|
||||||
|
```
|
||||||
|
|
||||||
|
When you create the listener instance, it is automatically
|
||||||
|
registered with CrewAI.
|
||||||
|
|
||||||
|
### Step 4: Run your crew
|
||||||
|
|
||||||
|
Run your crew with the CrewAI CLI:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
crewai run
|
||||||
|
```
|
||||||
|
|
||||||
|
### Step 5: View the traces in Galileo
|
||||||
|
|
||||||
|
Once your crew has finished, the traces will be flushed and appear in Galileo.
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
## Understanding the Galileo Integration
|
||||||
|
|
||||||
|
Galileo integrates with CrewAI by registering an event listener
|
||||||
|
that captures Crew execution events (e.g., agent actions, tool calls, model responses)
|
||||||
|
and forwards them to Galileo for observability and evaluation.
|
||||||
|
|
||||||
|
### Understanding the event listener
|
||||||
|
|
||||||
|
Creating a `CrewAIEventListener()` instance is all that’s
|
||||||
|
required to enable Galileo for a CrewAI run. When instantiated, the listener:
|
||||||
|
|
||||||
|
- Automatically registers itself with CrewAI
|
||||||
|
- Reads Galileo configuration from environment variables
|
||||||
|
- Logs all run data to the Galileo project and log stream specified by
|
||||||
|
`GALILEO_PROJECT` and `GALILEO_LOG_STREAM`
|
||||||
|
|
||||||
|
No additional configuration or code changes are required.
|
||||||
|
All data from this run is logged to the Galileo project and
|
||||||
|
log stream specified by your environment configuration
|
||||||
|
(for example, GALILEO_PROJECT and GALILEO_LOG_STREAM).
|
||||||
BIN
docs/images/galileo-trace-veiw.png
Normal file
BIN
docs/images/galileo-trace-veiw.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 239 KiB |
@@ -567,6 +567,10 @@ Fourth method running
|
|||||||
|
|
||||||
### Human in the Loop (인간 피드백)
|
### Human in the Loop (인간 피드백)
|
||||||
|
|
||||||
|
<Note>
|
||||||
|
`@human_feedback` 데코레이터는 **CrewAI 버전 1.8.0 이상**이 필요합니다.
|
||||||
|
</Note>
|
||||||
|
|
||||||
`@human_feedback` 데코레이터는 인간의 피드백을 수집하기 위해 플로우 실행을 일시 중지하는 human-in-the-loop 워크플로우를 가능하게 합니다. 이는 승인 게이트, 품질 검토, 인간의 판단이 필요한 결정 지점에 유용합니다.
|
`@human_feedback` 데코레이터는 인간의 피드백을 수집하기 위해 플로우 실행을 일시 중지하는 human-in-the-loop 워크플로우를 가능하게 합니다. 이는 승인 게이트, 품질 검토, 인간의 판단이 필요한 결정 지점에 유용합니다.
|
||||||
|
|
||||||
```python Code
|
```python Code
|
||||||
|
|||||||
@@ -7,6 +7,10 @@ mode: "wide"
|
|||||||
|
|
||||||
## 개요
|
## 개요
|
||||||
|
|
||||||
|
<Note>
|
||||||
|
`@human_feedback` 데코레이터는 **CrewAI 버전 1.8.0 이상**이 필요합니다. 이 기능을 사용하기 전에 설치를 업데이트하세요.
|
||||||
|
</Note>
|
||||||
|
|
||||||
`@human_feedback` 데코레이터는 CrewAI Flow 내에서 직접 human-in-the-loop(HITL) 워크플로우를 가능하게 합니다. Flow 실행을 일시 중지하고, 인간에게 검토를 위해 출력을 제시하고, 피드백을 수집하고, 선택적으로 피드백 결과에 따라 다른 리스너로 라우팅할 수 있습니다.
|
`@human_feedback` 데코레이터는 CrewAI Flow 내에서 직접 human-in-the-loop(HITL) 워크플로우를 가능하게 합니다. Flow 실행을 일시 중지하고, 인간에게 검토를 위해 출력을 제시하고, 피드백을 수집하고, 선택적으로 피드백 결과에 따라 다른 리스너로 라우팅할 수 있습니다.
|
||||||
|
|
||||||
이는 특히 다음과 같은 경우에 유용합니다:
|
이는 특히 다음과 같은 경우에 유용합니다:
|
||||||
|
|||||||
@@ -5,9 +5,22 @@ icon: "user-check"
|
|||||||
mode: "wide"
|
mode: "wide"
|
||||||
---
|
---
|
||||||
|
|
||||||
휴먼 인 더 루프(HITL, Human-in-the-Loop)는 인공지능과 인간의 전문 지식을 결합하여 의사결정을 강화하고 작업 결과를 향상시키는 강력한 접근 방식입니다. 이 가이드에서는 CrewAI 내에서 HITL을 구현하는 방법을 안내합니다.
|
휴먼 인 더 루프(HITL, Human-in-the-Loop)는 인공지능과 인간의 전문 지식을 결합하여 의사결정을 강화하고 작업 결과를 향상시키는 강력한 접근 방식입니다. CrewAI는 필요에 따라 HITL을 구현하는 여러 가지 방법을 제공합니다.
|
||||||
|
|
||||||
## HITL 워크플로우 설정
|
## HITL 접근 방식 선택
|
||||||
|
|
||||||
|
CrewAI는 human-in-the-loop 워크플로우를 구현하기 위한 두 가지 주요 접근 방식을 제공합니다:
|
||||||
|
|
||||||
|
| 접근 방식 | 적합한 용도 | 통합 | 버전 |
|
||||||
|
|----------|----------|-------------|---------|
|
||||||
|
| **Flow 기반** (`@human_feedback` 데코레이터) | 로컬 개발, 콘솔 기반 검토, 동기식 워크플로우 | [Flow에서 인간 피드백](/ko/learn/human-feedback-in-flows) | **1.8.0+** |
|
||||||
|
| **Webhook 기반** (Enterprise) | 프로덕션 배포, 비동기 워크플로우, 외부 통합 (Slack, Teams 등) | 이 가이드 | - |
|
||||||
|
|
||||||
|
<Tip>
|
||||||
|
Flow를 구축하면서 피드백을 기반으로 라우팅하는 인간 검토 단계를 추가하려면 `@human_feedback` 데코레이터에 대한 [Flow에서 인간 피드백](/ko/learn/human-feedback-in-flows) 가이드를 참조하세요.
|
||||||
|
</Tip>
|
||||||
|
|
||||||
|
## Webhook 기반 HITL 워크플로우 설정
|
||||||
|
|
||||||
<Steps>
|
<Steps>
|
||||||
<Step title="작업 구성">
|
<Step title="작업 구성">
|
||||||
|
|||||||
115
docs/ko/observability/galileo.mdx
Normal file
115
docs/ko/observability/galileo.mdx
Normal file
@@ -0,0 +1,115 @@
|
|||||||
|
---
|
||||||
|
title: Galileo 갈릴레오
|
||||||
|
description: CrewAI 추적 및 평가를 위한 Galileo 통합
|
||||||
|
icon: telescope
|
||||||
|
mode: "wide"
|
||||||
|
---
|
||||||
|
|
||||||
|
## 개요
|
||||||
|
|
||||||
|
이 가이드는 **Galileo**를 **CrewAI**와 통합하는 방법을 보여줍니다.
|
||||||
|
포괄적인 추적 및 평가 엔지니어링을 위한 것입니다.
|
||||||
|
이 가이드가 끝나면 CrewAI 에이전트를 추적할 수 있게 됩니다.
|
||||||
|
성과를 모니터링하고 행동을 평가합니다.
|
||||||
|
Galileo의 강력한 관측 플랫폼.
|
||||||
|
|
||||||
|
> **갈릴레오(Galileo)란 무엇인가요?**[Galileo](https://galileo.ai/)는 AI 평가 및 관찰 가능성입니다.
|
||||||
|
엔드투엔드 추적, 평가,
|
||||||
|
AI 애플리케이션 모니터링. 이를 통해 팀은 실제 사실을 포착할 수 있습니다.
|
||||||
|
견고한 가드레일을 만들고 체계적인 실험을 실행하세요.
|
||||||
|
내장된 실험 추적 및 성능 분석으로 신뢰성 보장
|
||||||
|
AI 수명주기 전반에 걸쳐 투명성과 지속적인 개선을 제공합니다.
|
||||||
|
|
||||||
|
## 시작하기
|
||||||
|
|
||||||
|
이 튜토리얼은 [CrewAI 빠른 시작](/ko/quickstart.mdx)을 따르며 추가하는 방법을 보여줍니다.
|
||||||
|
갈릴레오의 [CrewAIEventListener](https://v2docs.galileo.ai/sdk-api/python/reference/handlers/crewai/handler),
|
||||||
|
이벤트 핸들러.
|
||||||
|
자세한 내용은 갈릴레오 문서를 참고하세요.
|
||||||
|
[CrewAI 애플리케이션에 Galileo 추가](https://v2docs.galileo.ai/how-to-guides/third-party-integrations/add-galileo-to-crewai/add-galileo-to-crewai)
|
||||||
|
방법 안내.
|
||||||
|
|
||||||
|
> **참고**이 튜토리얼에서는 [CrewAI 빠른 시작](/ko/quickstart.mdx)을 완료했다고 가정합니다.
|
||||||
|
완전한 포괄적인 예제를 원한다면 Galileo
|
||||||
|
[CrewAI SDK 예제 저장소](https://github.com/rungalileo/sdk-examples/tree/main/python/agent/crew-ai).
|
||||||
|
|
||||||
|
### 1단계: 종속성 설치
|
||||||
|
|
||||||
|
앱에 필요한 종속성을 설치합니다.
|
||||||
|
원하는 방법으로 가상 환경을 생성하고,
|
||||||
|
그런 다음 다음을 사용하여 해당 환경 내에 종속성을 설치하십시오.
|
||||||
|
선호하는 도구:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
uv add galileo
|
||||||
|
```
|
||||||
|
|
||||||
|
### 2단계: [CrewAI 빠른 시작](/ko/quickstart.mdx)에서 .env 파일에 추가
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Your Galileo API key
|
||||||
|
GALILEO_API_KEY="your-galileo-api-key"
|
||||||
|
|
||||||
|
# Your Galileo project name
|
||||||
|
GALILEO_PROJECT="your-galileo-project-name"
|
||||||
|
|
||||||
|
# The name of the Log stream you want to use for logging
|
||||||
|
GALILEO_LOG_STREAM="your-galileo-log-stream "
|
||||||
|
```
|
||||||
|
|
||||||
|
### 3단계: Galileo 이벤트 리스너 추가
|
||||||
|
|
||||||
|
Galileo로 로깅을 활성화하려면 `CrewAIEventListener`의 인스턴스를 생성해야 합니다.
|
||||||
|
다음을 통해 Galileo CrewAI 핸들러 패키지를 가져옵니다.
|
||||||
|
main.py 파일 상단에 다음 코드를 추가하세요.
|
||||||
|
|
||||||
|
```python
|
||||||
|
from galileo.handlers.crewai.handler import CrewAIEventListener
|
||||||
|
```
|
||||||
|
|
||||||
|
실행 함수 시작 시 이벤트 리스너를 생성합니다.
|
||||||
|
|
||||||
|
```python
|
||||||
|
def run():
|
||||||
|
# Create the event listener
|
||||||
|
CrewAIEventListener()
|
||||||
|
# The rest of your existing code goes here
|
||||||
|
```
|
||||||
|
|
||||||
|
리스너 인스턴스를 생성하면 자동으로
|
||||||
|
CrewAI에 등록되었습니다.
|
||||||
|
|
||||||
|
### 4단계: Crew Agent 실행
|
||||||
|
|
||||||
|
CrewAI CLI를 사용하여 Crew Agent를 실행하세요.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
crewai run
|
||||||
|
```
|
||||||
|
|
||||||
|
### 5단계: Galileo에서 추적 보기
|
||||||
|
|
||||||
|
승무원 에이전트가 완료되면 흔적이 플러시되어 Galileo에 나타납니다.
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
## 갈릴레오 통합 이해
|
||||||
|
|
||||||
|
Galileo는 이벤트 리스너를 등록하여 CrewAI와 통합됩니다.
|
||||||
|
승무원 실행 이벤트(예: 에이전트 작업, 도구 호출, 모델 응답)를 캡처합니다.
|
||||||
|
관찰 가능성과 평가를 위해 이를 갈릴레오에 전달합니다.
|
||||||
|
|
||||||
|
### 이벤트 리스너 이해
|
||||||
|
|
||||||
|
`CrewAIEventListener()` 인스턴스를 생성하는 것이 전부입니다.
|
||||||
|
CrewAI 실행을 위해 Galileo를 활성화하는 데 필요합니다. 인스턴스화되면 리스너는 다음을 수행합니다.
|
||||||
|
|
||||||
|
-CrewAI에 자동으로 등록됩니다.
|
||||||
|
-환경 변수에서 Galileo 구성을 읽습니다.
|
||||||
|
-모든 실행 데이터를 Galileo 프로젝트 및 다음에서 지정한 로그 스트림에 기록합니다.
|
||||||
|
`GALILEO_PROJECT` 및 `GALILEO_LOG_STREAM`
|
||||||
|
|
||||||
|
추가 구성이나 코드 변경이 필요하지 않습니다.
|
||||||
|
이 실행의 모든 데이터는 Galileo 프로젝트에 기록되며
|
||||||
|
환경 구성에 따라 지정된 로그 스트림
|
||||||
|
(예: GALILEO_PROJECT 및 GALILEO_LOG_STREAM)
|
||||||
@@ -309,6 +309,10 @@ Ao executar esse Flow, a saída será diferente dependendo do valor booleano ale
|
|||||||
|
|
||||||
### Human in the Loop (feedback humano)
|
### Human in the Loop (feedback humano)
|
||||||
|
|
||||||
|
<Note>
|
||||||
|
O decorador `@human_feedback` requer **CrewAI versão 1.8.0 ou superior**.
|
||||||
|
</Note>
|
||||||
|
|
||||||
O decorador `@human_feedback` permite fluxos de trabalho human-in-the-loop, pausando a execução do flow para coletar feedback de um humano. Isso é útil para portões de aprovação, revisão de qualidade e pontos de decisão que requerem julgamento humano.
|
O decorador `@human_feedback` permite fluxos de trabalho human-in-the-loop, pausando a execução do flow para coletar feedback de um humano. Isso é útil para portões de aprovação, revisão de qualidade e pontos de decisão que requerem julgamento humano.
|
||||||
|
|
||||||
```python Code
|
```python Code
|
||||||
|
|||||||
@@ -7,6 +7,10 @@ mode: "wide"
|
|||||||
|
|
||||||
## Visão Geral
|
## Visão Geral
|
||||||
|
|
||||||
|
<Note>
|
||||||
|
O decorador `@human_feedback` requer **CrewAI versão 1.8.0 ou superior**. Certifique-se de atualizar sua instalação antes de usar este recurso.
|
||||||
|
</Note>
|
||||||
|
|
||||||
O decorador `@human_feedback` permite fluxos de trabalho human-in-the-loop (HITL) diretamente nos CrewAI Flows. Ele permite pausar a execução do flow, apresentar a saída para um humano revisar, coletar seu feedback e, opcionalmente, rotear para diferentes listeners com base no resultado do feedback.
|
O decorador `@human_feedback` permite fluxos de trabalho human-in-the-loop (HITL) diretamente nos CrewAI Flows. Ele permite pausar a execução do flow, apresentar a saída para um humano revisar, coletar seu feedback e, opcionalmente, rotear para diferentes listeners com base no resultado do feedback.
|
||||||
|
|
||||||
Isso é particularmente valioso para:
|
Isso é particularmente valioso para:
|
||||||
|
|||||||
@@ -5,9 +5,22 @@ icon: "user-check"
|
|||||||
mode: "wide"
|
mode: "wide"
|
||||||
---
|
---
|
||||||
|
|
||||||
Human-in-the-Loop (HITL) é uma abordagem poderosa que combina a inteligência artificial com a experiência humana para aprimorar a tomada de decisões e melhorar os resultados das tarefas. Este guia mostra como implementar HITL dentro da CrewAI.
|
Human-in-the-Loop (HITL) é uma abordagem poderosa que combina a inteligência artificial com a experiência humana para aprimorar a tomada de decisões e melhorar os resultados das tarefas. CrewAI oferece várias maneiras de implementar HITL dependendo das suas necessidades.
|
||||||
|
|
||||||
## Configurando Workflows HITL
|
## Escolhendo Sua Abordagem HITL
|
||||||
|
|
||||||
|
CrewAI oferece duas abordagens principais para implementar workflows human-in-the-loop:
|
||||||
|
|
||||||
|
| Abordagem | Melhor Para | Integração | Versão |
|
||||||
|
|----------|----------|-------------|---------|
|
||||||
|
| **Baseada em Flow** (decorador `@human_feedback`) | Desenvolvimento local, revisão via console, workflows síncronos | [Feedback Humano em Flows](/pt-BR/learn/human-feedback-in-flows) | **1.8.0+** |
|
||||||
|
| **Baseada em Webhook** (Enterprise) | Deployments em produção, workflows assíncronos, integrações externas (Slack, Teams, etc.) | Este guia | - |
|
||||||
|
|
||||||
|
<Tip>
|
||||||
|
Se você está construindo flows e deseja adicionar etapas de revisão humana com roteamento baseado em feedback, confira o guia [Feedback Humano em Flows](/pt-BR/learn/human-feedback-in-flows) para o decorador `@human_feedback`.
|
||||||
|
</Tip>
|
||||||
|
|
||||||
|
## Configurando Workflows HITL Baseados em Webhook
|
||||||
|
|
||||||
<Steps>
|
<Steps>
|
||||||
<Step title="Configure sua Tarefa">
|
<Step title="Configure sua Tarefa">
|
||||||
|
|||||||
115
docs/pt-BR/observability/galileo.mdx
Normal file
115
docs/pt-BR/observability/galileo.mdx
Normal file
@@ -0,0 +1,115 @@
|
|||||||
|
---
|
||||||
|
title: Galileo Galileu
|
||||||
|
description: Integração Galileo para rastreamento e avaliação CrewAI
|
||||||
|
icon: telescope
|
||||||
|
mode: "wide"
|
||||||
|
---
|
||||||
|
|
||||||
|
## Visão geral
|
||||||
|
|
||||||
|
Este guia demonstra como integrar o **Galileo**com o **CrewAI**
|
||||||
|
para rastreamento abrangente e engenharia de avaliação.
|
||||||
|
Ao final deste guia, você será capaz de rastrear seus agentes CrewAI,
|
||||||
|
monitorar seu desempenho e avaliar seu comportamento com
|
||||||
|
A poderosa plataforma de observabilidade do Galileo.
|
||||||
|
|
||||||
|
> **O que é Galileo?**[Galileo](https://galileo.ai/) é avaliação e observabilidade de IA
|
||||||
|
plataforma que oferece rastreamento, avaliação e
|
||||||
|
e monitoramento de aplicações de IA. Ele permite que as equipes capturem a verdade,
|
||||||
|
criar grades de proteção robustas e realizar experimentos sistemáticos com
|
||||||
|
rastreamento de experimentos integrado e análise de desempenho -garantindo confiabilidade,
|
||||||
|
transparência e melhoria contínua em todo o ciclo de vida da IA.
|
||||||
|
|
||||||
|
## Primeiros passos
|
||||||
|
|
||||||
|
Este tutorial segue o [CrewAI Quickstart](pt-BR/quickstart) e mostra como adicionar
|
||||||
|
[CrewAIEventListener] do Galileo(https://v2docs.galileo.ai/sdk-api/python/reference/handlers/crewai/handler),
|
||||||
|
um manipulador de eventos.
|
||||||
|
Para mais informações, consulte Galileu
|
||||||
|
[Adicionar Galileo a um aplicativo CrewAI](https://v2docs.galileo.ai/how-to-guides/third-party-integrations/add-galileo-to-crewai/add-galileo-to-crewai)
|
||||||
|
guia prático.
|
||||||
|
|
||||||
|
> **Observação**Este tutorial pressupõe que você concluiu o [CrewAI Quickstart](pt-BR/quickstart).
|
||||||
|
Se você quiser um exemplo completo e abrangente, consulte o Galileo
|
||||||
|
[Repositório de exemplo SDK da CrewAI](https://github.com/rungalileo/sdk-examples/tree/main/python/agent/crew-ai).
|
||||||
|
|
||||||
|
### Etapa 1: instalar dependências
|
||||||
|
|
||||||
|
Instale as dependências necessárias para seu aplicativo.
|
||||||
|
Crie um ambiente virtual usando seu método preferido,
|
||||||
|
em seguida, instale dependências dentro desse ambiente usando seu
|
||||||
|
ferramenta preferida:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
uv add galileo
|
||||||
|
```
|
||||||
|
|
||||||
|
### Etapa 2: adicione ao arquivo .env do [CrewAI Quickstart](/pt-BR/quickstart)
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Your Galileo API key
|
||||||
|
GALILEO_API_KEY="your-galileo-api-key"
|
||||||
|
|
||||||
|
# Your Galileo project name
|
||||||
|
GALILEO_PROJECT="your-galileo-project-name"
|
||||||
|
|
||||||
|
# The name of the Log stream you want to use for logging
|
||||||
|
GALILEO_LOG_STREAM="your-galileo-log-stream "
|
||||||
|
```
|
||||||
|
|
||||||
|
### Etapa 3: adicionar o ouvinte de eventos Galileo
|
||||||
|
|
||||||
|
Para habilitar o registro com Galileo, você precisa criar uma instância do `CrewAIEventListener`.
|
||||||
|
Importe o pacote manipulador Galileo CrewAI por
|
||||||
|
adicionando o seguinte código no topo do seu arquivo main.py:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from galileo.handlers.crewai.handler import CrewAIEventListener
|
||||||
|
```
|
||||||
|
|
||||||
|
No início da sua função run, crie o ouvinte de evento:
|
||||||
|
|
||||||
|
```python
|
||||||
|
def run():
|
||||||
|
# Create the event listener
|
||||||
|
CrewAIEventListener()
|
||||||
|
# The rest of your existing code goes here
|
||||||
|
```
|
||||||
|
|
||||||
|
Quando você cria a instância do listener, ela é automaticamente
|
||||||
|
registrado na CrewAI.
|
||||||
|
|
||||||
|
### Etapa 4: administre sua Crew
|
||||||
|
|
||||||
|
Administre sua Crew com o CrewAI CLI:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
crewai run
|
||||||
|
```
|
||||||
|
|
||||||
|
### Passo 5: Visualize os traços no Galileo
|
||||||
|
|
||||||
|
Assim que sua tripulação terminar, os rastros serão eliminados e aparecerão no Galileo.
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
## Compreendendo a integração do Galileo
|
||||||
|
|
||||||
|
Galileo se integra ao CrewAI registrando um ouvinte de evento
|
||||||
|
que captura eventos de execução da tripulação (por exemplo, ações do agente, chamadas de ferramentas, respostas do modelo)
|
||||||
|
e os encaminha ao Galileo para observabilidade e avaliação.
|
||||||
|
|
||||||
|
### Compreendendo o ouvinte de eventos
|
||||||
|
|
||||||
|
Criar uma instância `CrewAIEventListener()` é tudo o que você precisa
|
||||||
|
necessário para habilitar o Galileo para uma execução do CrewAI. Quando instanciado, o ouvinte:
|
||||||
|
|
||||||
|
-Registra-se automaticamente no CrewAI
|
||||||
|
-Lê a configuração do Galileo a partir de variáveis de ambiente
|
||||||
|
-Registra todos os dados de execução no projeto Galileo e fluxo de log especificado por
|
||||||
|
`GALILEO_PROJECT` e `GALILEO_LOG_STREAM`
|
||||||
|
|
||||||
|
Nenhuma configuração adicional ou alterações de código são necessárias.
|
||||||
|
Todos os dados desta execução são registados no projecto Galileo e
|
||||||
|
fluxo de log especificado pela configuração do seu ambiente
|
||||||
|
(por exemplo, GALILEO_PROJECT e GALILEO_LOG_STREAM).
|
||||||
@@ -12,7 +12,7 @@ dependencies = [
|
|||||||
"pytube~=15.0.0",
|
"pytube~=15.0.0",
|
||||||
"requests~=2.32.5",
|
"requests~=2.32.5",
|
||||||
"docker~=7.1.0",
|
"docker~=7.1.0",
|
||||||
"crewai==1.8.0",
|
"crewai==1.8.1",
|
||||||
"lancedb~=0.5.4",
|
"lancedb~=0.5.4",
|
||||||
"tiktoken~=0.8.0",
|
"tiktoken~=0.8.0",
|
||||||
"beautifulsoup4~=4.13.4",
|
"beautifulsoup4~=4.13.4",
|
||||||
|
|||||||
@@ -291,4 +291,4 @@ __all__ = [
|
|||||||
"ZapierActionTools",
|
"ZapierActionTools",
|
||||||
]
|
]
|
||||||
|
|
||||||
__version__ = "1.8.0"
|
__version__ = "1.8.1"
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
|
|||||||
|
|
||||||
[project.optional-dependencies]
|
[project.optional-dependencies]
|
||||||
tools = [
|
tools = [
|
||||||
"crewai-tools==1.8.0",
|
"crewai-tools==1.8.1",
|
||||||
]
|
]
|
||||||
embeddings = [
|
embeddings = [
|
||||||
"tiktoken~=0.8.0"
|
"tiktoken~=0.8.0"
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
|
|||||||
|
|
||||||
_suppress_pydantic_deprecation_warnings()
|
_suppress_pydantic_deprecation_warnings()
|
||||||
|
|
||||||
__version__ = "1.8.0"
|
__version__ = "1.8.1"
|
||||||
_telemetry_submitted = False
|
_telemetry_submitted = False
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,8 +1,10 @@
|
|||||||
"""Agent-to-Agent (A2A) protocol communication module for CrewAI."""
|
"""Agent-to-Agent (A2A) protocol communication module for CrewAI."""
|
||||||
|
|
||||||
from crewai.a2a.config import A2AConfig
|
from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig
|
||||||
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
|
"A2AClientConfig",
|
||||||
"A2AConfig",
|
"A2AConfig",
|
||||||
|
"A2AServerConfig",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -5,45 +5,57 @@ This module is separate from experimental.a2a to avoid circular imports.
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import Annotated, Any, ClassVar
|
from importlib.metadata import version
|
||||||
|
from typing import Any, ClassVar, Literal
|
||||||
|
|
||||||
from pydantic import (
|
from pydantic import BaseModel, ConfigDict, Field
|
||||||
BaseModel,
|
from typing_extensions import deprecated
|
||||||
BeforeValidator,
|
|
||||||
ConfigDict,
|
|
||||||
Field,
|
|
||||||
HttpUrl,
|
|
||||||
TypeAdapter,
|
|
||||||
)
|
|
||||||
|
|
||||||
from crewai.a2a.auth.schemas import AuthScheme
|
from crewai.a2a.auth.schemas import AuthScheme
|
||||||
|
from crewai.a2a.types import TransportType, Url
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
from a2a.types import (
|
||||||
|
AgentCapabilities,
|
||||||
|
AgentCardSignature,
|
||||||
|
AgentInterface,
|
||||||
|
AgentProvider,
|
||||||
|
AgentSkill,
|
||||||
|
SecurityScheme,
|
||||||
|
)
|
||||||
|
|
||||||
from crewai.a2a.updates import UpdateConfig
|
from crewai.a2a.updates import UpdateConfig
|
||||||
except ImportError:
|
except ImportError:
|
||||||
|
UpdateConfig = Any
|
||||||
|
AgentCapabilities = Any
|
||||||
|
AgentCardSignature = Any
|
||||||
|
AgentInterface = Any
|
||||||
|
AgentProvider = Any
|
||||||
|
SecurityScheme = Any
|
||||||
|
AgentSkill = Any
|
||||||
UpdateConfig = Any # type: ignore[misc,assignment]
|
UpdateConfig = Any # type: ignore[misc,assignment]
|
||||||
|
|
||||||
|
|
||||||
http_url_adapter = TypeAdapter(HttpUrl)
|
|
||||||
|
|
||||||
Url = Annotated[
|
|
||||||
str,
|
|
||||||
BeforeValidator(
|
|
||||||
lambda value: str(http_url_adapter.validate_python(value, strict=True))
|
|
||||||
),
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
def _get_default_update_config() -> UpdateConfig:
|
def _get_default_update_config() -> UpdateConfig:
|
||||||
from crewai.a2a.updates import StreamingConfig
|
from crewai.a2a.updates import StreamingConfig
|
||||||
|
|
||||||
return StreamingConfig()
|
return StreamingConfig()
|
||||||
|
|
||||||
|
|
||||||
|
@deprecated(
|
||||||
|
"""
|
||||||
|
`crewai.a2a.config.A2AConfig` is deprecated and will be removed in v2.0.0,
|
||||||
|
use `crewai.a2a.config.A2AClientConfig` or `crewai.a2a.config.A2AServerConfig` instead.
|
||||||
|
""",
|
||||||
|
category=FutureWarning,
|
||||||
|
)
|
||||||
class A2AConfig(BaseModel):
|
class A2AConfig(BaseModel):
|
||||||
"""Configuration for A2A protocol integration.
|
"""Configuration for A2A protocol integration.
|
||||||
|
|
||||||
|
Deprecated:
|
||||||
|
Use A2AClientConfig instead. This class will be removed in a future version.
|
||||||
|
|
||||||
Attributes:
|
Attributes:
|
||||||
endpoint: A2A agent endpoint URL.
|
endpoint: A2A agent endpoint URL.
|
||||||
auth: Authentication scheme.
|
auth: Authentication scheme.
|
||||||
@@ -53,6 +65,7 @@ class A2AConfig(BaseModel):
|
|||||||
fail_fast: If True, raise error when agent unreachable; if False, skip and continue.
|
fail_fast: If True, raise error when agent unreachable; if False, skip and continue.
|
||||||
trust_remote_completion_status: If True, return A2A agent's result directly when completed.
|
trust_remote_completion_status: If True, return A2A agent's result directly when completed.
|
||||||
updates: Update mechanism config.
|
updates: Update mechanism config.
|
||||||
|
transport_protocol: A2A transport protocol (grpc, jsonrpc, http+json).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
|
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
|
||||||
@@ -82,3 +95,180 @@ class A2AConfig(BaseModel):
|
|||||||
default_factory=_get_default_update_config,
|
default_factory=_get_default_update_config,
|
||||||
description="Update mechanism config",
|
description="Update mechanism config",
|
||||||
)
|
)
|
||||||
|
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"] = Field(
|
||||||
|
default="JSONRPC",
|
||||||
|
description="Specified mode of A2A transport protocol",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class A2AClientConfig(BaseModel):
|
||||||
|
"""Configuration for connecting to remote A2A agents.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
endpoint: A2A agent endpoint URL.
|
||||||
|
auth: Authentication scheme.
|
||||||
|
timeout: Request timeout in seconds.
|
||||||
|
max_turns: Maximum conversation turns with A2A agent.
|
||||||
|
response_model: Optional Pydantic model for structured A2A agent responses.
|
||||||
|
fail_fast: If True, raise error when agent unreachable; if False, skip and continue.
|
||||||
|
trust_remote_completion_status: If True, return A2A agent's result directly when completed.
|
||||||
|
updates: Update mechanism config.
|
||||||
|
accepted_output_modes: Media types the client can accept in responses.
|
||||||
|
supported_transports: Ordered list of transport protocols the client supports.
|
||||||
|
use_client_preference: Whether to prioritize client transport preferences over server.
|
||||||
|
extensions: Extension URIs the client supports.
|
||||||
|
"""
|
||||||
|
|
||||||
|
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
|
||||||
|
|
||||||
|
endpoint: Url = Field(description="A2A agent endpoint URL")
|
||||||
|
auth: AuthScheme | None = Field(
|
||||||
|
default=None,
|
||||||
|
description="Authentication scheme",
|
||||||
|
)
|
||||||
|
timeout: int = Field(default=120, description="Request timeout in seconds")
|
||||||
|
max_turns: int = Field(
|
||||||
|
default=10, description="Maximum conversation turns with A2A agent"
|
||||||
|
)
|
||||||
|
response_model: type[BaseModel] | None = Field(
|
||||||
|
default=None,
|
||||||
|
description="Optional Pydantic model for structured A2A agent responses",
|
||||||
|
)
|
||||||
|
fail_fast: bool = Field(
|
||||||
|
default=True,
|
||||||
|
description="If True, raise error when agent unreachable; if False, skip",
|
||||||
|
)
|
||||||
|
trust_remote_completion_status: bool = Field(
|
||||||
|
default=False,
|
||||||
|
description="If True, return A2A result directly when completed",
|
||||||
|
)
|
||||||
|
updates: UpdateConfig = Field(
|
||||||
|
default_factory=_get_default_update_config,
|
||||||
|
description="Update mechanism config",
|
||||||
|
)
|
||||||
|
accepted_output_modes: list[str] = Field(
|
||||||
|
default_factory=lambda: ["application/json"],
|
||||||
|
description="Media types the client can accept in responses",
|
||||||
|
)
|
||||||
|
supported_transports: list[str] = Field(
|
||||||
|
default_factory=lambda: ["JSONRPC"],
|
||||||
|
description="Ordered list of transport protocols the client supports",
|
||||||
|
)
|
||||||
|
use_client_preference: bool = Field(
|
||||||
|
default=False,
|
||||||
|
description="Whether to prioritize client transport preferences over server",
|
||||||
|
)
|
||||||
|
extensions: list[str] = Field(
|
||||||
|
default_factory=list,
|
||||||
|
description="Extension URIs the client supports",
|
||||||
|
)
|
||||||
|
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"] = Field(
|
||||||
|
default="JSONRPC",
|
||||||
|
description="Specified mode of A2A transport protocol",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class A2AServerConfig(BaseModel):
|
||||||
|
"""Configuration for exposing a Crew or Agent as an A2A server.
|
||||||
|
|
||||||
|
All fields correspond to A2A AgentCard fields. Fields like name, description,
|
||||||
|
and skills can be auto-derived from the Crew/Agent if not provided.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
name: Human-readable name for the agent.
|
||||||
|
description: Human-readable description of the agent.
|
||||||
|
version: Version string for the agent card.
|
||||||
|
skills: List of agent skills/capabilities.
|
||||||
|
default_input_modes: Default supported input MIME types.
|
||||||
|
default_output_modes: Default supported output MIME types.
|
||||||
|
capabilities: Declaration of optional capabilities.
|
||||||
|
preferred_transport: Transport protocol for the preferred endpoint.
|
||||||
|
protocol_version: A2A protocol version this agent supports.
|
||||||
|
provider: Information about the agent's service provider.
|
||||||
|
documentation_url: URL to the agent's documentation.
|
||||||
|
icon_url: URL to an icon for the agent.
|
||||||
|
additional_interfaces: Additional supported interfaces.
|
||||||
|
security: Security requirement objects for all interactions.
|
||||||
|
security_schemes: Security schemes available to authorize requests.
|
||||||
|
supports_authenticated_extended_card: Whether agent provides extended card to authenticated users.
|
||||||
|
url: Preferred endpoint URL for the agent.
|
||||||
|
signatures: JSON Web Signatures for the AgentCard.
|
||||||
|
"""
|
||||||
|
|
||||||
|
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
|
||||||
|
|
||||||
|
name: str | None = Field(
|
||||||
|
default=None,
|
||||||
|
description="Human-readable name for the agent. Auto-derived from Crew/Agent if not provided.",
|
||||||
|
)
|
||||||
|
description: str | None = Field(
|
||||||
|
default=None,
|
||||||
|
description="Human-readable description of the agent. Auto-derived from Crew/Agent if not provided.",
|
||||||
|
)
|
||||||
|
version: str = Field(
|
||||||
|
default="1.0.0",
|
||||||
|
description="Version string for the agent card",
|
||||||
|
)
|
||||||
|
skills: list[AgentSkill] = Field(
|
||||||
|
default_factory=list,
|
||||||
|
description="List of agent skills. Auto-derived from tasks/tools if not provided.",
|
||||||
|
)
|
||||||
|
default_input_modes: list[str] = Field(
|
||||||
|
default_factory=lambda: ["text/plain", "application/json"],
|
||||||
|
description="Default supported input MIME types",
|
||||||
|
)
|
||||||
|
default_output_modes: list[str] = Field(
|
||||||
|
default_factory=lambda: ["text/plain", "application/json"],
|
||||||
|
description="Default supported output MIME types",
|
||||||
|
)
|
||||||
|
capabilities: AgentCapabilities = Field(
|
||||||
|
default_factory=lambda: AgentCapabilities(
|
||||||
|
streaming=True,
|
||||||
|
push_notifications=False,
|
||||||
|
),
|
||||||
|
description="Declaration of optional capabilities supported by the agent",
|
||||||
|
)
|
||||||
|
preferred_transport: TransportType = Field(
|
||||||
|
default="JSONRPC",
|
||||||
|
description="Transport protocol for the preferred endpoint",
|
||||||
|
)
|
||||||
|
protocol_version: str = Field(
|
||||||
|
default_factory=lambda: version("a2a-sdk"),
|
||||||
|
description="A2A protocol version this agent supports",
|
||||||
|
)
|
||||||
|
provider: AgentProvider | None = Field(
|
||||||
|
default=None,
|
||||||
|
description="Information about the agent's service provider",
|
||||||
|
)
|
||||||
|
documentation_url: Url | None = Field(
|
||||||
|
default=None,
|
||||||
|
description="URL to the agent's documentation",
|
||||||
|
)
|
||||||
|
icon_url: Url | None = Field(
|
||||||
|
default=None,
|
||||||
|
description="URL to an icon for the agent",
|
||||||
|
)
|
||||||
|
additional_interfaces: list[AgentInterface] = Field(
|
||||||
|
default_factory=list,
|
||||||
|
description="Additional supported interfaces (transport and URL combinations)",
|
||||||
|
)
|
||||||
|
security: list[dict[str, list[str]]] = Field(
|
||||||
|
default_factory=list,
|
||||||
|
description="Security requirement objects for all agent interactions",
|
||||||
|
)
|
||||||
|
security_schemes: dict[str, SecurityScheme] = Field(
|
||||||
|
default_factory=dict,
|
||||||
|
description="Security schemes available to authorize requests",
|
||||||
|
)
|
||||||
|
supports_authenticated_extended_card: bool = Field(
|
||||||
|
default=False,
|
||||||
|
description="Whether agent provides extended card to authenticated users",
|
||||||
|
)
|
||||||
|
url: Url | None = Field(
|
||||||
|
default=None,
|
||||||
|
description="Preferred endpoint URL for the agent. Set at runtime if not provided.",
|
||||||
|
)
|
||||||
|
signatures: list[AgentCardSignature] = Field(
|
||||||
|
default_factory=list,
|
||||||
|
description="JSON Web Signatures for the AgentCard",
|
||||||
|
)
|
||||||
|
|||||||
@@ -1,7 +1,17 @@
|
|||||||
"""Type definitions for A2A protocol message parts."""
|
"""Type definitions for A2A protocol message parts."""
|
||||||
|
|
||||||
from typing import Any, Literal, Protocol, TypedDict, runtime_checkable
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import (
|
||||||
|
Annotated,
|
||||||
|
Any,
|
||||||
|
Literal,
|
||||||
|
Protocol,
|
||||||
|
TypedDict,
|
||||||
|
runtime_checkable,
|
||||||
|
)
|
||||||
|
|
||||||
|
from pydantic import BeforeValidator, HttpUrl, TypeAdapter
|
||||||
from typing_extensions import NotRequired
|
from typing_extensions import NotRequired
|
||||||
|
|
||||||
from crewai.a2a.updates import (
|
from crewai.a2a.updates import (
|
||||||
@@ -15,6 +25,18 @@ from crewai.a2a.updates import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
TransportType = Literal["JSONRPC", "GRPC", "HTTP+JSON"]
|
||||||
|
|
||||||
|
http_url_adapter: TypeAdapter[HttpUrl] = TypeAdapter(HttpUrl)
|
||||||
|
|
||||||
|
Url = Annotated[
|
||||||
|
str,
|
||||||
|
BeforeValidator(
|
||||||
|
lambda value: str(http_url_adapter.validate_python(value, strict=True))
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
@runtime_checkable
|
@runtime_checkable
|
||||||
class AgentResponseProtocol(Protocol):
|
class AgentResponseProtocol(Protocol):
|
||||||
"""Protocol for the dynamically created AgentResponse model."""
|
"""Protocol for the dynamically created AgentResponse model."""
|
||||||
|
|||||||
1
lib/crewai/src/crewai/a2a/utils/__init__.py
Normal file
1
lib/crewai/src/crewai/a2a/utils/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
"""A2A utility modules for client operations."""
|
||||||
399
lib/crewai/src/crewai/a2a/utils/agent_card.py
Normal file
399
lib/crewai/src/crewai/a2a/utils/agent_card.py
Normal file
@@ -0,0 +1,399 @@
|
|||||||
|
"""AgentCard utilities for A2A client and server operations."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from collections.abc import MutableMapping
|
||||||
|
from functools import lru_cache
|
||||||
|
import time
|
||||||
|
from types import MethodType
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from a2a.client.errors import A2AClientHTTPError
|
||||||
|
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
|
||||||
|
from aiocache import cached # type: ignore[import-untyped]
|
||||||
|
from aiocache.serializers import PickleSerializer # type: ignore[import-untyped]
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
from crewai.a2a.auth.schemas import APIKeyAuth, HTTPDigestAuth
|
||||||
|
from crewai.a2a.auth.utils import (
|
||||||
|
_auth_store,
|
||||||
|
configure_auth_client,
|
||||||
|
retry_on_401,
|
||||||
|
)
|
||||||
|
from crewai.a2a.config import A2AServerConfig
|
||||||
|
from crewai.crew import Crew
|
||||||
|
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from crewai.a2a.auth.schemas import AuthScheme
|
||||||
|
from crewai.agent import Agent
|
||||||
|
from crewai.task import Task
|
||||||
|
|
||||||
|
|
||||||
|
def _get_server_config(agent: Agent) -> A2AServerConfig | None:
|
||||||
|
"""Get A2AServerConfig from an agent's a2a configuration.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
agent: The Agent instance to check.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A2AServerConfig if present, None otherwise.
|
||||||
|
"""
|
||||||
|
if agent.a2a is None:
|
||||||
|
return None
|
||||||
|
if isinstance(agent.a2a, A2AServerConfig):
|
||||||
|
return agent.a2a
|
||||||
|
if isinstance(agent.a2a, list):
|
||||||
|
for config in agent.a2a:
|
||||||
|
if isinstance(config, A2AServerConfig):
|
||||||
|
return config
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_agent_card(
|
||||||
|
endpoint: str,
|
||||||
|
auth: AuthScheme | None = None,
|
||||||
|
timeout: int = 30,
|
||||||
|
use_cache: bool = True,
|
||||||
|
cache_ttl: int = 300,
|
||||||
|
) -> AgentCard:
|
||||||
|
"""Fetch AgentCard from an A2A endpoint with optional caching.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
endpoint: A2A agent endpoint URL (AgentCard URL).
|
||||||
|
auth: Optional AuthScheme for authentication.
|
||||||
|
timeout: Request timeout in seconds.
|
||||||
|
use_cache: Whether to use caching (default True).
|
||||||
|
cache_ttl: Cache TTL in seconds (default 300 = 5 minutes).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
AgentCard object with agent capabilities and skills.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
httpx.HTTPStatusError: If the request fails.
|
||||||
|
A2AClientHTTPError: If authentication fails.
|
||||||
|
"""
|
||||||
|
if use_cache:
|
||||||
|
if auth:
|
||||||
|
auth_data = auth.model_dump_json(
|
||||||
|
exclude={
|
||||||
|
"_access_token",
|
||||||
|
"_token_expires_at",
|
||||||
|
"_refresh_token",
|
||||||
|
"_authorization_callback",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
auth_hash = hash((type(auth).__name__, auth_data))
|
||||||
|
else:
|
||||||
|
auth_hash = 0
|
||||||
|
_auth_store[auth_hash] = auth
|
||||||
|
ttl_hash = int(time.time() // cache_ttl)
|
||||||
|
return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash)
|
||||||
|
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
try:
|
||||||
|
return loop.run_until_complete(
|
||||||
|
afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout)
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
loop.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def afetch_agent_card(
|
||||||
|
endpoint: str,
|
||||||
|
auth: AuthScheme | None = None,
|
||||||
|
timeout: int = 30,
|
||||||
|
use_cache: bool = True,
|
||||||
|
) -> AgentCard:
|
||||||
|
"""Fetch AgentCard from an A2A endpoint asynchronously.
|
||||||
|
|
||||||
|
Native async implementation. Use this when running in an async context.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
endpoint: A2A agent endpoint URL (AgentCard URL).
|
||||||
|
auth: Optional AuthScheme for authentication.
|
||||||
|
timeout: Request timeout in seconds.
|
||||||
|
use_cache: Whether to use caching (default True).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
AgentCard object with agent capabilities and skills.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
httpx.HTTPStatusError: If the request fails.
|
||||||
|
A2AClientHTTPError: If authentication fails.
|
||||||
|
"""
|
||||||
|
if use_cache:
|
||||||
|
if auth:
|
||||||
|
auth_data = auth.model_dump_json(
|
||||||
|
exclude={
|
||||||
|
"_access_token",
|
||||||
|
"_token_expires_at",
|
||||||
|
"_refresh_token",
|
||||||
|
"_authorization_callback",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
auth_hash = hash((type(auth).__name__, auth_data))
|
||||||
|
else:
|
||||||
|
auth_hash = 0
|
||||||
|
_auth_store[auth_hash] = auth
|
||||||
|
agent_card: AgentCard = await _afetch_agent_card_cached(
|
||||||
|
endpoint, auth_hash, timeout
|
||||||
|
)
|
||||||
|
return agent_card
|
||||||
|
|
||||||
|
return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
|
||||||
|
|
||||||
|
|
||||||
|
@lru_cache()
|
||||||
|
def _fetch_agent_card_cached(
|
||||||
|
endpoint: str,
|
||||||
|
auth_hash: int,
|
||||||
|
timeout: int,
|
||||||
|
_ttl_hash: int,
|
||||||
|
) -> AgentCard:
|
||||||
|
"""Cached sync version of fetch_agent_card."""
|
||||||
|
auth = _auth_store.get(auth_hash)
|
||||||
|
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
try:
|
||||||
|
return loop.run_until_complete(
|
||||||
|
_afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
loop.close()
|
||||||
|
|
||||||
|
|
||||||
|
@cached(ttl=300, serializer=PickleSerializer()) # type: ignore[untyped-decorator]
|
||||||
|
async def _afetch_agent_card_cached(
|
||||||
|
endpoint: str,
|
||||||
|
auth_hash: int,
|
||||||
|
timeout: int,
|
||||||
|
) -> AgentCard:
|
||||||
|
"""Cached async implementation of AgentCard fetching."""
|
||||||
|
auth = _auth_store.get(auth_hash)
|
||||||
|
return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
|
||||||
|
|
||||||
|
|
||||||
|
async def _afetch_agent_card_impl(
|
||||||
|
endpoint: str,
|
||||||
|
auth: AuthScheme | None,
|
||||||
|
timeout: int,
|
||||||
|
) -> AgentCard:
|
||||||
|
"""Internal async implementation of AgentCard fetching."""
|
||||||
|
if "/.well-known/agent-card.json" in endpoint:
|
||||||
|
base_url = endpoint.replace("/.well-known/agent-card.json", "")
|
||||||
|
agent_card_path = "/.well-known/agent-card.json"
|
||||||
|
else:
|
||||||
|
url_parts = endpoint.split("/", 3)
|
||||||
|
base_url = f"{url_parts[0]}//{url_parts[2]}"
|
||||||
|
agent_card_path = f"/{url_parts[3]}" if len(url_parts) > 3 else "/"
|
||||||
|
|
||||||
|
headers: MutableMapping[str, str] = {}
|
||||||
|
if auth:
|
||||||
|
async with httpx.AsyncClient(timeout=timeout) as temp_auth_client:
|
||||||
|
if isinstance(auth, (HTTPDigestAuth, APIKeyAuth)):
|
||||||
|
configure_auth_client(auth, temp_auth_client)
|
||||||
|
headers = await auth.apply_auth(temp_auth_client, {})
|
||||||
|
|
||||||
|
async with httpx.AsyncClient(timeout=timeout, headers=headers) as temp_client:
|
||||||
|
if auth and isinstance(auth, (HTTPDigestAuth, APIKeyAuth)):
|
||||||
|
configure_auth_client(auth, temp_client)
|
||||||
|
|
||||||
|
agent_card_url = f"{base_url}{agent_card_path}"
|
||||||
|
|
||||||
|
async def _fetch_agent_card_request() -> httpx.Response:
|
||||||
|
return await temp_client.get(agent_card_url)
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = await retry_on_401(
|
||||||
|
request_func=_fetch_agent_card_request,
|
||||||
|
auth_scheme=auth,
|
||||||
|
client=temp_client,
|
||||||
|
headers=temp_client.headers,
|
||||||
|
max_retries=2,
|
||||||
|
)
|
||||||
|
response.raise_for_status()
|
||||||
|
|
||||||
|
return AgentCard.model_validate(response.json())
|
||||||
|
|
||||||
|
except httpx.HTTPStatusError as e:
|
||||||
|
if e.response.status_code == 401:
|
||||||
|
error_details = ["Authentication failed"]
|
||||||
|
www_auth = e.response.headers.get("WWW-Authenticate")
|
||||||
|
if www_auth:
|
||||||
|
error_details.append(f"WWW-Authenticate: {www_auth}")
|
||||||
|
if not auth:
|
||||||
|
error_details.append("No auth scheme provided")
|
||||||
|
msg = " | ".join(error_details)
|
||||||
|
raise A2AClientHTTPError(401, msg) from e
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def _task_to_skill(task: Task) -> AgentSkill:
|
||||||
|
"""Convert a CrewAI Task to an A2A AgentSkill.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
task: The CrewAI Task to convert.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
AgentSkill representing the task's capability.
|
||||||
|
"""
|
||||||
|
task_name = task.name or task.description[:50]
|
||||||
|
task_id = task_name.lower().replace(" ", "_")
|
||||||
|
|
||||||
|
tags: list[str] = []
|
||||||
|
if task.agent:
|
||||||
|
tags.append(task.agent.role.lower().replace(" ", "-"))
|
||||||
|
|
||||||
|
return AgentSkill(
|
||||||
|
id=task_id,
|
||||||
|
name=task_name,
|
||||||
|
description=task.description,
|
||||||
|
tags=tags,
|
||||||
|
examples=[task.expected_output] if task.expected_output else None,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _tool_to_skill(tool_name: str, tool_description: str) -> AgentSkill:
|
||||||
|
"""Convert an Agent's tool to an A2A AgentSkill.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
tool_name: Name of the tool.
|
||||||
|
tool_description: Description of what the tool does.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
AgentSkill representing the tool's capability.
|
||||||
|
"""
|
||||||
|
tool_id = tool_name.lower().replace(" ", "_")
|
||||||
|
|
||||||
|
return AgentSkill(
|
||||||
|
id=tool_id,
|
||||||
|
name=tool_name,
|
||||||
|
description=tool_description,
|
||||||
|
tags=[tool_name.lower().replace(" ", "-")],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _crew_to_agent_card(crew: Crew, url: str) -> AgentCard:
|
||||||
|
"""Generate an A2A AgentCard from a Crew instance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
crew: The Crew instance to generate a card for.
|
||||||
|
url: The base URL where this crew will be exposed.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
AgentCard describing the crew's capabilities.
|
||||||
|
"""
|
||||||
|
crew_name = getattr(crew, "name", None) or crew.__class__.__name__
|
||||||
|
|
||||||
|
description_parts: list[str] = []
|
||||||
|
crew_description = getattr(crew, "description", None)
|
||||||
|
if crew_description:
|
||||||
|
description_parts.append(crew_description)
|
||||||
|
else:
|
||||||
|
agent_roles = [agent.role for agent in crew.agents]
|
||||||
|
description_parts.append(
|
||||||
|
f"A crew of {len(crew.agents)} agents: {', '.join(agent_roles)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
skills = [_task_to_skill(task) for task in crew.tasks]
|
||||||
|
|
||||||
|
return AgentCard(
|
||||||
|
name=crew_name,
|
||||||
|
description=" ".join(description_parts),
|
||||||
|
url=url,
|
||||||
|
version="1.0.0",
|
||||||
|
capabilities=AgentCapabilities(
|
||||||
|
streaming=True,
|
||||||
|
push_notifications=True,
|
||||||
|
),
|
||||||
|
default_input_modes=["text/plain", "application/json"],
|
||||||
|
default_output_modes=["text/plain", "application/json"],
|
||||||
|
skills=skills,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _agent_to_agent_card(agent: Agent, url: str) -> AgentCard:
|
||||||
|
"""Generate an A2A AgentCard from an Agent instance.
|
||||||
|
|
||||||
|
Uses A2AServerConfig values when available, falling back to agent properties.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
agent: The Agent instance to generate a card for.
|
||||||
|
url: The base URL where this agent will be exposed.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
AgentCard describing the agent's capabilities.
|
||||||
|
"""
|
||||||
|
server_config = _get_server_config(agent) or A2AServerConfig()
|
||||||
|
|
||||||
|
name = server_config.name or agent.role
|
||||||
|
|
||||||
|
description_parts = [agent.goal]
|
||||||
|
if agent.backstory:
|
||||||
|
description_parts.append(agent.backstory)
|
||||||
|
description = server_config.description or " ".join(description_parts)
|
||||||
|
|
||||||
|
skills: list[AgentSkill] = (
|
||||||
|
server_config.skills.copy() if server_config.skills else []
|
||||||
|
)
|
||||||
|
|
||||||
|
if not skills:
|
||||||
|
if agent.tools:
|
||||||
|
for tool in agent.tools:
|
||||||
|
tool_name = getattr(tool, "name", None) or tool.__class__.__name__
|
||||||
|
tool_desc = getattr(tool, "description", None) or f"Tool: {tool_name}"
|
||||||
|
skills.append(_tool_to_skill(tool_name, tool_desc))
|
||||||
|
|
||||||
|
if not skills:
|
||||||
|
skills.append(
|
||||||
|
AgentSkill(
|
||||||
|
id=agent.role.lower().replace(" ", "_"),
|
||||||
|
name=agent.role,
|
||||||
|
description=agent.goal,
|
||||||
|
tags=[agent.role.lower().replace(" ", "-")],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return AgentCard(
|
||||||
|
name=name,
|
||||||
|
description=description,
|
||||||
|
url=server_config.url or url,
|
||||||
|
version=server_config.version,
|
||||||
|
capabilities=server_config.capabilities,
|
||||||
|
default_input_modes=server_config.default_input_modes,
|
||||||
|
default_output_modes=server_config.default_output_modes,
|
||||||
|
skills=skills,
|
||||||
|
protocol_version=server_config.protocol_version,
|
||||||
|
provider=server_config.provider,
|
||||||
|
documentation_url=server_config.documentation_url,
|
||||||
|
icon_url=server_config.icon_url,
|
||||||
|
additional_interfaces=server_config.additional_interfaces,
|
||||||
|
security=server_config.security,
|
||||||
|
security_schemes=server_config.security_schemes,
|
||||||
|
supports_authenticated_extended_card=server_config.supports_authenticated_extended_card,
|
||||||
|
signatures=server_config.signatures,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def inject_a2a_server_methods(agent: Agent) -> None:
|
||||||
|
"""Inject A2A server methods onto an Agent instance.
|
||||||
|
|
||||||
|
Adds a `to_agent_card(url: str) -> AgentCard` method to the agent
|
||||||
|
that generates an A2A-compliant AgentCard.
|
||||||
|
|
||||||
|
Only injects if the agent has an A2AServerConfig.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
agent: The Agent instance to inject methods onto.
|
||||||
|
"""
|
||||||
|
if _get_server_config(agent) is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
def _to_agent_card(self: Agent, url: str) -> AgentCard:
|
||||||
|
return _agent_to_agent_card(self, url)
|
||||||
|
|
||||||
|
object.__setattr__(agent, "to_agent_card", MethodType(_to_agent_card, agent))
|
||||||
@@ -1,16 +1,14 @@
|
|||||||
"""Utility functions for A2A (Agent-to-Agent) protocol delegation."""
|
"""A2A delegation utilities for executing tasks on remote agents."""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
from collections.abc import AsyncIterator, MutableMapping
|
from collections.abc import AsyncIterator, MutableMapping
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from functools import lru_cache
|
from typing import TYPE_CHECKING, Any, Literal
|
||||||
import time
|
|
||||||
from typing import TYPE_CHECKING, Any
|
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
from a2a.client import A2AClientHTTPError, Client, ClientConfig, ClientFactory
|
from a2a.client import Client, ClientConfig, ClientFactory
|
||||||
from a2a.types import (
|
from a2a.types import (
|
||||||
AgentCard,
|
AgentCard,
|
||||||
Message,
|
Message,
|
||||||
@@ -18,21 +16,16 @@ from a2a.types import (
|
|||||||
PushNotificationConfig as A2APushNotificationConfig,
|
PushNotificationConfig as A2APushNotificationConfig,
|
||||||
Role,
|
Role,
|
||||||
TextPart,
|
TextPart,
|
||||||
TransportProtocol,
|
|
||||||
)
|
)
|
||||||
from aiocache import cached # type: ignore[import-untyped]
|
|
||||||
from aiocache.serializers import PickleSerializer # type: ignore[import-untyped]
|
|
||||||
import httpx
|
import httpx
|
||||||
from pydantic import BaseModel, Field, create_model
|
from pydantic import BaseModel
|
||||||
|
|
||||||
from crewai.a2a.auth.schemas import APIKeyAuth, HTTPDigestAuth
|
from crewai.a2a.auth.schemas import APIKeyAuth, HTTPDigestAuth
|
||||||
from crewai.a2a.auth.utils import (
|
from crewai.a2a.auth.utils import (
|
||||||
_auth_store,
|
_auth_store,
|
||||||
configure_auth_client,
|
configure_auth_client,
|
||||||
retry_on_401,
|
|
||||||
validate_auth_against_agent_card,
|
validate_auth_against_agent_card,
|
||||||
)
|
)
|
||||||
from crewai.a2a.config import A2AConfig
|
|
||||||
from crewai.a2a.task_helpers import TaskStateResult
|
from crewai.a2a.task_helpers import TaskStateResult
|
||||||
from crewai.a2a.types import (
|
from crewai.a2a.types import (
|
||||||
HANDLER_REGISTRY,
|
HANDLER_REGISTRY,
|
||||||
@@ -46,6 +39,7 @@ from crewai.a2a.updates import (
|
|||||||
StreamingHandler,
|
StreamingHandler,
|
||||||
UpdateConfig,
|
UpdateConfig,
|
||||||
)
|
)
|
||||||
|
from crewai.a2a.utils.agent_card import _afetch_agent_card_cached
|
||||||
from crewai.events.event_bus import crewai_event_bus
|
from crewai.events.event_bus import crewai_event_bus
|
||||||
from crewai.events.types.a2a_events import (
|
from crewai.events.types.a2a_events import (
|
||||||
A2AConversationStartedEvent,
|
A2AConversationStartedEvent,
|
||||||
@@ -53,7 +47,6 @@ from crewai.events.types.a2a_events import (
|
|||||||
A2ADelegationStartedEvent,
|
A2ADelegationStartedEvent,
|
||||||
A2AMessageSentEvent,
|
A2AMessageSentEvent,
|
||||||
)
|
)
|
||||||
from crewai.types.utils import create_literals_from_strings
|
|
||||||
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
@@ -76,189 +69,9 @@ def get_handler(config: UpdateConfig | None) -> HandlerType:
|
|||||||
return HANDLER_REGISTRY.get(type(config), StreamingHandler)
|
return HANDLER_REGISTRY.get(type(config), StreamingHandler)
|
||||||
|
|
||||||
|
|
||||||
@lru_cache()
|
|
||||||
def _fetch_agent_card_cached(
|
|
||||||
endpoint: str,
|
|
||||||
auth_hash: int,
|
|
||||||
timeout: int,
|
|
||||||
_ttl_hash: int,
|
|
||||||
) -> AgentCard:
|
|
||||||
"""Cached sync version of fetch_agent_card."""
|
|
||||||
auth = _auth_store.get(auth_hash)
|
|
||||||
|
|
||||||
loop = asyncio.new_event_loop()
|
|
||||||
asyncio.set_event_loop(loop)
|
|
||||||
try:
|
|
||||||
return loop.run_until_complete(
|
|
||||||
_afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
|
|
||||||
)
|
|
||||||
finally:
|
|
||||||
loop.close()
|
|
||||||
|
|
||||||
|
|
||||||
def fetch_agent_card(
|
|
||||||
endpoint: str,
|
|
||||||
auth: AuthScheme | None = None,
|
|
||||||
timeout: int = 30,
|
|
||||||
use_cache: bool = True,
|
|
||||||
cache_ttl: int = 300,
|
|
||||||
) -> AgentCard:
|
|
||||||
"""Fetch AgentCard from an A2A endpoint with optional caching.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
endpoint: A2A agent endpoint URL (AgentCard URL)
|
|
||||||
auth: Optional AuthScheme for authentication
|
|
||||||
timeout: Request timeout in seconds
|
|
||||||
use_cache: Whether to use caching (default True)
|
|
||||||
cache_ttl: Cache TTL in seconds (default 300 = 5 minutes)
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
AgentCard object with agent capabilities and skills
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
httpx.HTTPStatusError: If the request fails
|
|
||||||
A2AClientHTTPError: If authentication fails
|
|
||||||
"""
|
|
||||||
if use_cache:
|
|
||||||
if auth:
|
|
||||||
auth_data = auth.model_dump_json(
|
|
||||||
exclude={
|
|
||||||
"_access_token",
|
|
||||||
"_token_expires_at",
|
|
||||||
"_refresh_token",
|
|
||||||
"_authorization_callback",
|
|
||||||
}
|
|
||||||
)
|
|
||||||
auth_hash = hash((type(auth).__name__, auth_data))
|
|
||||||
else:
|
|
||||||
auth_hash = 0
|
|
||||||
_auth_store[auth_hash] = auth
|
|
||||||
ttl_hash = int(time.time() // cache_ttl)
|
|
||||||
return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash)
|
|
||||||
|
|
||||||
loop = asyncio.new_event_loop()
|
|
||||||
asyncio.set_event_loop(loop)
|
|
||||||
try:
|
|
||||||
return loop.run_until_complete(
|
|
||||||
afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout)
|
|
||||||
)
|
|
||||||
finally:
|
|
||||||
loop.close()
|
|
||||||
|
|
||||||
|
|
||||||
async def afetch_agent_card(
|
|
||||||
endpoint: str,
|
|
||||||
auth: AuthScheme | None = None,
|
|
||||||
timeout: int = 30,
|
|
||||||
use_cache: bool = True,
|
|
||||||
) -> AgentCard:
|
|
||||||
"""Fetch AgentCard from an A2A endpoint asynchronously.
|
|
||||||
|
|
||||||
Native async implementation. Use this when running in an async context.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
endpoint: A2A agent endpoint URL (AgentCard URL).
|
|
||||||
auth: Optional AuthScheme for authentication.
|
|
||||||
timeout: Request timeout in seconds.
|
|
||||||
use_cache: Whether to use caching (default True).
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
AgentCard object with agent capabilities and skills.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
httpx.HTTPStatusError: If the request fails.
|
|
||||||
A2AClientHTTPError: If authentication fails.
|
|
||||||
"""
|
|
||||||
if use_cache:
|
|
||||||
if auth:
|
|
||||||
auth_data = auth.model_dump_json(
|
|
||||||
exclude={
|
|
||||||
"_access_token",
|
|
||||||
"_token_expires_at",
|
|
||||||
"_refresh_token",
|
|
||||||
"_authorization_callback",
|
|
||||||
}
|
|
||||||
)
|
|
||||||
auth_hash = hash((type(auth).__name__, auth_data))
|
|
||||||
else:
|
|
||||||
auth_hash = 0
|
|
||||||
_auth_store[auth_hash] = auth
|
|
||||||
agent_card: AgentCard = await _afetch_agent_card_cached(
|
|
||||||
endpoint, auth_hash, timeout
|
|
||||||
)
|
|
||||||
return agent_card
|
|
||||||
|
|
||||||
return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
|
|
||||||
|
|
||||||
|
|
||||||
@cached(ttl=300, serializer=PickleSerializer()) # type: ignore[untyped-decorator]
|
|
||||||
async def _afetch_agent_card_cached(
|
|
||||||
endpoint: str,
|
|
||||||
auth_hash: int,
|
|
||||||
timeout: int,
|
|
||||||
) -> AgentCard:
|
|
||||||
"""Cached async implementation of AgentCard fetching."""
|
|
||||||
auth = _auth_store.get(auth_hash)
|
|
||||||
return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
|
|
||||||
|
|
||||||
|
|
||||||
async def _afetch_agent_card_impl(
|
|
||||||
endpoint: str,
|
|
||||||
auth: AuthScheme | None,
|
|
||||||
timeout: int,
|
|
||||||
) -> AgentCard:
|
|
||||||
"""Internal async implementation of AgentCard fetching."""
|
|
||||||
if "/.well-known/agent-card.json" in endpoint:
|
|
||||||
base_url = endpoint.replace("/.well-known/agent-card.json", "")
|
|
||||||
agent_card_path = "/.well-known/agent-card.json"
|
|
||||||
else:
|
|
||||||
url_parts = endpoint.split("/", 3)
|
|
||||||
base_url = f"{url_parts[0]}//{url_parts[2]}"
|
|
||||||
agent_card_path = f"/{url_parts[3]}" if len(url_parts) > 3 else "/"
|
|
||||||
|
|
||||||
headers: MutableMapping[str, str] = {}
|
|
||||||
if auth:
|
|
||||||
async with httpx.AsyncClient(timeout=timeout) as temp_auth_client:
|
|
||||||
if isinstance(auth, (HTTPDigestAuth, APIKeyAuth)):
|
|
||||||
configure_auth_client(auth, temp_auth_client)
|
|
||||||
headers = await auth.apply_auth(temp_auth_client, {})
|
|
||||||
|
|
||||||
async with httpx.AsyncClient(timeout=timeout, headers=headers) as temp_client:
|
|
||||||
if auth and isinstance(auth, (HTTPDigestAuth, APIKeyAuth)):
|
|
||||||
configure_auth_client(auth, temp_client)
|
|
||||||
|
|
||||||
agent_card_url = f"{base_url}{agent_card_path}"
|
|
||||||
|
|
||||||
async def _fetch_agent_card_request() -> httpx.Response:
|
|
||||||
return await temp_client.get(agent_card_url)
|
|
||||||
|
|
||||||
try:
|
|
||||||
response = await retry_on_401(
|
|
||||||
request_func=_fetch_agent_card_request,
|
|
||||||
auth_scheme=auth,
|
|
||||||
client=temp_client,
|
|
||||||
headers=temp_client.headers,
|
|
||||||
max_retries=2,
|
|
||||||
)
|
|
||||||
response.raise_for_status()
|
|
||||||
|
|
||||||
return AgentCard.model_validate(response.json())
|
|
||||||
|
|
||||||
except httpx.HTTPStatusError as e:
|
|
||||||
if e.response.status_code == 401:
|
|
||||||
error_details = ["Authentication failed"]
|
|
||||||
www_auth = e.response.headers.get("WWW-Authenticate")
|
|
||||||
if www_auth:
|
|
||||||
error_details.append(f"WWW-Authenticate: {www_auth}")
|
|
||||||
if not auth:
|
|
||||||
error_details.append("No auth scheme provided")
|
|
||||||
msg = " | ".join(error_details)
|
|
||||||
raise A2AClientHTTPError(401, msg) from e
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
def execute_a2a_delegation(
|
def execute_a2a_delegation(
|
||||||
endpoint: str,
|
endpoint: str,
|
||||||
|
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
|
||||||
auth: AuthScheme | None,
|
auth: AuthScheme | None,
|
||||||
timeout: int,
|
timeout: int,
|
||||||
task_description: str,
|
task_description: str,
|
||||||
@@ -282,6 +95,23 @@ def execute_a2a_delegation(
|
|||||||
use aexecute_a2a_delegation directly.
|
use aexecute_a2a_delegation directly.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
endpoint: A2A agent endpoint URL (AgentCard URL)
|
||||||
|
transport_protocol: Optional A2A transport protocol (grpc, jsonrpc, http+json)
|
||||||
|
auth: Optional AuthScheme for authentication (Bearer, OAuth2, API Key, HTTP Basic/Digest)
|
||||||
|
timeout: Request timeout in seconds
|
||||||
|
task_description: The task to delegate
|
||||||
|
context: Optional context information
|
||||||
|
context_id: Context ID for correlating messages/tasks
|
||||||
|
task_id: Specific task identifier
|
||||||
|
reference_task_ids: List of related task IDs
|
||||||
|
metadata: Additional metadata (external_id, request_id, etc.)
|
||||||
|
extensions: Protocol extensions for custom fields
|
||||||
|
conversation_history: Previous Message objects from conversation
|
||||||
|
agent_id: Agent identifier for logging
|
||||||
|
agent_role: Role of the CrewAI agent delegating the task
|
||||||
|
agent_branch: Optional agent tree branch for logging
|
||||||
|
response_model: Optional Pydantic model for structured outputs
|
||||||
|
turn_number: Optional turn number for multi-turn conversations
|
||||||
endpoint: A2A agent endpoint URL.
|
endpoint: A2A agent endpoint URL.
|
||||||
auth: Optional AuthScheme for authentication.
|
auth: Optional AuthScheme for authentication.
|
||||||
timeout: Request timeout in seconds.
|
timeout: Request timeout in seconds.
|
||||||
@@ -323,6 +153,7 @@ def execute_a2a_delegation(
|
|||||||
agent_role=agent_role,
|
agent_role=agent_role,
|
||||||
agent_branch=agent_branch,
|
agent_branch=agent_branch,
|
||||||
response_model=response_model,
|
response_model=response_model,
|
||||||
|
transport_protocol=transport_protocol,
|
||||||
turn_number=turn_number,
|
turn_number=turn_number,
|
||||||
updates=updates,
|
updates=updates,
|
||||||
)
|
)
|
||||||
@@ -333,6 +164,7 @@ def execute_a2a_delegation(
|
|||||||
|
|
||||||
async def aexecute_a2a_delegation(
|
async def aexecute_a2a_delegation(
|
||||||
endpoint: str,
|
endpoint: str,
|
||||||
|
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
|
||||||
auth: AuthScheme | None,
|
auth: AuthScheme | None,
|
||||||
timeout: int,
|
timeout: int,
|
||||||
task_description: str,
|
task_description: str,
|
||||||
@@ -356,6 +188,23 @@ async def aexecute_a2a_delegation(
|
|||||||
in an async context (e.g., with Crew.akickoff() or agent.aexecute_task()).
|
in an async context (e.g., with Crew.akickoff() or agent.aexecute_task()).
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
endpoint: A2A agent endpoint URL
|
||||||
|
transport_protocol: Optional A2A transport protocol (grpc, jsonrpc, http+json)
|
||||||
|
auth: Optional AuthScheme for authentication
|
||||||
|
timeout: Request timeout in seconds
|
||||||
|
task_description: Task to delegate
|
||||||
|
context: Optional context
|
||||||
|
context_id: Context ID for correlation
|
||||||
|
task_id: Specific task identifier
|
||||||
|
reference_task_ids: Related task IDs
|
||||||
|
metadata: Additional metadata
|
||||||
|
extensions: Protocol extensions
|
||||||
|
conversation_history: Previous Message objects
|
||||||
|
turn_number: Current turn number
|
||||||
|
agent_branch: Agent tree branch for logging
|
||||||
|
agent_id: Agent identifier for logging
|
||||||
|
agent_role: Agent role for logging
|
||||||
|
response_model: Optional Pydantic model for structured outputs
|
||||||
endpoint: A2A agent endpoint URL.
|
endpoint: A2A agent endpoint URL.
|
||||||
auth: Optional AuthScheme for authentication.
|
auth: Optional AuthScheme for authentication.
|
||||||
timeout: Request timeout in seconds.
|
timeout: Request timeout in seconds.
|
||||||
@@ -414,6 +263,7 @@ async def aexecute_a2a_delegation(
|
|||||||
agent_role=agent_role,
|
agent_role=agent_role,
|
||||||
response_model=response_model,
|
response_model=response_model,
|
||||||
updates=updates,
|
updates=updates,
|
||||||
|
transport_protocol=transport_protocol,
|
||||||
)
|
)
|
||||||
|
|
||||||
crewai_event_bus.emit(
|
crewai_event_bus.emit(
|
||||||
@@ -431,6 +281,7 @@ async def aexecute_a2a_delegation(
|
|||||||
|
|
||||||
async def _aexecute_a2a_delegation_impl(
|
async def _aexecute_a2a_delegation_impl(
|
||||||
endpoint: str,
|
endpoint: str,
|
||||||
|
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
|
||||||
auth: AuthScheme | None,
|
auth: AuthScheme | None,
|
||||||
timeout: int,
|
timeout: int,
|
||||||
task_description: str,
|
task_description: str,
|
||||||
@@ -524,7 +375,6 @@ async def _aexecute_a2a_delegation_impl(
|
|||||||
extensions=extensions,
|
extensions=extensions,
|
||||||
)
|
)
|
||||||
|
|
||||||
transport_protocol = TransportProtocol("JSONRPC")
|
|
||||||
new_messages: list[Message] = [*conversation_history, message]
|
new_messages: list[Message] = [*conversation_history, message]
|
||||||
crewai_event_bus.emit(
|
crewai_event_bus.emit(
|
||||||
None,
|
None,
|
||||||
@@ -596,7 +446,7 @@ async def _aexecute_a2a_delegation_impl(
|
|||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def _create_a2a_client(
|
async def _create_a2a_client(
|
||||||
agent_card: AgentCard,
|
agent_card: AgentCard,
|
||||||
transport_protocol: TransportProtocol,
|
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
|
||||||
timeout: int,
|
timeout: int,
|
||||||
headers: MutableMapping[str, str],
|
headers: MutableMapping[str, str],
|
||||||
streaming: bool,
|
streaming: bool,
|
||||||
@@ -607,19 +457,18 @@ async def _create_a2a_client(
|
|||||||
"""Create and configure an A2A client.
|
"""Create and configure an A2A client.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
agent_card: The A2A agent card
|
agent_card: The A2A agent card.
|
||||||
transport_protocol: Transport protocol to use
|
transport_protocol: Transport protocol to use.
|
||||||
timeout: Request timeout in seconds
|
timeout: Request timeout in seconds.
|
||||||
headers: HTTP headers (already with auth applied)
|
headers: HTTP headers (already with auth applied).
|
||||||
streaming: Enable streaming responses
|
streaming: Enable streaming responses.
|
||||||
auth: Optional AuthScheme for client configuration
|
auth: Optional AuthScheme for client configuration.
|
||||||
use_polling: Enable polling mode
|
use_polling: Enable polling mode.
|
||||||
push_notification_config: Optional push notification config to include in requests
|
push_notification_config: Optional push notification config.
|
||||||
|
|
||||||
Yields:
|
Yields:
|
||||||
Configured A2A client instance
|
Configured A2A client instance.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
async with httpx.AsyncClient(
|
async with httpx.AsyncClient(
|
||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
headers=headers,
|
headers=headers,
|
||||||
@@ -640,7 +489,7 @@ async def _create_a2a_client(
|
|||||||
|
|
||||||
config = ClientConfig(
|
config = ClientConfig(
|
||||||
httpx_client=httpx_client,
|
httpx_client=httpx_client,
|
||||||
supported_transports=[str(transport_protocol.value)],
|
supported_transports=[transport_protocol],
|
||||||
streaming=streaming and not use_polling,
|
streaming=streaming and not use_polling,
|
||||||
polling=use_polling,
|
polling=use_polling,
|
||||||
accepted_output_modes=["application/json"],
|
accepted_output_modes=["application/json"],
|
||||||
@@ -650,78 +499,3 @@ async def _create_a2a_client(
|
|||||||
factory = ClientFactory(config)
|
factory = ClientFactory(config)
|
||||||
client = factory.create(agent_card)
|
client = factory.create(agent_card)
|
||||||
yield client
|
yield client
|
||||||
|
|
||||||
|
|
||||||
def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel]:
|
|
||||||
"""Create a dynamic AgentResponse model with Literal types for agent IDs.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
agent_ids: List of available A2A agent IDs
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Dynamically created Pydantic model with Literal-constrained a2a_ids field
|
|
||||||
"""
|
|
||||||
|
|
||||||
DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806
|
|
||||||
|
|
||||||
return create_model(
|
|
||||||
"AgentResponse",
|
|
||||||
a2a_ids=(
|
|
||||||
tuple[DynamicLiteral, ...], # type: ignore[valid-type]
|
|
||||||
Field(
|
|
||||||
default_factory=tuple,
|
|
||||||
max_length=len(agent_ids),
|
|
||||||
description="A2A agent IDs to delegate to.",
|
|
||||||
),
|
|
||||||
),
|
|
||||||
message=(
|
|
||||||
str,
|
|
||||||
Field(
|
|
||||||
description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation."
|
|
||||||
),
|
|
||||||
),
|
|
||||||
is_a2a=(
|
|
||||||
bool,
|
|
||||||
Field(
|
|
||||||
description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately."
|
|
||||||
),
|
|
||||||
),
|
|
||||||
__base__=BaseModel,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def extract_a2a_agent_ids_from_config(
|
|
||||||
a2a_config: list[A2AConfig] | A2AConfig | None,
|
|
||||||
) -> tuple[list[A2AConfig], tuple[str, ...]]:
|
|
||||||
"""Extract A2A agent IDs from A2A configuration.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
a2a_config: A2A configuration
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of A2A agent IDs
|
|
||||||
"""
|
|
||||||
if a2a_config is None:
|
|
||||||
return [], ()
|
|
||||||
|
|
||||||
if isinstance(a2a_config, A2AConfig):
|
|
||||||
a2a_agents = [a2a_config]
|
|
||||||
else:
|
|
||||||
a2a_agents = a2a_config
|
|
||||||
return a2a_agents, tuple(config.endpoint for config in a2a_agents)
|
|
||||||
|
|
||||||
|
|
||||||
def get_a2a_agents_and_response_model(
|
|
||||||
a2a_config: list[A2AConfig] | A2AConfig | None,
|
|
||||||
) -> tuple[list[A2AConfig], type[BaseModel]]:
|
|
||||||
"""Get A2A agent IDs and response model.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
a2a_config: A2A configuration
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Tuple of A2A agent IDs and response model
|
|
||||||
"""
|
|
||||||
a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config)
|
|
||||||
|
|
||||||
return a2a_agents, create_agent_response_model(agent_ids)
|
|
||||||
101
lib/crewai/src/crewai/a2a/utils/response_model.py
Normal file
101
lib/crewai/src/crewai/a2a/utils/response_model.py
Normal file
@@ -0,0 +1,101 @@
|
|||||||
|
"""Response model utilities for A2A agent interactions."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import TypeAlias
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field, create_model
|
||||||
|
|
||||||
|
from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig
|
||||||
|
from crewai.types.utils import create_literals_from_strings
|
||||||
|
|
||||||
|
|
||||||
|
A2AConfigTypes: TypeAlias = A2AConfig | A2AServerConfig | A2AClientConfig
|
||||||
|
A2AClientConfigTypes: TypeAlias = A2AConfig | A2AClientConfig
|
||||||
|
|
||||||
|
|
||||||
|
def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel] | None:
|
||||||
|
"""Create a dynamic AgentResponse model with Literal types for agent IDs.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
agent_ids: List of available A2A agent IDs.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dynamically created Pydantic model with Literal-constrained a2a_ids field,
|
||||||
|
or None if agent_ids is empty.
|
||||||
|
"""
|
||||||
|
if not agent_ids:
|
||||||
|
return None
|
||||||
|
|
||||||
|
DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806
|
||||||
|
|
||||||
|
return create_model(
|
||||||
|
"AgentResponse",
|
||||||
|
a2a_ids=(
|
||||||
|
tuple[DynamicLiteral, ...], # type: ignore[valid-type]
|
||||||
|
Field(
|
||||||
|
default_factory=tuple,
|
||||||
|
max_length=len(agent_ids),
|
||||||
|
description="A2A agent IDs to delegate to.",
|
||||||
|
),
|
||||||
|
),
|
||||||
|
message=(
|
||||||
|
str,
|
||||||
|
Field(
|
||||||
|
description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation."
|
||||||
|
),
|
||||||
|
),
|
||||||
|
is_a2a=(
|
||||||
|
bool,
|
||||||
|
Field(
|
||||||
|
description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately."
|
||||||
|
),
|
||||||
|
),
|
||||||
|
__base__=BaseModel,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def extract_a2a_agent_ids_from_config(
|
||||||
|
a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
|
||||||
|
) -> tuple[list[A2AClientConfigTypes], tuple[str, ...]]:
|
||||||
|
"""Extract A2A agent IDs from A2A configuration.
|
||||||
|
|
||||||
|
Filters out A2AServerConfig since it doesn't have an endpoint for delegation.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
a2a_config: A2A configuration (any type).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of client A2A configs list and agent endpoint IDs.
|
||||||
|
"""
|
||||||
|
if a2a_config is None:
|
||||||
|
return [], ()
|
||||||
|
|
||||||
|
configs: list[A2AConfigTypes]
|
||||||
|
if isinstance(a2a_config, (A2AConfig, A2AClientConfig, A2AServerConfig)):
|
||||||
|
configs = [a2a_config]
|
||||||
|
else:
|
||||||
|
configs = a2a_config
|
||||||
|
|
||||||
|
# Filter to only client configs (those with endpoint)
|
||||||
|
client_configs: list[A2AClientConfigTypes] = [
|
||||||
|
config for config in configs if isinstance(config, (A2AConfig, A2AClientConfig))
|
||||||
|
]
|
||||||
|
|
||||||
|
return client_configs, tuple(config.endpoint for config in client_configs)
|
||||||
|
|
||||||
|
|
||||||
|
def get_a2a_agents_and_response_model(
|
||||||
|
a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
|
||||||
|
) -> tuple[list[A2AClientConfigTypes], type[BaseModel] | None]:
|
||||||
|
"""Get A2A agent configs and response model.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
a2a_config: A2A configuration (any type).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of client A2A configs and response model.
|
||||||
|
"""
|
||||||
|
a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config)
|
||||||
|
|
||||||
|
return a2a_agents, create_agent_response_model(agent_ids)
|
||||||
284
lib/crewai/src/crewai/a2a/utils/task.py
Normal file
284
lib/crewai/src/crewai/a2a/utils/task.py
Normal file
@@ -0,0 +1,284 @@
|
|||||||
|
"""A2A task utilities for server-side task management."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from collections.abc import Callable, Coroutine
|
||||||
|
from functools import wraps
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar, cast
|
||||||
|
|
||||||
|
from a2a.server.agent_execution import RequestContext
|
||||||
|
from a2a.server.events import EventQueue
|
||||||
|
from a2a.types import (
|
||||||
|
InternalError,
|
||||||
|
InvalidParamsError,
|
||||||
|
Message,
|
||||||
|
Task as A2ATask,
|
||||||
|
TaskState,
|
||||||
|
TaskStatus,
|
||||||
|
TaskStatusUpdateEvent,
|
||||||
|
)
|
||||||
|
from a2a.utils import new_agent_text_message, new_text_artifact
|
||||||
|
from a2a.utils.errors import ServerError
|
||||||
|
from aiocache import SimpleMemoryCache, caches # type: ignore[import-untyped]
|
||||||
|
|
||||||
|
from crewai.events.event_bus import crewai_event_bus
|
||||||
|
from crewai.events.types.a2a_events import (
|
||||||
|
A2AServerTaskCanceledEvent,
|
||||||
|
A2AServerTaskCompletedEvent,
|
||||||
|
A2AServerTaskFailedEvent,
|
||||||
|
A2AServerTaskStartedEvent,
|
||||||
|
)
|
||||||
|
from crewai.task import Task
|
||||||
|
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from crewai.agent import Agent
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
P = ParamSpec("P")
|
||||||
|
T = TypeVar("T")
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_redis_url(url: str) -> dict[str, Any]:
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
|
parsed = urlparse(url)
|
||||||
|
config: dict[str, Any] = {
|
||||||
|
"cache": "aiocache.RedisCache",
|
||||||
|
"endpoint": parsed.hostname or "localhost",
|
||||||
|
"port": parsed.port or 6379,
|
||||||
|
}
|
||||||
|
if parsed.path and parsed.path != "/":
|
||||||
|
try:
|
||||||
|
config["db"] = int(parsed.path.lstrip("/"))
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
if parsed.password:
|
||||||
|
config["password"] = parsed.password
|
||||||
|
return config
|
||||||
|
|
||||||
|
|
||||||
|
_redis_url = os.environ.get("REDIS_URL")
|
||||||
|
|
||||||
|
caches.set_config(
|
||||||
|
{
|
||||||
|
"default": _parse_redis_url(_redis_url)
|
||||||
|
if _redis_url
|
||||||
|
else {
|
||||||
|
"cache": "aiocache.SimpleMemoryCache",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def cancellable(
|
||||||
|
fn: Callable[P, Coroutine[Any, Any, T]],
|
||||||
|
) -> Callable[P, Coroutine[Any, Any, T]]:
|
||||||
|
"""Decorator that enables cancellation for A2A task execution.
|
||||||
|
|
||||||
|
Runs a cancellation watcher concurrently with the wrapped function.
|
||||||
|
When a cancel event is published, the execution is cancelled.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
fn: The async function to wrap.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Wrapped function with cancellation support.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@wraps(fn)
|
||||||
|
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
|
||||||
|
"""Wrap function with cancellation monitoring."""
|
||||||
|
context: RequestContext | None = None
|
||||||
|
for arg in args:
|
||||||
|
if isinstance(arg, RequestContext):
|
||||||
|
context = arg
|
||||||
|
break
|
||||||
|
if context is None:
|
||||||
|
context = cast(RequestContext | None, kwargs.get("context"))
|
||||||
|
|
||||||
|
if context is None:
|
||||||
|
return await fn(*args, **kwargs)
|
||||||
|
|
||||||
|
task_id = context.task_id
|
||||||
|
cache = caches.get("default")
|
||||||
|
|
||||||
|
async def poll_for_cancel() -> bool:
|
||||||
|
"""Poll cache for cancellation flag."""
|
||||||
|
while True:
|
||||||
|
if await cache.get(f"cancel:{task_id}"):
|
||||||
|
return True
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
|
||||||
|
async def watch_for_cancel() -> bool:
|
||||||
|
"""Watch for cancellation events via pub/sub or polling."""
|
||||||
|
if isinstance(cache, SimpleMemoryCache):
|
||||||
|
return await poll_for_cancel()
|
||||||
|
|
||||||
|
try:
|
||||||
|
client = cache.client
|
||||||
|
pubsub = client.pubsub()
|
||||||
|
await pubsub.subscribe(f"cancel:{task_id}")
|
||||||
|
async for message in pubsub.listen():
|
||||||
|
if message["type"] == "message":
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Cancel watcher error for task_id=%s: %s", task_id, e)
|
||||||
|
return await poll_for_cancel()
|
||||||
|
return False
|
||||||
|
|
||||||
|
execute_task = asyncio.create_task(fn(*args, **kwargs))
|
||||||
|
cancel_watch = asyncio.create_task(watch_for_cancel())
|
||||||
|
|
||||||
|
try:
|
||||||
|
done, _ = await asyncio.wait(
|
||||||
|
[execute_task, cancel_watch],
|
||||||
|
return_when=asyncio.FIRST_COMPLETED,
|
||||||
|
)
|
||||||
|
|
||||||
|
if cancel_watch in done:
|
||||||
|
execute_task.cancel()
|
||||||
|
try:
|
||||||
|
await execute_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
raise asyncio.CancelledError(f"Task {task_id} was cancelled")
|
||||||
|
cancel_watch.cancel()
|
||||||
|
return execute_task.result()
|
||||||
|
finally:
|
||||||
|
await cache.delete(f"cancel:{task_id}")
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
@cancellable
|
||||||
|
async def execute(
|
||||||
|
agent: Agent,
|
||||||
|
context: RequestContext,
|
||||||
|
event_queue: EventQueue,
|
||||||
|
) -> None:
|
||||||
|
"""Execute an A2A task using a CrewAI agent.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
agent: The CrewAI agent to execute the task.
|
||||||
|
context: The A2A request context containing the user's message.
|
||||||
|
event_queue: The event queue for sending responses back.
|
||||||
|
|
||||||
|
TODOs:
|
||||||
|
* need to impl both of structured output and file inputs, depends on `file_inputs` for
|
||||||
|
`crewai.task.Task`, pass the below two to Task. both utils in `a2a.utils.parts`
|
||||||
|
* structured outputs ingestion, `structured_inputs = get_data_parts(parts=context.message.parts)`
|
||||||
|
* file inputs ingestion, `file_inputs = get_file_parts(parts=context.message.parts)`
|
||||||
|
"""
|
||||||
|
|
||||||
|
user_message = context.get_user_input()
|
||||||
|
task_id = context.task_id
|
||||||
|
context_id = context.context_id
|
||||||
|
if task_id is None or context_id is None:
|
||||||
|
msg = "task_id and context_id are required"
|
||||||
|
crewai_event_bus.emit(
|
||||||
|
agent,
|
||||||
|
A2AServerTaskFailedEvent(a2a_task_id="", a2a_context_id="", error=msg),
|
||||||
|
)
|
||||||
|
raise ServerError(InvalidParamsError(message=msg)) from None
|
||||||
|
|
||||||
|
task = Task(
|
||||||
|
description=user_message,
|
||||||
|
expected_output="Response to the user's request",
|
||||||
|
agent=agent,
|
||||||
|
)
|
||||||
|
|
||||||
|
crewai_event_bus.emit(
|
||||||
|
agent,
|
||||||
|
A2AServerTaskStartedEvent(a2a_task_id=task_id, a2a_context_id=context_id),
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = await agent.aexecute_task(task=task, tools=agent.tools)
|
||||||
|
result_str = str(result)
|
||||||
|
history: list[Message] = [context.message] if context.message else []
|
||||||
|
history.append(new_agent_text_message(result_str, context_id, task_id))
|
||||||
|
await event_queue.enqueue_event(
|
||||||
|
A2ATask(
|
||||||
|
id=task_id,
|
||||||
|
context_id=context_id,
|
||||||
|
status=TaskStatus(state=TaskState.input_required),
|
||||||
|
artifacts=[new_text_artifact(result_str, f"result_{task_id}")],
|
||||||
|
history=history,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
crewai_event_bus.emit(
|
||||||
|
agent,
|
||||||
|
A2AServerTaskCompletedEvent(
|
||||||
|
a2a_task_id=task_id, a2a_context_id=context_id, result=str(result)
|
||||||
|
),
|
||||||
|
)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
crewai_event_bus.emit(
|
||||||
|
agent,
|
||||||
|
A2AServerTaskCanceledEvent(a2a_task_id=task_id, a2a_context_id=context_id),
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
crewai_event_bus.emit(
|
||||||
|
agent,
|
||||||
|
A2AServerTaskFailedEvent(
|
||||||
|
a2a_task_id=task_id, a2a_context_id=context_id, error=str(e)
|
||||||
|
),
|
||||||
|
)
|
||||||
|
raise ServerError(
|
||||||
|
error=InternalError(message=f"Task execution failed: {e}")
|
||||||
|
) from e
|
||||||
|
|
||||||
|
|
||||||
|
async def cancel(
|
||||||
|
context: RequestContext,
|
||||||
|
event_queue: EventQueue,
|
||||||
|
) -> A2ATask | None:
|
||||||
|
"""Cancel an A2A task.
|
||||||
|
|
||||||
|
Publishes a cancel event that the cancellable decorator listens for.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
context: The A2A request context containing task information.
|
||||||
|
event_queue: The event queue for sending the cancellation status.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The canceled task with updated status.
|
||||||
|
"""
|
||||||
|
task_id = context.task_id
|
||||||
|
context_id = context.context_id
|
||||||
|
if task_id is None or context_id is None:
|
||||||
|
raise ServerError(InvalidParamsError(message="task_id and context_id required"))
|
||||||
|
|
||||||
|
if context.current_task and context.current_task.status.state in (
|
||||||
|
TaskState.completed,
|
||||||
|
TaskState.failed,
|
||||||
|
TaskState.canceled,
|
||||||
|
):
|
||||||
|
return context.current_task
|
||||||
|
|
||||||
|
cache = caches.get("default")
|
||||||
|
|
||||||
|
await cache.set(f"cancel:{task_id}", True, ttl=3600)
|
||||||
|
if not isinstance(cache, SimpleMemoryCache):
|
||||||
|
await cache.client.publish(f"cancel:{task_id}", "cancel")
|
||||||
|
|
||||||
|
await event_queue.enqueue_event(
|
||||||
|
TaskStatusUpdateEvent(
|
||||||
|
task_id=task_id,
|
||||||
|
context_id=context_id,
|
||||||
|
status=TaskStatus(state=TaskState.canceled),
|
||||||
|
final=True,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
if context.current_task:
|
||||||
|
context.current_task.status = TaskStatus(state=TaskState.canceled)
|
||||||
|
return context.current_task
|
||||||
|
return None
|
||||||
@@ -15,7 +15,7 @@ from typing import TYPE_CHECKING, Any
|
|||||||
from a2a.types import Role, TaskState
|
from a2a.types import Role, TaskState
|
||||||
from pydantic import BaseModel, ValidationError
|
from pydantic import BaseModel, ValidationError
|
||||||
|
|
||||||
from crewai.a2a.config import A2AConfig
|
from crewai.a2a.config import A2AClientConfig, A2AConfig
|
||||||
from crewai.a2a.extensions.base import ExtensionRegistry
|
from crewai.a2a.extensions.base import ExtensionRegistry
|
||||||
from crewai.a2a.task_helpers import TaskStateResult
|
from crewai.a2a.task_helpers import TaskStateResult
|
||||||
from crewai.a2a.templates import (
|
from crewai.a2a.templates import (
|
||||||
@@ -26,13 +26,16 @@ from crewai.a2a.templates import (
|
|||||||
UNAVAILABLE_AGENTS_NOTICE_TEMPLATE,
|
UNAVAILABLE_AGENTS_NOTICE_TEMPLATE,
|
||||||
)
|
)
|
||||||
from crewai.a2a.types import AgentResponseProtocol
|
from crewai.a2a.types import AgentResponseProtocol
|
||||||
from crewai.a2a.utils import (
|
from crewai.a2a.utils.agent_card import (
|
||||||
aexecute_a2a_delegation,
|
|
||||||
afetch_agent_card,
|
afetch_agent_card,
|
||||||
execute_a2a_delegation,
|
|
||||||
fetch_agent_card,
|
fetch_agent_card,
|
||||||
get_a2a_agents_and_response_model,
|
inject_a2a_server_methods,
|
||||||
)
|
)
|
||||||
|
from crewai.a2a.utils.delegation import (
|
||||||
|
aexecute_a2a_delegation,
|
||||||
|
execute_a2a_delegation,
|
||||||
|
)
|
||||||
|
from crewai.a2a.utils.response_model import get_a2a_agents_and_response_model
|
||||||
from crewai.events.event_bus import crewai_event_bus
|
from crewai.events.event_bus import crewai_event_bus
|
||||||
from crewai.events.types.a2a_events import (
|
from crewai.events.types.a2a_events import (
|
||||||
A2AConversationCompletedEvent,
|
A2AConversationCompletedEvent,
|
||||||
@@ -122,10 +125,12 @@ def wrap_agent_with_a2a_instance(
|
|||||||
agent, "aexecute_task", MethodType(aexecute_task_with_a2a, agent)
|
agent, "aexecute_task", MethodType(aexecute_task_with_a2a, agent)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
inject_a2a_server_methods(agent)
|
||||||
|
|
||||||
|
|
||||||
def _fetch_card_from_config(
|
def _fetch_card_from_config(
|
||||||
config: A2AConfig,
|
config: A2AConfig | A2AClientConfig,
|
||||||
) -> tuple[A2AConfig, AgentCard | Exception]:
|
) -> tuple[A2AConfig | A2AClientConfig, AgentCard | Exception]:
|
||||||
"""Fetch agent card from A2A config.
|
"""Fetch agent card from A2A config.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -146,7 +151,7 @@ def _fetch_card_from_config(
|
|||||||
|
|
||||||
|
|
||||||
def _fetch_agent_cards_concurrently(
|
def _fetch_agent_cards_concurrently(
|
||||||
a2a_agents: list[A2AConfig],
|
a2a_agents: list[A2AConfig | A2AClientConfig],
|
||||||
) -> tuple[dict[str, AgentCard], dict[str, str]]:
|
) -> tuple[dict[str, AgentCard], dict[str, str]]:
|
||||||
"""Fetch agent cards concurrently for multiple A2A agents.
|
"""Fetch agent cards concurrently for multiple A2A agents.
|
||||||
|
|
||||||
@@ -181,7 +186,7 @@ def _fetch_agent_cards_concurrently(
|
|||||||
|
|
||||||
def _execute_task_with_a2a(
|
def _execute_task_with_a2a(
|
||||||
self: Agent,
|
self: Agent,
|
||||||
a2a_agents: list[A2AConfig],
|
a2a_agents: list[A2AConfig | A2AClientConfig],
|
||||||
original_fn: Callable[..., str],
|
original_fn: Callable[..., str],
|
||||||
task: Task,
|
task: Task,
|
||||||
agent_response_model: type[BaseModel],
|
agent_response_model: type[BaseModel],
|
||||||
@@ -270,7 +275,7 @@ def _execute_task_with_a2a(
|
|||||||
|
|
||||||
|
|
||||||
def _augment_prompt_with_a2a(
|
def _augment_prompt_with_a2a(
|
||||||
a2a_agents: list[A2AConfig],
|
a2a_agents: list[A2AConfig | A2AClientConfig],
|
||||||
task_description: str,
|
task_description: str,
|
||||||
agent_cards: dict[str, AgentCard],
|
agent_cards: dict[str, AgentCard],
|
||||||
conversation_history: list[Message] | None = None,
|
conversation_history: list[Message] | None = None,
|
||||||
@@ -523,11 +528,11 @@ def _prepare_delegation_context(
|
|||||||
task: Task,
|
task: Task,
|
||||||
original_task_description: str | None,
|
original_task_description: str | None,
|
||||||
) -> tuple[
|
) -> tuple[
|
||||||
list[A2AConfig],
|
list[A2AConfig | A2AClientConfig],
|
||||||
type[BaseModel],
|
type[BaseModel],
|
||||||
str,
|
str,
|
||||||
str,
|
str,
|
||||||
A2AConfig,
|
A2AConfig | A2AClientConfig,
|
||||||
str | None,
|
str | None,
|
||||||
str | None,
|
str | None,
|
||||||
dict[str, Any] | None,
|
dict[str, Any] | None,
|
||||||
@@ -591,7 +596,7 @@ def _handle_task_completion(
|
|||||||
task: Task,
|
task: Task,
|
||||||
task_id_config: str | None,
|
task_id_config: str | None,
|
||||||
reference_task_ids: list[str],
|
reference_task_ids: list[str],
|
||||||
agent_config: A2AConfig,
|
agent_config: A2AConfig | A2AClientConfig,
|
||||||
turn_num: int,
|
turn_num: int,
|
||||||
) -> tuple[str | None, str | None, list[str]]:
|
) -> tuple[str | None, str | None, list[str]]:
|
||||||
"""Handle task completion state including reference task updates.
|
"""Handle task completion state including reference task updates.
|
||||||
@@ -631,7 +636,7 @@ def _handle_agent_response_and_continue(
|
|||||||
a2a_result: TaskStateResult,
|
a2a_result: TaskStateResult,
|
||||||
agent_id: str,
|
agent_id: str,
|
||||||
agent_cards: dict[str, AgentCard] | None,
|
agent_cards: dict[str, AgentCard] | None,
|
||||||
a2a_agents: list[A2AConfig],
|
a2a_agents: list[A2AConfig | A2AClientConfig],
|
||||||
original_task_description: str,
|
original_task_description: str,
|
||||||
conversation_history: list[Message],
|
conversation_history: list[Message],
|
||||||
turn_num: int,
|
turn_num: int,
|
||||||
@@ -771,6 +776,7 @@ def _delegate_to_a2a(
|
|||||||
response_model=agent_config.response_model,
|
response_model=agent_config.response_model,
|
||||||
turn_number=turn_num + 1,
|
turn_number=turn_num + 1,
|
||||||
updates=agent_config.updates,
|
updates=agent_config.updates,
|
||||||
|
transport_protocol=agent_config.transport_protocol,
|
||||||
)
|
)
|
||||||
|
|
||||||
conversation_history = a2a_result.get("history", [])
|
conversation_history = a2a_result.get("history", [])
|
||||||
@@ -867,8 +873,8 @@ def _delegate_to_a2a(
|
|||||||
|
|
||||||
|
|
||||||
async def _afetch_card_from_config(
|
async def _afetch_card_from_config(
|
||||||
config: A2AConfig,
|
config: A2AConfig | A2AClientConfig,
|
||||||
) -> tuple[A2AConfig, AgentCard | Exception]:
|
) -> tuple[A2AConfig | A2AClientConfig, AgentCard | Exception]:
|
||||||
"""Fetch agent card from A2A config asynchronously."""
|
"""Fetch agent card from A2A config asynchronously."""
|
||||||
try:
|
try:
|
||||||
card = await afetch_agent_card(
|
card = await afetch_agent_card(
|
||||||
@@ -882,7 +888,7 @@ async def _afetch_card_from_config(
|
|||||||
|
|
||||||
|
|
||||||
async def _afetch_agent_cards_concurrently(
|
async def _afetch_agent_cards_concurrently(
|
||||||
a2a_agents: list[A2AConfig],
|
a2a_agents: list[A2AConfig | A2AClientConfig],
|
||||||
) -> tuple[dict[str, AgentCard], dict[str, str]]:
|
) -> tuple[dict[str, AgentCard], dict[str, str]]:
|
||||||
"""Fetch agent cards concurrently for multiple A2A agents using asyncio."""
|
"""Fetch agent cards concurrently for multiple A2A agents using asyncio."""
|
||||||
agent_cards: dict[str, AgentCard] = {}
|
agent_cards: dict[str, AgentCard] = {}
|
||||||
@@ -907,7 +913,7 @@ async def _afetch_agent_cards_concurrently(
|
|||||||
|
|
||||||
async def _aexecute_task_with_a2a(
|
async def _aexecute_task_with_a2a(
|
||||||
self: Agent,
|
self: Agent,
|
||||||
a2a_agents: list[A2AConfig],
|
a2a_agents: list[A2AConfig | A2AClientConfig],
|
||||||
original_fn: Callable[..., Coroutine[Any, Any, str]],
|
original_fn: Callable[..., Coroutine[Any, Any, str]],
|
||||||
task: Task,
|
task: Task,
|
||||||
agent_response_model: type[BaseModel],
|
agent_response_model: type[BaseModel],
|
||||||
@@ -986,7 +992,7 @@ async def _ahandle_agent_response_and_continue(
|
|||||||
a2a_result: TaskStateResult,
|
a2a_result: TaskStateResult,
|
||||||
agent_id: str,
|
agent_id: str,
|
||||||
agent_cards: dict[str, AgentCard] | None,
|
agent_cards: dict[str, AgentCard] | None,
|
||||||
a2a_agents: list[A2AConfig],
|
a2a_agents: list[A2AConfig | A2AClientConfig],
|
||||||
original_task_description: str,
|
original_task_description: str,
|
||||||
conversation_history: list[Message],
|
conversation_history: list[Message],
|
||||||
turn_num: int,
|
turn_num: int,
|
||||||
@@ -1085,6 +1091,7 @@ async def _adelegate_to_a2a(
|
|||||||
agent_branch=agent_branch,
|
agent_branch=agent_branch,
|
||||||
response_model=agent_config.response_model,
|
response_model=agent_config.response_model,
|
||||||
turn_number=turn_num + 1,
|
turn_number=turn_num + 1,
|
||||||
|
transport_protocol=agent_config.transport_protocol,
|
||||||
updates=agent_config.updates,
|
updates=agent_config.updates,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -17,7 +17,6 @@ from urllib.parse import urlparse
|
|||||||
from pydantic import BaseModel, Field, InstanceOf, PrivateAttr, model_validator
|
from pydantic import BaseModel, Field, InstanceOf, PrivateAttr, model_validator
|
||||||
from typing_extensions import Self
|
from typing_extensions import Self
|
||||||
|
|
||||||
from crewai.a2a.config import A2AConfig
|
|
||||||
from crewai.agent.utils import (
|
from crewai.agent.utils import (
|
||||||
ahandle_knowledge_retrieval,
|
ahandle_knowledge_retrieval,
|
||||||
apply_training_data,
|
apply_training_data,
|
||||||
@@ -73,11 +72,19 @@ from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_F
|
|||||||
from crewai.utilities.converter import Converter
|
from crewai.utilities.converter import Converter
|
||||||
from crewai.utilities.guardrail_types import GuardrailType
|
from crewai.utilities.guardrail_types import GuardrailType
|
||||||
from crewai.utilities.llm_utils import create_llm
|
from crewai.utilities.llm_utils import create_llm
|
||||||
from crewai.utilities.prompts import Prompts
|
from crewai.utilities.prompts import Prompts, StandardPromptResult, SystemPromptResult
|
||||||
from crewai.utilities.token_counter_callback import TokenCalcHandler
|
from crewai.utilities.token_counter_callback import TokenCalcHandler
|
||||||
from crewai.utilities.training_handler import CrewTrainingHandler
|
from crewai.utilities.training_handler import CrewTrainingHandler
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig
|
||||||
|
except ImportError:
|
||||||
|
A2AClientConfig = Any
|
||||||
|
A2AConfig = Any
|
||||||
|
A2AServerConfig = Any
|
||||||
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from crewai_tools import CodeInterpreterTool
|
from crewai_tools import CodeInterpreterTool
|
||||||
|
|
||||||
@@ -218,9 +225,18 @@ class Agent(BaseAgent):
|
|||||||
guardrail_max_retries: int = Field(
|
guardrail_max_retries: int = Field(
|
||||||
default=3, description="Maximum number of retries when guardrail fails"
|
default=3, description="Maximum number of retries when guardrail fails"
|
||||||
)
|
)
|
||||||
a2a: list[A2AConfig] | A2AConfig | None = Field(
|
a2a: (
|
||||||
|
list[A2AConfig | A2AServerConfig | A2AClientConfig]
|
||||||
|
| A2AConfig
|
||||||
|
| A2AServerConfig
|
||||||
|
| A2AClientConfig
|
||||||
|
| None
|
||||||
|
) = Field(
|
||||||
default=None,
|
default=None,
|
||||||
description="A2A (Agent-to-Agent) configuration for delegating tasks to remote agents. Can be a single A2AConfig or a dict mapping agent IDs to configs.",
|
description="""
|
||||||
|
A2A (Agent-to-Agent) configuration for delegating tasks to remote agents.
|
||||||
|
Can be a single A2AConfig/A2AClientConfig/A2AServerConfig, or a list of any number of A2AConfig/A2AClientConfig with a single A2AServerConfig.
|
||||||
|
""",
|
||||||
)
|
)
|
||||||
executor_class: type[CrewAgentExecutor] | type[CrewAgentExecutorFlow] = Field(
|
executor_class: type[CrewAgentExecutor] | type[CrewAgentExecutorFlow] = Field(
|
||||||
default=CrewAgentExecutor,
|
default=CrewAgentExecutor,
|
||||||
@@ -733,7 +749,7 @@ class Agent(BaseAgent):
|
|||||||
if self.agent_executor is not None:
|
if self.agent_executor is not None:
|
||||||
self._update_executor_parameters(
|
self._update_executor_parameters(
|
||||||
task=task,
|
task=task,
|
||||||
tools=parsed_tools,
|
tools=parsed_tools, # type: ignore[arg-type]
|
||||||
raw_tools=raw_tools,
|
raw_tools=raw_tools,
|
||||||
prompt=prompt,
|
prompt=prompt,
|
||||||
stop_words=stop_words,
|
stop_words=stop_words,
|
||||||
@@ -742,7 +758,7 @@ class Agent(BaseAgent):
|
|||||||
else:
|
else:
|
||||||
self.agent_executor = self.executor_class(
|
self.agent_executor = self.executor_class(
|
||||||
llm=cast(BaseLLM, self.llm),
|
llm=cast(BaseLLM, self.llm),
|
||||||
task=task,
|
task=task, # type: ignore[arg-type]
|
||||||
i18n=self.i18n,
|
i18n=self.i18n,
|
||||||
agent=self,
|
agent=self,
|
||||||
crew=self.crew,
|
crew=self.crew,
|
||||||
@@ -765,11 +781,11 @@ class Agent(BaseAgent):
|
|||||||
def _update_executor_parameters(
|
def _update_executor_parameters(
|
||||||
self,
|
self,
|
||||||
task: Task | None,
|
task: Task | None,
|
||||||
tools: list,
|
tools: list[BaseTool],
|
||||||
raw_tools: list[BaseTool],
|
raw_tools: list[BaseTool],
|
||||||
prompt: dict,
|
prompt: SystemPromptResult | StandardPromptResult,
|
||||||
stop_words: list[str],
|
stop_words: list[str],
|
||||||
rpm_limit_fn: Callable | None,
|
rpm_limit_fn: Callable | None, # type: ignore[type-arg]
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Update executor parameters without recreating instance.
|
"""Update executor parameters without recreating instance.
|
||||||
|
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
|
|||||||
authors = [{ name = "Your Name", email = "you@example.com" }]
|
authors = [{ name = "Your Name", email = "you@example.com" }]
|
||||||
requires-python = ">=3.10,<3.14"
|
requires-python = ">=3.10,<3.14"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"crewai[tools]==1.8.0"
|
"crewai[tools]==1.8.1"
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.scripts]
|
[project.scripts]
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
|
|||||||
authors = [{ name = "Your Name", email = "you@example.com" }]
|
authors = [{ name = "Your Name", email = "you@example.com" }]
|
||||||
requires-python = ">=3.10,<3.14"
|
requires-python = ">=3.10,<3.14"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"crewai[tools]==1.8.0"
|
"crewai[tools]==1.8.1"
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.scripts]
|
[project.scripts]
|
||||||
|
|||||||
@@ -209,10 +209,9 @@ class EventListener(BaseEventListener):
|
|||||||
@crewai_event_bus.on(TaskCompletedEvent)
|
@crewai_event_bus.on(TaskCompletedEvent)
|
||||||
def on_task_completed(source: Any, event: TaskCompletedEvent) -> None:
|
def on_task_completed(source: Any, event: TaskCompletedEvent) -> None:
|
||||||
# Handle telemetry
|
# Handle telemetry
|
||||||
span = self.execution_spans.get(source)
|
span = self.execution_spans.pop(source, None)
|
||||||
if span:
|
if span:
|
||||||
self._telemetry.task_ended(span, source, source.agent.crew)
|
self._telemetry.task_ended(span, source, source.agent.crew)
|
||||||
self.execution_spans[source] = None
|
|
||||||
|
|
||||||
# Pass task name if it exists
|
# Pass task name if it exists
|
||||||
task_name = get_task_name(source)
|
task_name = get_task_name(source)
|
||||||
@@ -222,11 +221,10 @@ class EventListener(BaseEventListener):
|
|||||||
|
|
||||||
@crewai_event_bus.on(TaskFailedEvent)
|
@crewai_event_bus.on(TaskFailedEvent)
|
||||||
def on_task_failed(source: Any, event: TaskFailedEvent) -> None:
|
def on_task_failed(source: Any, event: TaskFailedEvent) -> None:
|
||||||
span = self.execution_spans.get(source)
|
span = self.execution_spans.pop(source, None)
|
||||||
if span:
|
if span:
|
||||||
if source.agent and source.agent.crew:
|
if source.agent and source.agent.crew:
|
||||||
self._telemetry.task_ended(span, source, source.agent.crew)
|
self._telemetry.task_ended(span, source, source.agent.crew)
|
||||||
self.execution_spans[source] = None
|
|
||||||
|
|
||||||
# Pass task name if it exists
|
# Pass task name if it exists
|
||||||
task_name = get_task_name(source)
|
task_name = get_task_name(source)
|
||||||
|
|||||||
@@ -1,3 +1,20 @@
|
|||||||
|
from crewai.events.types.a2a_events import (
|
||||||
|
A2AConversationCompletedEvent,
|
||||||
|
A2AConversationStartedEvent,
|
||||||
|
A2ADelegationCompletedEvent,
|
||||||
|
A2ADelegationStartedEvent,
|
||||||
|
A2AMessageSentEvent,
|
||||||
|
A2APollingStartedEvent,
|
||||||
|
A2APollingStatusEvent,
|
||||||
|
A2APushNotificationReceivedEvent,
|
||||||
|
A2APushNotificationRegisteredEvent,
|
||||||
|
A2APushNotificationTimeoutEvent,
|
||||||
|
A2AResponseReceivedEvent,
|
||||||
|
A2AServerTaskCanceledEvent,
|
||||||
|
A2AServerTaskCompletedEvent,
|
||||||
|
A2AServerTaskFailedEvent,
|
||||||
|
A2AServerTaskStartedEvent,
|
||||||
|
)
|
||||||
from crewai.events.types.agent_events import (
|
from crewai.events.types.agent_events import (
|
||||||
AgentExecutionCompletedEvent,
|
AgentExecutionCompletedEvent,
|
||||||
AgentExecutionErrorEvent,
|
AgentExecutionErrorEvent,
|
||||||
@@ -76,7 +93,22 @@ from crewai.events.types.tool_usage_events import (
|
|||||||
|
|
||||||
|
|
||||||
EventTypes = (
|
EventTypes = (
|
||||||
CrewKickoffStartedEvent
|
A2AConversationCompletedEvent
|
||||||
|
| A2AConversationStartedEvent
|
||||||
|
| A2ADelegationCompletedEvent
|
||||||
|
| A2ADelegationStartedEvent
|
||||||
|
| A2AMessageSentEvent
|
||||||
|
| A2APollingStartedEvent
|
||||||
|
| A2APollingStatusEvent
|
||||||
|
| A2APushNotificationReceivedEvent
|
||||||
|
| A2APushNotificationRegisteredEvent
|
||||||
|
| A2APushNotificationTimeoutEvent
|
||||||
|
| A2AResponseReceivedEvent
|
||||||
|
| A2AServerTaskCanceledEvent
|
||||||
|
| A2AServerTaskCompletedEvent
|
||||||
|
| A2AServerTaskFailedEvent
|
||||||
|
| A2AServerTaskStartedEvent
|
||||||
|
| CrewKickoffStartedEvent
|
||||||
| CrewKickoffCompletedEvent
|
| CrewKickoffCompletedEvent
|
||||||
| CrewKickoffFailedEvent
|
| CrewKickoffFailedEvent
|
||||||
| CrewTestStartedEvent
|
| CrewTestStartedEvent
|
||||||
|
|||||||
@@ -210,3 +210,37 @@ class A2APushNotificationTimeoutEvent(A2AEventBase):
|
|||||||
type: str = "a2a_push_notification_timeout"
|
type: str = "a2a_push_notification_timeout"
|
||||||
task_id: str
|
task_id: str
|
||||||
timeout_seconds: float
|
timeout_seconds: float
|
||||||
|
|
||||||
|
|
||||||
|
class A2AServerTaskStartedEvent(A2AEventBase):
|
||||||
|
"""Event emitted when an A2A server task execution starts."""
|
||||||
|
|
||||||
|
type: str = "a2a_server_task_started"
|
||||||
|
a2a_task_id: str
|
||||||
|
a2a_context_id: str
|
||||||
|
|
||||||
|
|
||||||
|
class A2AServerTaskCompletedEvent(A2AEventBase):
|
||||||
|
"""Event emitted when an A2A server task execution completes."""
|
||||||
|
|
||||||
|
type: str = "a2a_server_task_completed"
|
||||||
|
a2a_task_id: str
|
||||||
|
a2a_context_id: str
|
||||||
|
result: str
|
||||||
|
|
||||||
|
|
||||||
|
class A2AServerTaskCanceledEvent(A2AEventBase):
|
||||||
|
"""Event emitted when an A2A server task execution is canceled."""
|
||||||
|
|
||||||
|
type: str = "a2a_server_task_canceled"
|
||||||
|
a2a_task_id: str
|
||||||
|
a2a_context_id: str
|
||||||
|
|
||||||
|
|
||||||
|
class A2AServerTaskFailedEvent(A2AEventBase):
|
||||||
|
"""Event emitted when an A2A server task execution fails."""
|
||||||
|
|
||||||
|
type: str = "a2a_server_task_failed"
|
||||||
|
a2a_task_id: str
|
||||||
|
a2a_context_id: str
|
||||||
|
error: str
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import inspect
|
import inspect
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
from pydantic import BaseModel, Field, InstanceOf, model_validator
|
from pydantic import BaseModel, Field, InstanceOf, model_validator
|
||||||
from typing_extensions import Self
|
from typing_extensions import Self
|
||||||
@@ -14,14 +15,14 @@ class FlowTrackable(BaseModel):
|
|||||||
inspecting the call stack.
|
inspecting the call stack.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
parent_flow: InstanceOf[Flow] | None = Field(
|
parent_flow: InstanceOf[Flow[Any]] | None = Field(
|
||||||
default=None,
|
default=None,
|
||||||
description="The parent flow of the instance, if it was created inside a flow.",
|
description="The parent flow of the instance, if it was created inside a flow.",
|
||||||
)
|
)
|
||||||
|
|
||||||
@model_validator(mode="after")
|
@model_validator(mode="after")
|
||||||
def _set_parent_flow(self) -> Self:
|
def _set_parent_flow(self) -> Self:
|
||||||
max_depth = 5
|
max_depth = 8
|
||||||
frame = inspect.currentframe()
|
frame = inspect.currentframe()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -443,7 +443,7 @@ class AzureCompletion(BaseLLM):
|
|||||||
params["presence_penalty"] = self.presence_penalty
|
params["presence_penalty"] = self.presence_penalty
|
||||||
if self.max_tokens is not None:
|
if self.max_tokens is not None:
|
||||||
params["max_tokens"] = self.max_tokens
|
params["max_tokens"] = self.max_tokens
|
||||||
if self.stop:
|
if self.stop and self.supports_stop_words():
|
||||||
params["stop"] = self.stop
|
params["stop"] = self.stop
|
||||||
|
|
||||||
# Handle tools/functions for Azure OpenAI models
|
# Handle tools/functions for Azure OpenAI models
|
||||||
@@ -931,8 +931,28 @@ class AzureCompletion(BaseLLM):
|
|||||||
return self.is_openai_model
|
return self.is_openai_model
|
||||||
|
|
||||||
def supports_stop_words(self) -> bool:
|
def supports_stop_words(self) -> bool:
|
||||||
"""Check if the model supports stop words."""
|
"""Check if the model supports stop words.
|
||||||
return True # Most Azure models support stop sequences
|
|
||||||
|
Models using the Responses API (GPT-5 family, o-series reasoning models,
|
||||||
|
computer-use-preview) do not support stop sequences.
|
||||||
|
See: https://learn.microsoft.com/en-us/azure/ai-foundry/foundry-models/concepts/models-sold-directly-by-azure
|
||||||
|
"""
|
||||||
|
model_lower = self.model.lower() if self.model else ""
|
||||||
|
|
||||||
|
if "gpt-5" in model_lower:
|
||||||
|
return False
|
||||||
|
|
||||||
|
o_series_models = ["o1", "o3", "o4", "o1-mini", "o3-mini", "o4-mini"]
|
||||||
|
|
||||||
|
responses_api_models = ["computer-use-preview"]
|
||||||
|
|
||||||
|
unsupported_stop_models = o_series_models + responses_api_models
|
||||||
|
|
||||||
|
for unsupported in unsupported_stop_models:
|
||||||
|
if unsupported in model_lower:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
def get_context_window_size(self) -> int:
|
def get_context_window_size(self) -> int:
|
||||||
"""Get the context window size for the model."""
|
"""Get the context window size for the model."""
|
||||||
|
|||||||
@@ -1,8 +1,6 @@
|
|||||||
"""Utilities for creating and manipulating types."""
|
"""Utilities for creating and manipulating types."""
|
||||||
|
|
||||||
from typing import Annotated, Final, Literal
|
from typing import Annotated, Final, Literal, cast
|
||||||
|
|
||||||
from typing_extensions import TypeAliasType
|
|
||||||
|
|
||||||
|
|
||||||
_DYNAMIC_LITERAL_ALIAS: Final[Literal["DynamicLiteral"]] = "DynamicLiteral"
|
_DYNAMIC_LITERAL_ALIAS: Final[Literal["DynamicLiteral"]] = "DynamicLiteral"
|
||||||
@@ -20,6 +18,11 @@ def create_literals_from_strings(
|
|||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Literal type for each A2A agent ID
|
Literal type for each A2A agent ID
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If values is empty (Literal requires at least one value)
|
||||||
"""
|
"""
|
||||||
unique_values: tuple[str, ...] = tuple(dict.fromkeys(values))
|
unique_values: tuple[str, ...] = tuple(dict.fromkeys(values))
|
||||||
return Literal.__getitem__(unique_values)
|
if not unique_values:
|
||||||
|
raise ValueError("Cannot create Literal type from empty values")
|
||||||
|
return cast(type, Literal.__getitem__(unique_values))
|
||||||
|
|||||||
@@ -2,11 +2,8 @@ from datetime import datetime
|
|||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import pickle
|
import pickle
|
||||||
import tempfile
|
|
||||||
import threading
|
|
||||||
from typing import Any, TypedDict
|
from typing import Any, TypedDict
|
||||||
|
|
||||||
import portalocker
|
|
||||||
from typing_extensions import Unpack
|
from typing_extensions import Unpack
|
||||||
|
|
||||||
|
|
||||||
@@ -126,15 +123,10 @@ class FileHandler:
|
|||||||
|
|
||||||
|
|
||||||
class PickleHandler:
|
class PickleHandler:
|
||||||
"""Thread-safe handler for saving and loading data using pickle.
|
"""Handler for saving and loading data using pickle.
|
||||||
|
|
||||||
This class provides thread-safe file operations using portalocker for
|
|
||||||
cross-process file locking and atomic write operations to prevent
|
|
||||||
data corruption during concurrent access.
|
|
||||||
|
|
||||||
Attributes:
|
Attributes:
|
||||||
file_path: The path to the pickle file.
|
file_path: The path to the pickle file.
|
||||||
_lock: Threading lock for thread-safe operations within the same process.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, file_name: str) -> None:
|
def __init__(self, file_name: str) -> None:
|
||||||
@@ -149,62 +141,34 @@ class PickleHandler:
|
|||||||
file_name += ".pkl"
|
file_name += ".pkl"
|
||||||
|
|
||||||
self.file_path = os.path.join(os.getcwd(), file_name)
|
self.file_path = os.path.join(os.getcwd(), file_name)
|
||||||
self._lock = threading.Lock()
|
|
||||||
|
|
||||||
def initialize_file(self) -> None:
|
def initialize_file(self) -> None:
|
||||||
"""Initialize the file with an empty dictionary and overwrite any existing data."""
|
"""Initialize the file with an empty dictionary and overwrite any existing data."""
|
||||||
self.save({})
|
self.save({})
|
||||||
|
|
||||||
def save(self, data: Any) -> None:
|
def save(self, data: Any) -> None:
|
||||||
"""Save the data to the specified file using pickle with thread-safe atomic writes.
|
"""
|
||||||
|
Save the data to the specified file using pickle.
|
||||||
This method uses a two-phase approach for thread safety:
|
|
||||||
1. Threading lock for same-process thread safety
|
|
||||||
2. Atomic write (write to temp file, then rename) for cross-process safety
|
|
||||||
and data integrity
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
data: The data to be saved to the file.
|
data: The data to be saved to the file.
|
||||||
"""
|
"""
|
||||||
with self._lock:
|
with open(self.file_path, "wb") as f:
|
||||||
dir_name = os.path.dirname(self.file_path) or os.getcwd()
|
pickle.dump(obj=data, file=f)
|
||||||
fd, temp_path = tempfile.mkstemp(
|
|
||||||
suffix=".pkl.tmp", prefix="pickle_", dir=dir_name
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
with os.fdopen(fd, "wb") as f:
|
|
||||||
pickle.dump(obj=data, file=f)
|
|
||||||
f.flush()
|
|
||||||
os.fsync(f.fileno())
|
|
||||||
os.replace(temp_path, self.file_path)
|
|
||||||
except Exception:
|
|
||||||
if os.path.exists(temp_path):
|
|
||||||
os.unlink(temp_path)
|
|
||||||
raise
|
|
||||||
|
|
||||||
def load(self) -> Any:
|
def load(self) -> Any:
|
||||||
"""Load the data from the specified file using pickle with thread-safe locking.
|
"""Load the data from the specified file using pickle.
|
||||||
|
|
||||||
This method uses portalocker for cross-process read locking to ensure
|
|
||||||
data consistency when multiple processes may be accessing the file.
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
The data loaded from the file, or an empty dictionary if the file
|
The data loaded from the file.
|
||||||
does not exist or is empty.
|
|
||||||
"""
|
"""
|
||||||
with self._lock:
|
if not os.path.exists(self.file_path) or os.path.getsize(self.file_path) == 0:
|
||||||
if (
|
return {} # Return an empty dictionary if the file does not exist or is empty
|
||||||
not os.path.exists(self.file_path)
|
|
||||||
or os.path.getsize(self.file_path) == 0
|
|
||||||
):
|
|
||||||
return {}
|
|
||||||
|
|
||||||
with portalocker.Lock(
|
with open(self.file_path, "rb") as file:
|
||||||
self.file_path, "rb", flags=portalocker.LOCK_SH
|
try:
|
||||||
) as file:
|
return pickle.load(file) # noqa: S301
|
||||||
try:
|
except EOFError:
|
||||||
return pickle.load(file) # noqa: S301
|
return {} # Return an empty dictionary if the file is empty or corrupted
|
||||||
except EOFError:
|
except Exception:
|
||||||
return {}
|
raise # Raise any other exceptions that occur during loading
|
||||||
except Exception:
|
|
||||||
raise
|
|
||||||
|
|||||||
325
lib/crewai/tests/a2a/utils/test_agent_card.py
Normal file
325
lib/crewai/tests/a2a/utils/test_agent_card.py
Normal file
@@ -0,0 +1,325 @@
|
|||||||
|
"""Tests for A2A agent card utilities."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from a2a.types import AgentCard, AgentSkill
|
||||||
|
|
||||||
|
from crewai import Agent
|
||||||
|
from crewai.a2a.config import A2AClientConfig, A2AServerConfig
|
||||||
|
from crewai.a2a.utils.agent_card import inject_a2a_server_methods
|
||||||
|
|
||||||
|
|
||||||
|
class TestInjectA2AServerMethods:
|
||||||
|
"""Tests for inject_a2a_server_methods function."""
|
||||||
|
|
||||||
|
def test_agent_with_server_config_gets_to_agent_card_method(self) -> None:
|
||||||
|
"""Agent with A2AServerConfig should have to_agent_card method injected."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Test goal",
|
||||||
|
backstory="Test backstory",
|
||||||
|
a2a=A2AServerConfig(),
|
||||||
|
)
|
||||||
|
|
||||||
|
assert hasattr(agent, "to_agent_card")
|
||||||
|
assert callable(agent.to_agent_card)
|
||||||
|
|
||||||
|
def test_agent_without_server_config_no_injection(self) -> None:
|
||||||
|
"""Agent without A2AServerConfig should not get to_agent_card method."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Test goal",
|
||||||
|
backstory="Test backstory",
|
||||||
|
a2a=A2AClientConfig(endpoint="http://example.com"),
|
||||||
|
)
|
||||||
|
|
||||||
|
assert not hasattr(agent, "to_agent_card")
|
||||||
|
|
||||||
|
def test_agent_without_a2a_no_injection(self) -> None:
|
||||||
|
"""Agent without any a2a config should not get to_agent_card method."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Test goal",
|
||||||
|
backstory="Test backstory",
|
||||||
|
)
|
||||||
|
|
||||||
|
assert not hasattr(agent, "to_agent_card")
|
||||||
|
|
||||||
|
def test_agent_with_mixed_configs_gets_injection(self) -> None:
|
||||||
|
"""Agent with list containing A2AServerConfig should get to_agent_card."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Test goal",
|
||||||
|
backstory="Test backstory",
|
||||||
|
a2a=[
|
||||||
|
A2AClientConfig(endpoint="http://example.com"),
|
||||||
|
A2AServerConfig(name="My Agent"),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
assert hasattr(agent, "to_agent_card")
|
||||||
|
assert callable(agent.to_agent_card)
|
||||||
|
|
||||||
|
def test_manual_injection_on_plain_agent(self) -> None:
|
||||||
|
"""inject_a2a_server_methods should work when called manually."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Test goal",
|
||||||
|
backstory="Test backstory",
|
||||||
|
)
|
||||||
|
# Manually set server config and inject
|
||||||
|
object.__setattr__(agent, "a2a", A2AServerConfig())
|
||||||
|
inject_a2a_server_methods(agent)
|
||||||
|
|
||||||
|
assert hasattr(agent, "to_agent_card")
|
||||||
|
assert callable(agent.to_agent_card)
|
||||||
|
|
||||||
|
|
||||||
|
class TestToAgentCard:
|
||||||
|
"""Tests for the injected to_agent_card method."""
|
||||||
|
|
||||||
|
def test_returns_agent_card(self) -> None:
|
||||||
|
"""to_agent_card should return an AgentCard instance."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Test goal",
|
||||||
|
backstory="Test backstory",
|
||||||
|
a2a=A2AServerConfig(),
|
||||||
|
)
|
||||||
|
|
||||||
|
card = agent.to_agent_card("http://localhost:8000")
|
||||||
|
|
||||||
|
assert isinstance(card, AgentCard)
|
||||||
|
|
||||||
|
def test_uses_agent_role_as_name(self) -> None:
|
||||||
|
"""AgentCard name should default to agent role."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Data Analyst",
|
||||||
|
goal="Analyze data",
|
||||||
|
backstory="Expert analyst",
|
||||||
|
a2a=A2AServerConfig(),
|
||||||
|
)
|
||||||
|
|
||||||
|
card = agent.to_agent_card("http://localhost:8000")
|
||||||
|
|
||||||
|
assert card.name == "Data Analyst"
|
||||||
|
|
||||||
|
def test_uses_server_config_name(self) -> None:
|
||||||
|
"""AgentCard name should prefer A2AServerConfig.name over role."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Data Analyst",
|
||||||
|
goal="Analyze data",
|
||||||
|
backstory="Expert analyst",
|
||||||
|
a2a=A2AServerConfig(name="Custom Agent Name"),
|
||||||
|
)
|
||||||
|
|
||||||
|
card = agent.to_agent_card("http://localhost:8000")
|
||||||
|
|
||||||
|
assert card.name == "Custom Agent Name"
|
||||||
|
|
||||||
|
def test_uses_goal_as_description(self) -> None:
|
||||||
|
"""AgentCard description should include agent goal."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Accomplish important tasks",
|
||||||
|
backstory="Has extensive experience",
|
||||||
|
a2a=A2AServerConfig(),
|
||||||
|
)
|
||||||
|
|
||||||
|
card = agent.to_agent_card("http://localhost:8000")
|
||||||
|
|
||||||
|
assert "Accomplish important tasks" in card.description
|
||||||
|
|
||||||
|
def test_uses_server_config_description(self) -> None:
|
||||||
|
"""AgentCard description should prefer A2AServerConfig.description."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Accomplish important tasks",
|
||||||
|
backstory="Has extensive experience",
|
||||||
|
a2a=A2AServerConfig(description="Custom description"),
|
||||||
|
)
|
||||||
|
|
||||||
|
card = agent.to_agent_card("http://localhost:8000")
|
||||||
|
|
||||||
|
assert card.description == "Custom description"
|
||||||
|
|
||||||
|
def test_uses_provided_url(self) -> None:
|
||||||
|
"""AgentCard url should use the provided URL."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Test goal",
|
||||||
|
backstory="Test backstory",
|
||||||
|
a2a=A2AServerConfig(),
|
||||||
|
)
|
||||||
|
|
||||||
|
card = agent.to_agent_card("http://my-server.com:9000")
|
||||||
|
|
||||||
|
assert card.url == "http://my-server.com:9000"
|
||||||
|
|
||||||
|
def test_uses_server_config_url(self) -> None:
|
||||||
|
"""AgentCard url should prefer A2AServerConfig.url over provided URL."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Test goal",
|
||||||
|
backstory="Test backstory",
|
||||||
|
a2a=A2AServerConfig(url="http://configured-url.com"),
|
||||||
|
)
|
||||||
|
|
||||||
|
card = agent.to_agent_card("http://fallback-url.com")
|
||||||
|
|
||||||
|
assert card.url == "http://configured-url.com/"
|
||||||
|
|
||||||
|
def test_generates_default_skill(self) -> None:
|
||||||
|
"""AgentCard should have at least one skill based on agent role."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Research Assistant",
|
||||||
|
goal="Help with research",
|
||||||
|
backstory="Skilled researcher",
|
||||||
|
a2a=A2AServerConfig(),
|
||||||
|
)
|
||||||
|
|
||||||
|
card = agent.to_agent_card("http://localhost:8000")
|
||||||
|
|
||||||
|
assert len(card.skills) >= 1
|
||||||
|
skill = card.skills[0]
|
||||||
|
assert skill.name == "Research Assistant"
|
||||||
|
assert skill.description == "Help with research"
|
||||||
|
|
||||||
|
def test_uses_server_config_skills(self) -> None:
|
||||||
|
"""AgentCard skills should prefer A2AServerConfig.skills."""
|
||||||
|
custom_skill = AgentSkill(
|
||||||
|
id="custom-skill",
|
||||||
|
name="Custom Skill",
|
||||||
|
description="A custom skill",
|
||||||
|
tags=["custom"],
|
||||||
|
)
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Test goal",
|
||||||
|
backstory="Test backstory",
|
||||||
|
a2a=A2AServerConfig(skills=[custom_skill]),
|
||||||
|
)
|
||||||
|
|
||||||
|
card = agent.to_agent_card("http://localhost:8000")
|
||||||
|
|
||||||
|
assert len(card.skills) == 1
|
||||||
|
assert card.skills[0].id == "custom-skill"
|
||||||
|
assert card.skills[0].name == "Custom Skill"
|
||||||
|
|
||||||
|
def test_includes_custom_version(self) -> None:
|
||||||
|
"""AgentCard should include version from A2AServerConfig."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Test goal",
|
||||||
|
backstory="Test backstory",
|
||||||
|
a2a=A2AServerConfig(version="2.0.0"),
|
||||||
|
)
|
||||||
|
|
||||||
|
card = agent.to_agent_card("http://localhost:8000")
|
||||||
|
|
||||||
|
assert card.version == "2.0.0"
|
||||||
|
|
||||||
|
def test_default_version(self) -> None:
|
||||||
|
"""AgentCard should have default version 1.0.0."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Test goal",
|
||||||
|
backstory="Test backstory",
|
||||||
|
a2a=A2AServerConfig(),
|
||||||
|
)
|
||||||
|
|
||||||
|
card = agent.to_agent_card("http://localhost:8000")
|
||||||
|
|
||||||
|
assert card.version == "1.0.0"
|
||||||
|
|
||||||
|
|
||||||
|
class TestAgentCardJsonStructure:
|
||||||
|
"""Tests for the JSON structure of AgentCard."""
|
||||||
|
|
||||||
|
def test_json_has_required_fields(self) -> None:
|
||||||
|
"""AgentCard JSON should contain all required A2A protocol fields."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Test goal",
|
||||||
|
backstory="Test backstory",
|
||||||
|
a2a=A2AServerConfig(),
|
||||||
|
)
|
||||||
|
|
||||||
|
card = agent.to_agent_card("http://localhost:8000")
|
||||||
|
json_data = card.model_dump()
|
||||||
|
|
||||||
|
assert "name" in json_data
|
||||||
|
assert "description" in json_data
|
||||||
|
assert "url" in json_data
|
||||||
|
assert "version" in json_data
|
||||||
|
assert "skills" in json_data
|
||||||
|
assert "capabilities" in json_data
|
||||||
|
assert "defaultInputModes" in json_data
|
||||||
|
assert "defaultOutputModes" in json_data
|
||||||
|
|
||||||
|
def test_json_skills_structure(self) -> None:
|
||||||
|
"""Each skill in JSON should have required fields."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Test goal",
|
||||||
|
backstory="Test backstory",
|
||||||
|
a2a=A2AServerConfig(),
|
||||||
|
)
|
||||||
|
|
||||||
|
card = agent.to_agent_card("http://localhost:8000")
|
||||||
|
json_data = card.model_dump()
|
||||||
|
|
||||||
|
assert len(json_data["skills"]) >= 1
|
||||||
|
skill = json_data["skills"][0]
|
||||||
|
assert "id" in skill
|
||||||
|
assert "name" in skill
|
||||||
|
assert "description" in skill
|
||||||
|
assert "tags" in skill
|
||||||
|
|
||||||
|
def test_json_capabilities_structure(self) -> None:
|
||||||
|
"""Capabilities in JSON should have expected fields."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Test goal",
|
||||||
|
backstory="Test backstory",
|
||||||
|
a2a=A2AServerConfig(),
|
||||||
|
)
|
||||||
|
|
||||||
|
card = agent.to_agent_card("http://localhost:8000")
|
||||||
|
json_data = card.model_dump()
|
||||||
|
|
||||||
|
capabilities = json_data["capabilities"]
|
||||||
|
assert "streaming" in capabilities
|
||||||
|
assert "pushNotifications" in capabilities
|
||||||
|
|
||||||
|
def test_json_serializable(self) -> None:
|
||||||
|
"""AgentCard should be JSON serializable."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Test goal",
|
||||||
|
backstory="Test backstory",
|
||||||
|
a2a=A2AServerConfig(),
|
||||||
|
)
|
||||||
|
|
||||||
|
card = agent.to_agent_card("http://localhost:8000")
|
||||||
|
json_str = card.model_dump_json()
|
||||||
|
|
||||||
|
assert isinstance(json_str, str)
|
||||||
|
assert "Test Agent" in json_str
|
||||||
|
assert "http://localhost:8000" in json_str
|
||||||
|
|
||||||
|
def test_json_excludes_none_values(self) -> None:
|
||||||
|
"""AgentCard JSON with exclude_none should omit None fields."""
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Test goal",
|
||||||
|
backstory="Test backstory",
|
||||||
|
a2a=A2AServerConfig(),
|
||||||
|
)
|
||||||
|
|
||||||
|
card = agent.to_agent_card("http://localhost:8000")
|
||||||
|
json_data = card.model_dump(exclude_none=True)
|
||||||
|
|
||||||
|
assert "provider" not in json_data
|
||||||
|
assert "documentationUrl" not in json_data
|
||||||
|
assert "iconUrl" not in json_data
|
||||||
370
lib/crewai/tests/a2a/utils/test_task.py
Normal file
370
lib/crewai/tests/a2a/utils/test_task.py
Normal file
@@ -0,0 +1,370 @@
|
|||||||
|
"""Tests for A2A task utilities."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from typing import Any
|
||||||
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import pytest_asyncio
|
||||||
|
from a2a.server.agent_execution import RequestContext
|
||||||
|
from a2a.server.events import EventQueue
|
||||||
|
from a2a.types import Message, Task as A2ATask, TaskState, TaskStatus
|
||||||
|
|
||||||
|
from crewai.a2a.utils.task import cancel, cancellable, execute
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_agent() -> MagicMock:
|
||||||
|
"""Create a mock CrewAI agent."""
|
||||||
|
agent = MagicMock()
|
||||||
|
agent.role = "Test Agent"
|
||||||
|
agent.tools = []
|
||||||
|
agent.aexecute_task = AsyncMock(return_value="Task completed successfully")
|
||||||
|
return agent
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_task() -> MagicMock:
|
||||||
|
"""Create a mock Task."""
|
||||||
|
return MagicMock()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_context() -> MagicMock:
|
||||||
|
"""Create a mock RequestContext."""
|
||||||
|
context = MagicMock(spec=RequestContext)
|
||||||
|
context.task_id = "test-task-123"
|
||||||
|
context.context_id = "test-context-456"
|
||||||
|
context.get_user_input.return_value = "Test user message"
|
||||||
|
context.message = MagicMock(spec=Message)
|
||||||
|
context.current_task = None
|
||||||
|
return context
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_event_queue() -> AsyncMock:
|
||||||
|
"""Create a mock EventQueue."""
|
||||||
|
queue = AsyncMock(spec=EventQueue)
|
||||||
|
queue.enqueue_event = AsyncMock()
|
||||||
|
return queue
|
||||||
|
|
||||||
|
|
||||||
|
@pytest_asyncio.fixture(autouse=True)
|
||||||
|
async def clear_cache(mock_context: MagicMock) -> None:
|
||||||
|
"""Clear cancel flag from cache before each test."""
|
||||||
|
from aiocache import caches
|
||||||
|
|
||||||
|
cache = caches.get("default")
|
||||||
|
await cache.delete(f"cancel:{mock_context.task_id}")
|
||||||
|
|
||||||
|
|
||||||
|
class TestCancellableDecorator:
|
||||||
|
"""Tests for the cancellable decorator."""
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_executes_function_without_context(self) -> None:
|
||||||
|
"""Function executes normally when no RequestContext is provided."""
|
||||||
|
call_count = 0
|
||||||
|
|
||||||
|
@cancellable
|
||||||
|
async def my_func(value: int) -> int:
|
||||||
|
nonlocal call_count
|
||||||
|
call_count += 1
|
||||||
|
return value * 2
|
||||||
|
|
||||||
|
result = await my_func(5)
|
||||||
|
|
||||||
|
assert result == 10
|
||||||
|
assert call_count == 1
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_executes_function_with_context(self, mock_context: MagicMock) -> None:
|
||||||
|
"""Function executes normally with RequestContext when not cancelled."""
|
||||||
|
@cancellable
|
||||||
|
async def my_func(context: RequestContext) -> str:
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
return "completed"
|
||||||
|
|
||||||
|
result = await my_func(mock_context)
|
||||||
|
|
||||||
|
assert result == "completed"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_cancellation_raises_cancelled_error(
|
||||||
|
self, mock_context: MagicMock
|
||||||
|
) -> None:
|
||||||
|
"""Function raises CancelledError when cancel flag is set."""
|
||||||
|
from aiocache import caches
|
||||||
|
|
||||||
|
cache = caches.get("default")
|
||||||
|
|
||||||
|
@cancellable
|
||||||
|
async def slow_func(context: RequestContext) -> str:
|
||||||
|
await asyncio.sleep(1.0)
|
||||||
|
return "should not reach"
|
||||||
|
|
||||||
|
await cache.set(f"cancel:{mock_context.task_id}", True)
|
||||||
|
|
||||||
|
with pytest.raises(asyncio.CancelledError):
|
||||||
|
await slow_func(mock_context)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_cleanup_removes_cancel_flag(self, mock_context: MagicMock) -> None:
|
||||||
|
"""Cancel flag is cleaned up after execution."""
|
||||||
|
from aiocache import caches
|
||||||
|
|
||||||
|
cache = caches.get("default")
|
||||||
|
|
||||||
|
@cancellable
|
||||||
|
async def quick_func(context: RequestContext) -> str:
|
||||||
|
return "done"
|
||||||
|
|
||||||
|
await quick_func(mock_context)
|
||||||
|
|
||||||
|
flag = await cache.get(f"cancel:{mock_context.task_id}")
|
||||||
|
assert flag is None
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_extracts_context_from_kwargs(self, mock_context: MagicMock) -> None:
|
||||||
|
"""Context can be passed as keyword argument."""
|
||||||
|
@cancellable
|
||||||
|
async def my_func(value: int, context: RequestContext | None = None) -> int:
|
||||||
|
return value + 1
|
||||||
|
|
||||||
|
result = await my_func(10, context=mock_context)
|
||||||
|
|
||||||
|
assert result == 11
|
||||||
|
|
||||||
|
|
||||||
|
class TestExecute:
|
||||||
|
"""Tests for the execute function."""
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_successful_execution(
|
||||||
|
self,
|
||||||
|
mock_agent: MagicMock,
|
||||||
|
mock_context: MagicMock,
|
||||||
|
mock_event_queue: AsyncMock,
|
||||||
|
mock_task: MagicMock,
|
||||||
|
) -> None:
|
||||||
|
"""Execute completes successfully and enqueues completed task."""
|
||||||
|
with (
|
||||||
|
patch("crewai.a2a.utils.task.Task", return_value=mock_task),
|
||||||
|
patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus,
|
||||||
|
):
|
||||||
|
await execute(mock_agent, mock_context, mock_event_queue)
|
||||||
|
|
||||||
|
mock_agent.aexecute_task.assert_called_once()
|
||||||
|
mock_event_queue.enqueue_event.assert_called_once()
|
||||||
|
assert mock_bus.emit.call_count == 2
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_emits_started_event(
|
||||||
|
self,
|
||||||
|
mock_agent: MagicMock,
|
||||||
|
mock_context: MagicMock,
|
||||||
|
mock_event_queue: AsyncMock,
|
||||||
|
mock_task: MagicMock,
|
||||||
|
) -> None:
|
||||||
|
"""Execute emits A2AServerTaskStartedEvent."""
|
||||||
|
with (
|
||||||
|
patch("crewai.a2a.utils.task.Task", return_value=mock_task),
|
||||||
|
patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus,
|
||||||
|
):
|
||||||
|
await execute(mock_agent, mock_context, mock_event_queue)
|
||||||
|
|
||||||
|
first_call = mock_bus.emit.call_args_list[0]
|
||||||
|
event = first_call[0][1]
|
||||||
|
|
||||||
|
assert event.type == "a2a_server_task_started"
|
||||||
|
assert event.a2a_task_id == mock_context.task_id
|
||||||
|
assert event.a2a_context_id == mock_context.context_id
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_emits_completed_event(
|
||||||
|
self,
|
||||||
|
mock_agent: MagicMock,
|
||||||
|
mock_context: MagicMock,
|
||||||
|
mock_event_queue: AsyncMock,
|
||||||
|
mock_task: MagicMock,
|
||||||
|
) -> None:
|
||||||
|
"""Execute emits A2AServerTaskCompletedEvent on success."""
|
||||||
|
with (
|
||||||
|
patch("crewai.a2a.utils.task.Task", return_value=mock_task),
|
||||||
|
patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus,
|
||||||
|
):
|
||||||
|
await execute(mock_agent, mock_context, mock_event_queue)
|
||||||
|
|
||||||
|
second_call = mock_bus.emit.call_args_list[1]
|
||||||
|
event = second_call[0][1]
|
||||||
|
|
||||||
|
assert event.type == "a2a_server_task_completed"
|
||||||
|
assert event.a2a_task_id == mock_context.task_id
|
||||||
|
assert event.result == "Task completed successfully"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_emits_failed_event_on_exception(
|
||||||
|
self,
|
||||||
|
mock_agent: MagicMock,
|
||||||
|
mock_context: MagicMock,
|
||||||
|
mock_event_queue: AsyncMock,
|
||||||
|
mock_task: MagicMock,
|
||||||
|
) -> None:
|
||||||
|
"""Execute emits A2AServerTaskFailedEvent on exception."""
|
||||||
|
mock_agent.aexecute_task = AsyncMock(side_effect=ValueError("Test error"))
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch("crewai.a2a.utils.task.Task", return_value=mock_task),
|
||||||
|
patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus,
|
||||||
|
):
|
||||||
|
with pytest.raises(Exception):
|
||||||
|
await execute(mock_agent, mock_context, mock_event_queue)
|
||||||
|
|
||||||
|
failed_call = mock_bus.emit.call_args_list[1]
|
||||||
|
event = failed_call[0][1]
|
||||||
|
|
||||||
|
assert event.type == "a2a_server_task_failed"
|
||||||
|
assert "Test error" in event.error
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_emits_canceled_event_on_cancellation(
|
||||||
|
self,
|
||||||
|
mock_agent: MagicMock,
|
||||||
|
mock_context: MagicMock,
|
||||||
|
mock_event_queue: AsyncMock,
|
||||||
|
mock_task: MagicMock,
|
||||||
|
) -> None:
|
||||||
|
"""Execute emits A2AServerTaskCanceledEvent on CancelledError."""
|
||||||
|
mock_agent.aexecute_task = AsyncMock(side_effect=asyncio.CancelledError())
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch("crewai.a2a.utils.task.Task", return_value=mock_task),
|
||||||
|
patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus,
|
||||||
|
):
|
||||||
|
with pytest.raises(asyncio.CancelledError):
|
||||||
|
await execute(mock_agent, mock_context, mock_event_queue)
|
||||||
|
|
||||||
|
canceled_call = mock_bus.emit.call_args_list[1]
|
||||||
|
event = canceled_call[0][1]
|
||||||
|
|
||||||
|
assert event.type == "a2a_server_task_canceled"
|
||||||
|
assert event.a2a_task_id == mock_context.task_id
|
||||||
|
|
||||||
|
|
||||||
|
class TestCancel:
|
||||||
|
"""Tests for the cancel function."""
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_sets_cancel_flag_in_cache(
|
||||||
|
self,
|
||||||
|
mock_context: MagicMock,
|
||||||
|
mock_event_queue: AsyncMock,
|
||||||
|
) -> None:
|
||||||
|
"""Cancel sets the cancel flag in cache."""
|
||||||
|
from aiocache import caches
|
||||||
|
|
||||||
|
cache = caches.get("default")
|
||||||
|
|
||||||
|
await cancel(mock_context, mock_event_queue)
|
||||||
|
|
||||||
|
flag = await cache.get(f"cancel:{mock_context.task_id}")
|
||||||
|
assert flag is True
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_enqueues_task_status_update_event(
|
||||||
|
self,
|
||||||
|
mock_context: MagicMock,
|
||||||
|
mock_event_queue: AsyncMock,
|
||||||
|
) -> None:
|
||||||
|
"""Cancel enqueues TaskStatusUpdateEvent with canceled state."""
|
||||||
|
await cancel(mock_context, mock_event_queue)
|
||||||
|
|
||||||
|
mock_event_queue.enqueue_event.assert_called_once()
|
||||||
|
event = mock_event_queue.enqueue_event.call_args[0][0]
|
||||||
|
|
||||||
|
assert event.task_id == mock_context.task_id
|
||||||
|
assert event.context_id == mock_context.context_id
|
||||||
|
assert event.status.state == TaskState.canceled
|
||||||
|
assert event.final is True
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_returns_none_when_no_current_task(
|
||||||
|
self,
|
||||||
|
mock_context: MagicMock,
|
||||||
|
mock_event_queue: AsyncMock,
|
||||||
|
) -> None:
|
||||||
|
"""Cancel returns None when context has no current_task."""
|
||||||
|
mock_context.current_task = None
|
||||||
|
|
||||||
|
result = await cancel(mock_context, mock_event_queue)
|
||||||
|
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_returns_updated_task_when_current_task_exists(
|
||||||
|
self,
|
||||||
|
mock_context: MagicMock,
|
||||||
|
mock_event_queue: AsyncMock,
|
||||||
|
) -> None:
|
||||||
|
"""Cancel returns updated task when context has current_task."""
|
||||||
|
current_task = MagicMock(spec=A2ATask)
|
||||||
|
current_task.status = TaskStatus(state=TaskState.working)
|
||||||
|
mock_context.current_task = current_task
|
||||||
|
|
||||||
|
result = await cancel(mock_context, mock_event_queue)
|
||||||
|
|
||||||
|
assert result is current_task
|
||||||
|
assert result.status.state == TaskState.canceled
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_cleanup_after_cancel(
|
||||||
|
self,
|
||||||
|
mock_context: MagicMock,
|
||||||
|
mock_event_queue: AsyncMock,
|
||||||
|
) -> None:
|
||||||
|
"""Cancel flag persists for cancellable decorator to detect."""
|
||||||
|
from aiocache import caches
|
||||||
|
|
||||||
|
cache = caches.get("default")
|
||||||
|
|
||||||
|
await cancel(mock_context, mock_event_queue)
|
||||||
|
|
||||||
|
flag = await cache.get(f"cancel:{mock_context.task_id}")
|
||||||
|
assert flag is True
|
||||||
|
|
||||||
|
await cache.delete(f"cancel:{mock_context.task_id}")
|
||||||
|
|
||||||
|
|
||||||
|
class TestExecuteAndCancelIntegration:
|
||||||
|
"""Integration tests for execute and cancel working together."""
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_cancel_stops_running_execute(
|
||||||
|
self,
|
||||||
|
mock_agent: MagicMock,
|
||||||
|
mock_context: MagicMock,
|
||||||
|
mock_event_queue: AsyncMock,
|
||||||
|
mock_task: MagicMock,
|
||||||
|
) -> None:
|
||||||
|
"""Calling cancel stops a running execute."""
|
||||||
|
async def slow_task(**kwargs: Any) -> str:
|
||||||
|
await asyncio.sleep(2.0)
|
||||||
|
return "should not complete"
|
||||||
|
|
||||||
|
mock_agent.aexecute_task = slow_task
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch("crewai.a2a.utils.task.Task", return_value=mock_task),
|
||||||
|
patch("crewai.a2a.utils.task.crewai_event_bus"),
|
||||||
|
):
|
||||||
|
execute_task = asyncio.create_task(
|
||||||
|
execute(mock_agent, mock_context, mock_event_queue)
|
||||||
|
)
|
||||||
|
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
await cancel(mock_context, mock_event_queue)
|
||||||
|
|
||||||
|
with pytest.raises(asyncio.CancelledError):
|
||||||
|
await execute_task
|
||||||
@@ -515,6 +515,94 @@ def test_azure_supports_stop_words():
|
|||||||
assert llm.supports_stop_words() == True
|
assert llm.supports_stop_words() == True
|
||||||
|
|
||||||
|
|
||||||
|
def test_azure_gpt5_models_do_not_support_stop_words():
|
||||||
|
"""
|
||||||
|
Test that GPT-5 family models do not support stop words.
|
||||||
|
GPT-5 models use the Responses API which doesn't support stop sequences.
|
||||||
|
See: https://learn.microsoft.com/en-us/azure/ai-foundry/foundry-models/concepts/models-sold-directly-by-azure
|
||||||
|
"""
|
||||||
|
# GPT-5 base models
|
||||||
|
gpt5_models = [
|
||||||
|
"azure/gpt-5",
|
||||||
|
"azure/gpt-5-mini",
|
||||||
|
"azure/gpt-5-nano",
|
||||||
|
"azure/gpt-5-chat",
|
||||||
|
# GPT-5.1 series
|
||||||
|
"azure/gpt-5.1",
|
||||||
|
"azure/gpt-5.1-chat",
|
||||||
|
"azure/gpt-5.1-codex",
|
||||||
|
"azure/gpt-5.1-codex-mini",
|
||||||
|
# GPT-5.2 series
|
||||||
|
"azure/gpt-5.2",
|
||||||
|
"azure/gpt-5.2-chat",
|
||||||
|
]
|
||||||
|
|
||||||
|
for model_name in gpt5_models:
|
||||||
|
llm = LLM(model=model_name)
|
||||||
|
assert llm.supports_stop_words() == False, f"Expected {model_name} to NOT support stop words"
|
||||||
|
|
||||||
|
|
||||||
|
def test_azure_o_series_models_do_not_support_stop_words():
|
||||||
|
"""
|
||||||
|
Test that o-series reasoning models do not support stop words.
|
||||||
|
"""
|
||||||
|
o_series_models = [
|
||||||
|
"azure/o1",
|
||||||
|
"azure/o1-mini",
|
||||||
|
"azure/o3",
|
||||||
|
"azure/o3-mini",
|
||||||
|
"azure/o4",
|
||||||
|
"azure/o4-mini",
|
||||||
|
]
|
||||||
|
|
||||||
|
for model_name in o_series_models:
|
||||||
|
llm = LLM(model=model_name)
|
||||||
|
assert llm.supports_stop_words() == False, f"Expected {model_name} to NOT support stop words"
|
||||||
|
|
||||||
|
|
||||||
|
def test_azure_responses_api_models_do_not_support_stop_words():
|
||||||
|
"""
|
||||||
|
Test that models using the Responses API do not support stop words.
|
||||||
|
"""
|
||||||
|
responses_api_models = [
|
||||||
|
"azure/computer-use-preview",
|
||||||
|
]
|
||||||
|
|
||||||
|
for model_name in responses_api_models:
|
||||||
|
llm = LLM(model=model_name)
|
||||||
|
assert llm.supports_stop_words() == False, f"Expected {model_name} to NOT support stop words"
|
||||||
|
|
||||||
|
|
||||||
|
def test_azure_stop_words_not_included_for_unsupported_models():
|
||||||
|
"""
|
||||||
|
Test that stop words are not included in completion params for models that don't support them.
|
||||||
|
"""
|
||||||
|
with patch.dict(os.environ, {
|
||||||
|
"AZURE_API_KEY": "test-key",
|
||||||
|
"AZURE_ENDPOINT": "https://models.inference.ai.azure.com"
|
||||||
|
}):
|
||||||
|
# Test GPT-5 model - stop should NOT be included even if set
|
||||||
|
llm_gpt5 = LLM(
|
||||||
|
model="azure/gpt-5-nano",
|
||||||
|
stop=["STOP", "END"]
|
||||||
|
)
|
||||||
|
params = llm_gpt5._prepare_completion_params(
|
||||||
|
messages=[{"role": "user", "content": "test"}]
|
||||||
|
)
|
||||||
|
assert "stop" not in params, "stop should not be included for GPT-5 models"
|
||||||
|
|
||||||
|
# Test regular model - stop SHOULD be included
|
||||||
|
llm_gpt4 = LLM(
|
||||||
|
model="azure/gpt-4",
|
||||||
|
stop=["STOP", "END"]
|
||||||
|
)
|
||||||
|
params = llm_gpt4._prepare_completion_params(
|
||||||
|
messages=[{"role": "user", "content": "test"}]
|
||||||
|
)
|
||||||
|
assert "stop" in params, "stop should be included for GPT-4 models"
|
||||||
|
assert params["stop"] == ["STOP", "END"]
|
||||||
|
|
||||||
|
|
||||||
def test_azure_context_window_size():
|
def test_azure_context_window_size():
|
||||||
"""
|
"""
|
||||||
Test that Azure models return correct context window sizes
|
Test that Azure models return correct context window sizes
|
||||||
|
|||||||
@@ -4500,6 +4500,71 @@ def test_crew_copy_with_memory():
|
|||||||
pytest.fail(f"Copying crew raised an unexpected exception: {e}")
|
pytest.fail(f"Copying crew raised an unexpected exception: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
def test_sets_parent_flow_when_using_crewbase_pattern_inside_flow():
|
||||||
|
@CrewBase
|
||||||
|
class TestCrew:
|
||||||
|
agents_config = None
|
||||||
|
tasks_config = None
|
||||||
|
|
||||||
|
agents: list[BaseAgent]
|
||||||
|
tasks: list[Task]
|
||||||
|
|
||||||
|
@agent
|
||||||
|
def researcher(self) -> Agent:
|
||||||
|
return Agent(
|
||||||
|
role="Researcher",
|
||||||
|
goal="Research things",
|
||||||
|
backstory="Expert researcher",
|
||||||
|
)
|
||||||
|
|
||||||
|
@agent
|
||||||
|
def writer_agent(self) -> Agent:
|
||||||
|
return Agent(
|
||||||
|
role="Writer",
|
||||||
|
goal="Write things",
|
||||||
|
backstory="Expert writer",
|
||||||
|
)
|
||||||
|
|
||||||
|
@task
|
||||||
|
def research_task(self) -> Task:
|
||||||
|
return Task(
|
||||||
|
description="Test task for researcher",
|
||||||
|
expected_output="output",
|
||||||
|
agent=self.researcher(),
|
||||||
|
)
|
||||||
|
|
||||||
|
@task
|
||||||
|
def write_task(self) -> Task:
|
||||||
|
return Task(
|
||||||
|
description="Test task for writer",
|
||||||
|
expected_output="output",
|
||||||
|
agent=self.writer_agent(),
|
||||||
|
)
|
||||||
|
|
||||||
|
@crew
|
||||||
|
def crew(self) -> Crew:
|
||||||
|
return Crew(
|
||||||
|
agents=self.agents,
|
||||||
|
tasks=self.tasks,
|
||||||
|
process=Process.sequential,
|
||||||
|
)
|
||||||
|
|
||||||
|
captured_crew = None
|
||||||
|
|
||||||
|
class MyFlow(Flow):
|
||||||
|
@start()
|
||||||
|
def start_method(self):
|
||||||
|
nonlocal captured_crew
|
||||||
|
captured_crew = TestCrew().crew()
|
||||||
|
return captured_crew
|
||||||
|
|
||||||
|
flow = MyFlow()
|
||||||
|
flow.kickoff()
|
||||||
|
|
||||||
|
assert captured_crew is not None
|
||||||
|
assert captured_crew.parent_flow is flow
|
||||||
|
|
||||||
|
|
||||||
def test_sets_parent_flow_when_outside_flow(researcher, writer):
|
def test_sets_parent_flow_when_outside_flow(researcher, writer):
|
||||||
crew = Crew(
|
crew = Crew(
|
||||||
agents=[researcher, writer],
|
agents=[researcher, writer],
|
||||||
|
|||||||
@@ -1,8 +1,6 @@
|
|||||||
import os
|
import os
|
||||||
import threading
|
|
||||||
import unittest
|
import unittest
|
||||||
import uuid
|
import uuid
|
||||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from crewai.utilities.file_handler import PickleHandler
|
from crewai.utilities.file_handler import PickleHandler
|
||||||
@@ -10,6 +8,7 @@ from crewai.utilities.file_handler import PickleHandler
|
|||||||
|
|
||||||
class TestPickleHandler(unittest.TestCase):
|
class TestPickleHandler(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
# Use a unique file name for each test to avoid race conditions in parallel test execution
|
||||||
unique_id = str(uuid.uuid4())
|
unique_id = str(uuid.uuid4())
|
||||||
self.file_name = f"test_data_{unique_id}.pkl"
|
self.file_name = f"test_data_{unique_id}.pkl"
|
||||||
self.file_path = os.path.join(os.getcwd(), self.file_name)
|
self.file_path = os.path.join(os.getcwd(), self.file_name)
|
||||||
@@ -48,234 +47,3 @@ class TestPickleHandler(unittest.TestCase):
|
|||||||
|
|
||||||
assert str(exc.value) == "pickle data was truncated"
|
assert str(exc.value) == "pickle data was truncated"
|
||||||
assert "<class '_pickle.UnpicklingError'>" == str(exc.type)
|
assert "<class '_pickle.UnpicklingError'>" == str(exc.type)
|
||||||
|
|
||||||
|
|
||||||
class TestPickleHandlerThreadSafety(unittest.TestCase):
|
|
||||||
"""Tests for thread-safety of PickleHandler operations."""
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
unique_id = str(uuid.uuid4())
|
|
||||||
self.file_name = f"test_thread_safe_{unique_id}.pkl"
|
|
||||||
self.file_path = os.path.join(os.getcwd(), self.file_name)
|
|
||||||
self.handler = PickleHandler(self.file_name)
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
if os.path.exists(self.file_path):
|
|
||||||
os.remove(self.file_path)
|
|
||||||
|
|
||||||
def test_concurrent_writes_same_handler(self):
|
|
||||||
"""Test that concurrent writes from multiple threads using the same handler don't corrupt data."""
|
|
||||||
num_threads = 10
|
|
||||||
num_writes_per_thread = 20
|
|
||||||
errors: list[Exception] = []
|
|
||||||
write_count = 0
|
|
||||||
count_lock = threading.Lock()
|
|
||||||
|
|
||||||
def write_data(thread_id: int) -> None:
|
|
||||||
nonlocal write_count
|
|
||||||
for i in range(num_writes_per_thread):
|
|
||||||
try:
|
|
||||||
data = {"thread": thread_id, "iteration": i, "data": f"value_{thread_id}_{i}"}
|
|
||||||
self.handler.save(data)
|
|
||||||
with count_lock:
|
|
||||||
write_count += 1
|
|
||||||
except Exception as e:
|
|
||||||
errors.append(e)
|
|
||||||
|
|
||||||
threads = []
|
|
||||||
for i in range(num_threads):
|
|
||||||
t = threading.Thread(target=write_data, args=(i,))
|
|
||||||
threads.append(t)
|
|
||||||
t.start()
|
|
||||||
|
|
||||||
for t in threads:
|
|
||||||
t.join()
|
|
||||||
|
|
||||||
assert len(errors) == 0, f"Errors occurred during concurrent writes: {errors}"
|
|
||||||
assert write_count == num_threads * num_writes_per_thread
|
|
||||||
loaded_data = self.handler.load()
|
|
||||||
assert isinstance(loaded_data, dict)
|
|
||||||
assert "thread" in loaded_data
|
|
||||||
assert "iteration" in loaded_data
|
|
||||||
|
|
||||||
def test_concurrent_reads_same_handler(self):
|
|
||||||
"""Test that concurrent reads from multiple threads don't cause issues."""
|
|
||||||
test_data = {"key": "value", "nested": {"a": 1, "b": 2}}
|
|
||||||
self.handler.save(test_data)
|
|
||||||
|
|
||||||
num_threads = 20
|
|
||||||
results: list[dict] = []
|
|
||||||
errors: list[Exception] = []
|
|
||||||
results_lock = threading.Lock()
|
|
||||||
|
|
||||||
def read_data() -> None:
|
|
||||||
try:
|
|
||||||
data = self.handler.load()
|
|
||||||
with results_lock:
|
|
||||||
results.append(data)
|
|
||||||
except Exception as e:
|
|
||||||
errors.append(e)
|
|
||||||
|
|
||||||
threads = []
|
|
||||||
for _ in range(num_threads):
|
|
||||||
t = threading.Thread(target=read_data)
|
|
||||||
threads.append(t)
|
|
||||||
t.start()
|
|
||||||
|
|
||||||
for t in threads:
|
|
||||||
t.join()
|
|
||||||
|
|
||||||
assert len(errors) == 0, f"Errors occurred during concurrent reads: {errors}"
|
|
||||||
assert len(results) == num_threads
|
|
||||||
for result in results:
|
|
||||||
assert result == test_data
|
|
||||||
|
|
||||||
def test_concurrent_read_write_same_handler(self):
|
|
||||||
"""Test that concurrent reads and writes don't corrupt data or cause errors."""
|
|
||||||
initial_data = {"counter": 0}
|
|
||||||
self.handler.save(initial_data)
|
|
||||||
|
|
||||||
num_writers = 5
|
|
||||||
num_readers = 10
|
|
||||||
writes_per_thread = 10
|
|
||||||
reads_per_thread = 20
|
|
||||||
write_errors: list[Exception] = []
|
|
||||||
read_errors: list[Exception] = []
|
|
||||||
read_results: list[dict] = []
|
|
||||||
results_lock = threading.Lock()
|
|
||||||
|
|
||||||
def writer(thread_id: int) -> None:
|
|
||||||
for i in range(writes_per_thread):
|
|
||||||
try:
|
|
||||||
data = {"writer": thread_id, "write_num": i}
|
|
||||||
self.handler.save(data)
|
|
||||||
except Exception as e:
|
|
||||||
write_errors.append(e)
|
|
||||||
|
|
||||||
def reader() -> None:
|
|
||||||
for _ in range(reads_per_thread):
|
|
||||||
try:
|
|
||||||
data = self.handler.load()
|
|
||||||
with results_lock:
|
|
||||||
read_results.append(data)
|
|
||||||
except Exception as e:
|
|
||||||
read_errors.append(e)
|
|
||||||
|
|
||||||
threads = []
|
|
||||||
for i in range(num_writers):
|
|
||||||
t = threading.Thread(target=writer, args=(i,))
|
|
||||||
threads.append(t)
|
|
||||||
|
|
||||||
for _ in range(num_readers):
|
|
||||||
t = threading.Thread(target=reader)
|
|
||||||
threads.append(t)
|
|
||||||
|
|
||||||
for t in threads:
|
|
||||||
t.start()
|
|
||||||
|
|
||||||
for t in threads:
|
|
||||||
t.join()
|
|
||||||
|
|
||||||
assert len(write_errors) == 0, f"Write errors: {write_errors}"
|
|
||||||
assert len(read_errors) == 0, f"Read errors: {read_errors}"
|
|
||||||
for result in read_results:
|
|
||||||
assert isinstance(result, dict)
|
|
||||||
|
|
||||||
def test_atomic_write_no_partial_data(self):
|
|
||||||
"""Test that atomic writes prevent partial/corrupted data from being read."""
|
|
||||||
large_data = {"key": "x" * 100000, "numbers": list(range(10000))}
|
|
||||||
num_iterations = 50
|
|
||||||
errors: list[Exception] = []
|
|
||||||
corruption_detected = False
|
|
||||||
corruption_lock = threading.Lock()
|
|
||||||
|
|
||||||
def writer() -> None:
|
|
||||||
for _ in range(num_iterations):
|
|
||||||
try:
|
|
||||||
self.handler.save(large_data)
|
|
||||||
except Exception as e:
|
|
||||||
errors.append(e)
|
|
||||||
|
|
||||||
def reader() -> None:
|
|
||||||
nonlocal corruption_detected
|
|
||||||
for _ in range(num_iterations * 2):
|
|
||||||
try:
|
|
||||||
data = self.handler.load()
|
|
||||||
if data and data != {} and data != large_data:
|
|
||||||
with corruption_lock:
|
|
||||||
corruption_detected = True
|
|
||||||
except Exception as e:
|
|
||||||
errors.append(e)
|
|
||||||
|
|
||||||
writer_thread = threading.Thread(target=writer)
|
|
||||||
reader_thread = threading.Thread(target=reader)
|
|
||||||
|
|
||||||
writer_thread.start()
|
|
||||||
reader_thread.start()
|
|
||||||
|
|
||||||
writer_thread.join()
|
|
||||||
reader_thread.join()
|
|
||||||
|
|
||||||
assert len(errors) == 0, f"Errors occurred: {errors}"
|
|
||||||
assert not corruption_detected, "Partial/corrupted data was read"
|
|
||||||
|
|
||||||
def test_thread_pool_concurrent_operations(self):
|
|
||||||
"""Test thread safety using ThreadPoolExecutor for more realistic concurrent access."""
|
|
||||||
num_operations = 100
|
|
||||||
errors: list[Exception] = []
|
|
||||||
|
|
||||||
def operation(op_id: int) -> str:
|
|
||||||
try:
|
|
||||||
if op_id % 3 == 0:
|
|
||||||
self.handler.save({"op_id": op_id, "type": "write"})
|
|
||||||
return f"write_{op_id}"
|
|
||||||
else:
|
|
||||||
data = self.handler.load()
|
|
||||||
return f"read_{op_id}_{type(data).__name__}"
|
|
||||||
except Exception as e:
|
|
||||||
errors.append(e)
|
|
||||||
return f"error_{op_id}"
|
|
||||||
|
|
||||||
with ThreadPoolExecutor(max_workers=20) as executor:
|
|
||||||
futures = [executor.submit(operation, i) for i in range(num_operations)]
|
|
||||||
results = [f.result() for f in as_completed(futures)]
|
|
||||||
|
|
||||||
assert len(errors) == 0, f"Errors occurred: {errors}"
|
|
||||||
assert len(results) == num_operations
|
|
||||||
|
|
||||||
def test_multiple_handlers_same_file(self):
|
|
||||||
"""Test that multiple PickleHandler instances for the same file work correctly."""
|
|
||||||
handler1 = PickleHandler(self.file_name)
|
|
||||||
handler2 = PickleHandler(self.file_name)
|
|
||||||
|
|
||||||
num_operations = 50
|
|
||||||
errors: list[Exception] = []
|
|
||||||
|
|
||||||
def use_handler1() -> None:
|
|
||||||
for i in range(num_operations):
|
|
||||||
try:
|
|
||||||
handler1.save({"handler": 1, "iteration": i})
|
|
||||||
except Exception as e:
|
|
||||||
errors.append(e)
|
|
||||||
|
|
||||||
def use_handler2() -> None:
|
|
||||||
for i in range(num_operations):
|
|
||||||
try:
|
|
||||||
handler2.save({"handler": 2, "iteration": i})
|
|
||||||
except Exception as e:
|
|
||||||
errors.append(e)
|
|
||||||
|
|
||||||
t1 = threading.Thread(target=use_handler1)
|
|
||||||
t2 = threading.Thread(target=use_handler2)
|
|
||||||
|
|
||||||
t1.start()
|
|
||||||
t2.start()
|
|
||||||
|
|
||||||
t1.join()
|
|
||||||
t2.join()
|
|
||||||
|
|
||||||
assert len(errors) == 0, f"Errors occurred: {errors}"
|
|
||||||
final_data = self.handler.load()
|
|
||||||
assert isinstance(final_data, dict)
|
|
||||||
assert "handler" in final_data
|
|
||||||
assert "iteration" in final_data
|
|
||||||
|
|||||||
@@ -1,3 +1,3 @@
|
|||||||
"""CrewAI development tools."""
|
"""CrewAI development tools."""
|
||||||
|
|
||||||
__version__ = "1.8.0"
|
__version__ = "1.8.1"
|
||||||
|
|||||||
@@ -117,7 +117,7 @@ show_error_codes = true
|
|||||||
warn_unused_ignores = true
|
warn_unused_ignores = true
|
||||||
python_version = "3.12"
|
python_version = "3.12"
|
||||||
exclude = "(?x)(^lib/crewai/src/crewai/cli/templates/ | ^lib/crewai/tests/ | ^lib/crewai-tools/tests/)"
|
exclude = "(?x)(^lib/crewai/src/crewai/cli/templates/ | ^lib/crewai/tests/ | ^lib/crewai-tools/tests/)"
|
||||||
plugins = ["pydantic.mypy", "crewai.mypy"]
|
plugins = ["pydantic.mypy"]
|
||||||
|
|
||||||
|
|
||||||
[tool.bandit]
|
[tool.bandit]
|
||||||
|
|||||||
Reference in New Issue
Block a user