Compare commits

..

5 Commits

Author SHA1 Message Date
lorenzejay
bb823b047a Merge branch 'main' of github.com:crewAIInc/crewAI into lorenze/experimental-environment-tools 2026-01-12 09:49:53 -08:00
lorenzejay
9d33706fd5 refactor: update allowed_paths default behavior in environment tools
Modified the default value of allowed_paths in BaseEnvironmentTool and EnvironmentTools to default to the current directory (".") instead of an empty list. This change enhances security by ensuring that operations are restricted to the current directory unless explicitly specified otherwise. Additionally, updated related tests to reflect this new default behavior and improved type hints for better clarity.
2026-01-08 16:33:13 -08:00
lorenzejay
4364503615 linted 2026-01-08 16:20:30 -08:00
lorenzejay
d0e4c356e1 refactor: improve code readability in environment tools
Refactored the file_search_tool.py, grep_tool.py, and list_dir_tool.py to enhance code readability. Changes include formatting list comprehensions for better clarity, adjusting error handling for consistency, and removing unnecessary imports. These improvements aim to streamline the codebase and maintain a clean structure for future development.
2026-01-08 16:13:07 -08:00
lorenzejay
6c2dfdff56 feat: add environment tools for file system operations
Introduced a new set of environment tools to enhance file system interactions within the CrewAI framework. This includes tools for reading files, searching for files by name patterns, and listing directory contents, all with built-in path security to prevent unauthorized access. The new tools are designed to facilitate context engineering for agents, improving their ability to interact with the file system effectively. Additionally, updated the experimental module's  to include these new tools in the public API.
2026-01-08 16:10:57 -08:00
43 changed files with 1544 additions and 2567 deletions

View File

@@ -291,7 +291,6 @@
"en/observability/arize-phoenix",
"en/observability/braintrust",
"en/observability/datadog",
"en/observability/galileo",
"en/observability/langdb",
"en/observability/langfuse",
"en/observability/langtrace",
@@ -743,7 +742,6 @@
"pt-BR/observability/arize-phoenix",
"pt-BR/observability/braintrust",
"pt-BR/observability/datadog",
"pt-BR/observability/galileo",
"pt-BR/observability/langdb",
"pt-BR/observability/langfuse",
"pt-BR/observability/langtrace",
@@ -1205,7 +1203,6 @@
"ko/observability/arize-phoenix",
"ko/observability/braintrust",
"ko/observability/datadog",
"ko/observability/galileo",
"ko/observability/langdb",
"ko/observability/langfuse",
"ko/observability/langtrace",

View File

@@ -1,48 +1,43 @@
---
title: Agent-to-Agent (A2A) Protocol
description: Agents delegate tasks to remote A2A agents and/or operate as A2A-compliant server agents.
description: Enable CrewAI agents to delegate tasks to remote A2A-compliant agents for specialized handling
icon: network-wired
mode: "wide"
---
## A2A Agent Delegation
CrewAI treats [A2A protocol](https://a2a-protocol.org/latest/) as a first-class delegation primitive, enabling agents to delegate tasks, request information, and collaborate with remote agents, as well as act as A2A-compliant server agents.
In client mode, agents autonomously choose between local execution and remote delegation based on task requirements.
CrewAI supports the Agent-to-Agent (A2A) protocol, allowing agents to delegate tasks to remote specialized agents. The agent's LLM automatically decides whether to handle a task directly or delegate to an A2A agent based on the task requirements.
<Note>
A2A delegation requires the `a2a-sdk` package. Install with: `uv add 'crewai[a2a]'` or `pip install 'crewai[a2a]'`
</Note>
## How It Works
When an agent is configured with A2A capabilities:
1. The Agent analyzes each task
1. The LLM analyzes each task
2. It decides to either:
- Handle the task directly using its own capabilities
- Delegate to a remote A2A agent for specialized handling
3. If delegating, the agent communicates with the remote A2A agent through the protocol
4. Results are returned to the CrewAI workflow
<Note>
A2A delegation requires the `a2a-sdk` package. Install with: `uv add 'crewai[a2a]'` or `pip install 'crewai[a2a]'`
</Note>
## Basic Configuration
<Warning>
`crewai.a2a.config.A2AConfig` is deprecated and will be removed in v2.0.0. Use `A2AClientConfig` for connecting to remote agents and/or `A2AServerConfig` for exposing agents as servers.
</Warning>
Configure an agent for A2A delegation by setting the `a2a` parameter:
```python Code
from crewai import Agent, Crew, Task
from crewai.a2a import A2AClientConfig
from crewai.a2a import A2AConfig
agent = Agent(
role="Research Coordinator",
goal="Coordinate research tasks efficiently",
backstory="Expert at delegating to specialized research agents",
llm="gpt-4o",
a2a=A2AClientConfig(
a2a=A2AConfig(
endpoint="https://example.com/.well-known/agent-card.json",
timeout=120,
max_turns=10
@@ -59,9 +54,9 @@ crew = Crew(agents=[agent], tasks=[task], verbose=True)
result = crew.kickoff()
```
## Client Configuration Options
## Configuration Options
The `A2AClientConfig` class accepts the following parameters:
The `A2AConfig` class accepts the following parameters:
<ParamField path="endpoint" type="str" required>
The A2A agent endpoint URL (typically points to `.well-known/agent-card.json`)
@@ -100,30 +95,14 @@ The `A2AClientConfig` class accepts the following parameters:
Transport protocol for A2A communication. Options: `JSONRPC` (default), `GRPC`, or `HTTP+JSON`.
</ParamField>
<ParamField path="accepted_output_modes" type="list[str]" default='["application/json"]'>
Media types the client can accept in responses.
</ParamField>
<ParamField path="supported_transports" type="list[str]" default='["JSONRPC"]'>
Ordered list of transport protocols the client supports.
</ParamField>
<ParamField path="use_client_preference" type="bool" default="False">
Whether to prioritize client transport preferences over server.
</ParamField>
<ParamField path="extensions" type="list[str]" default="[]">
Extension URIs the client supports.
</ParamField>
## Authentication
For A2A agents that require authentication, use one of the provided auth schemes:
<Tabs>
<Tab title="Bearer Token">
```python bearer_token_auth.py lines
from crewai.a2a import A2AClientConfig
```python Code
from crewai.a2a import A2AConfig
from crewai.a2a.auth import BearerTokenAuth
agent = Agent(
@@ -131,18 +110,18 @@ agent = Agent(
goal="Coordinate tasks with secured agents",
backstory="Manages secure agent communications",
llm="gpt-4o",
a2a=A2AClientConfig(
a2a=A2AConfig(
endpoint="https://secure-agent.example.com/.well-known/agent-card.json",
auth=BearerTokenAuth(token="your-bearer-token"),
timeout=120
)
)
```
```
</Tab>
<Tab title="API Key">
```python api_key_auth.py lines
from crewai.a2a import A2AClientConfig
```python Code
from crewai.a2a import A2AConfig
from crewai.a2a.auth import APIKeyAuth
agent = Agent(
@@ -150,7 +129,7 @@ agent = Agent(
goal="Coordinate with API-based agents",
backstory="Manages API-authenticated communications",
llm="gpt-4o",
a2a=A2AClientConfig(
a2a=A2AConfig(
endpoint="https://api-agent.example.com/.well-known/agent-card.json",
auth=APIKeyAuth(
api_key="your-api-key",
@@ -160,12 +139,12 @@ agent = Agent(
timeout=120
)
)
```
```
</Tab>
<Tab title="OAuth2">
```python oauth2_auth.py lines
from crewai.a2a import A2AClientConfig
```python Code
from crewai.a2a import A2AConfig
from crewai.a2a.auth import OAuth2ClientCredentials
agent = Agent(
@@ -173,7 +152,7 @@ agent = Agent(
goal="Coordinate with OAuth-secured agents",
backstory="Manages OAuth-authenticated communications",
llm="gpt-4o",
a2a=A2AClientConfig(
a2a=A2AConfig(
endpoint="https://oauth-agent.example.com/.well-known/agent-card.json",
auth=OAuth2ClientCredentials(
token_url="https://auth.example.com/oauth/token",
@@ -184,12 +163,12 @@ agent = Agent(
timeout=120
)
)
```
```
</Tab>
<Tab title="HTTP Basic">
```python http_basic_auth.py lines
from crewai.a2a import A2AClientConfig
```python Code
from crewai.a2a import A2AConfig
from crewai.a2a.auth import HTTPBasicAuth
agent = Agent(
@@ -197,7 +176,7 @@ agent = Agent(
goal="Coordinate with basic auth agents",
backstory="Manages basic authentication communications",
llm="gpt-4o",
a2a=A2AClientConfig(
a2a=A2AConfig(
endpoint="https://basic-agent.example.com/.well-known/agent-card.json",
auth=HTTPBasicAuth(
username="your-username",
@@ -206,7 +185,7 @@ agent = Agent(
timeout=120
)
)
```
```
</Tab>
</Tabs>
@@ -215,7 +194,7 @@ agent = Agent(
Configure multiple A2A agents for delegation by passing a list:
```python Code
from crewai.a2a import A2AClientConfig
from crewai.a2a import A2AConfig
from crewai.a2a.auth import BearerTokenAuth
agent = Agent(
@@ -224,11 +203,11 @@ agent = Agent(
backstory="Expert at delegating to the right specialist",
llm="gpt-4o",
a2a=[
A2AClientConfig(
A2AConfig(
endpoint="https://research.example.com/.well-known/agent-card.json",
timeout=120
),
A2AClientConfig(
A2AConfig(
endpoint="https://data.example.com/.well-known/agent-card.json",
auth=BearerTokenAuth(token="data-token"),
timeout=90
@@ -244,7 +223,7 @@ The LLM will automatically choose which A2A agent to delegate to based on the ta
Control how agent connection failures are handled using the `fail_fast` parameter:
```python Code
from crewai.a2a import A2AClientConfig
from crewai.a2a import A2AConfig
# Fail immediately on connection errors (default)
agent = Agent(
@@ -252,7 +231,7 @@ agent = Agent(
goal="Coordinate research tasks",
backstory="Expert at delegation",
llm="gpt-4o",
a2a=A2AClientConfig(
a2a=A2AConfig(
endpoint="https://research.example.com/.well-known/agent-card.json",
fail_fast=True
)
@@ -265,11 +244,11 @@ agent = Agent(
backstory="Expert at working with available resources",
llm="gpt-4o",
a2a=[
A2AClientConfig(
A2AConfig(
endpoint="https://primary.example.com/.well-known/agent-card.json",
fail_fast=False
),
A2AClientConfig(
A2AConfig(
endpoint="https://backup.example.com/.well-known/agent-card.json",
fail_fast=False
)
@@ -288,8 +267,8 @@ Control how your agent receives task status updates from remote A2A agents:
<Tabs>
<Tab title="Streaming (Default)">
```python streaming_config.py lines
from crewai.a2a import A2AClientConfig
```python Code
from crewai.a2a import A2AConfig
from crewai.a2a.updates import StreamingConfig
agent = Agent(
@@ -297,17 +276,17 @@ agent = Agent(
goal="Coordinate research tasks",
backstory="Expert at delegation",
llm="gpt-4o",
a2a=A2AClientConfig(
a2a=A2AConfig(
endpoint="https://research.example.com/.well-known/agent-card.json",
updates=StreamingConfig()
)
)
```
```
</Tab>
<Tab title="Polling">
```python polling_config.py lines
from crewai.a2a import A2AClientConfig
```python Code
from crewai.a2a import A2AConfig
from crewai.a2a.updates import PollingConfig
agent = Agent(
@@ -315,7 +294,7 @@ agent = Agent(
goal="Coordinate research tasks",
backstory="Expert at delegation",
llm="gpt-4o",
a2a=A2AClientConfig(
a2a=A2AConfig(
endpoint="https://research.example.com/.well-known/agent-card.json",
updates=PollingConfig(
interval=2.0,
@@ -324,12 +303,12 @@ agent = Agent(
)
)
)
```
```
</Tab>
<Tab title="Push Notifications">
```python push_notifications_config.py lines
from crewai.a2a import A2AClientConfig
```python Code
from crewai.a2a import A2AConfig
from crewai.a2a.updates import PushNotificationConfig
agent = Agent(
@@ -337,137 +316,19 @@ agent = Agent(
goal="Coordinate research tasks",
backstory="Expert at delegation",
llm="gpt-4o",
a2a=A2AClientConfig(
a2a=A2AConfig(
endpoint="https://research.example.com/.well-known/agent-card.json",
updates=PushNotificationConfig(
url="{base_url}/a2a/callback",
url={base_url}/a2a/callback",
token="your-validation-token",
timeout=300.0
)
)
)
```
```
</Tab>
</Tabs>
## Exposing Agents as A2A Servers
You can expose your CrewAI agents as A2A-compliant servers, allowing other A2A clients to delegate tasks to them.
### Server Configuration
Add an `A2AServerConfig` to your agent to enable server capabilities:
```python a2a_server_agent.py lines
from crewai import Agent
from crewai.a2a import A2AServerConfig
agent = Agent(
role="Data Analyst",
goal="Analyze datasets and provide insights",
backstory="Expert data scientist with statistical analysis skills",
llm="gpt-4o",
a2a=A2AServerConfig(url="https://your-server.com")
)
```
### Server Configuration Options
<ParamField path="name" type="str" default="None">
Human-readable name for the agent. Defaults to the agent's role if not provided.
</ParamField>
<ParamField path="description" type="str" default="None">
Human-readable description. Defaults to the agent's goal and backstory if not provided.
</ParamField>
<ParamField path="version" type="str" default="1.0.0">
Version string for the agent card.
</ParamField>
<ParamField path="skills" type="list[AgentSkill]" default="[]">
List of agent skills. Auto-generated from agent tools if not provided.
</ParamField>
<ParamField path="capabilities" type="AgentCapabilities" default="AgentCapabilities(streaming=True, push_notifications=False)">
Declaration of optional capabilities supported by the agent.
</ParamField>
<ParamField path="default_input_modes" type="list[str]" default='["text/plain", "application/json"]'>
Supported input MIME types.
</ParamField>
<ParamField path="default_output_modes" type="list[str]" default='["text/plain", "application/json"]'>
Supported output MIME types.
</ParamField>
<ParamField path="url" type="str" default="None">
Preferred endpoint URL. If set, overrides the URL passed to `to_agent_card()`.
</ParamField>
<ParamField path="preferred_transport" type="Literal['JSONRPC', 'GRPC', 'HTTP+JSON']" default="JSONRPC">
Transport protocol for the preferred endpoint.
</ParamField>
<ParamField path="protocol_version" type="str" default="0.3">
A2A protocol version this agent supports.
</ParamField>
<ParamField path="provider" type="AgentProvider" default="None">
Information about the agent's service provider.
</ParamField>
<ParamField path="documentation_url" type="str" default="None">
URL to the agent's documentation.
</ParamField>
<ParamField path="icon_url" type="str" default="None">
URL to an icon for the agent.
</ParamField>
<ParamField path="additional_interfaces" type="list[AgentInterface]" default="[]">
Additional supported interfaces (transport and URL combinations).
</ParamField>
<ParamField path="security" type="list[dict[str, list[str]]]" default="[]">
Security requirement objects for all agent interactions.
</ParamField>
<ParamField path="security_schemes" type="dict[str, SecurityScheme]" default="{}">
Security schemes available to authorize requests.
</ParamField>
<ParamField path="supports_authenticated_extended_card" type="bool" default="False">
Whether agent provides extended card to authenticated users.
</ParamField>
<ParamField path="signatures" type="list[AgentCardSignature]" default="[]">
JSON Web Signatures for the AgentCard.
</ParamField>
### Combined Client and Server
An agent can act as both client and server by providing both configurations:
```python Code
from crewai import Agent
from crewai.a2a import A2AClientConfig, A2AServerConfig
agent = Agent(
role="Research Coordinator",
goal="Coordinate research and serve analysis requests",
backstory="Expert at delegation and analysis",
llm="gpt-4o",
a2a=[
A2AClientConfig(
endpoint="https://specialist.example.com/.well-known/agent-card.json",
timeout=120
),
A2AServerConfig(url="https://your-server.com")
]
)
```
## Best Practices
<CardGroup cols={2}>

View File

@@ -1,115 +0,0 @@
---
title: Galileo
description: Galileo integration for CrewAI tracing and evaluation
icon: telescope
mode: "wide"
---
## Overview
This guide demonstrates how to integrate **Galileo** with **CrewAI**
for comprehensive tracing and Evaluation Engineering.
By the end of this guide, you will be able to trace your CrewAI agents,
monitor their performance, and evaluate their behaviour with
Galileo's powerful observability platform.
> **What is Galileo?** [Galileo](https://galileo.ai) is AI evaluation and observability
platform that delivers end-to-end tracing, evaluation,
and monitoring for AI applications. It enables teams to capture ground truth,
create robust guardrails, and run systematic experiments with
built-in experiment tracking and performance analytics—ensuring reliability,
transparency, and continuous improvement across the AI lifecycle.
## Getting started
This tutorial follows the [CrewAI quickstart](/en/quickstart) and shows how to add
Galileo's [CrewAIEventListener](https://v2docs.galileo.ai/sdk-api/python/reference/handlers/crewai/handler),
an event handler.
For more information, see Galileos
[Add Galileo to a CrewAI Application](https://v2docs.galileo.ai/how-to-guides/third-party-integrations/add-galileo-to-crewai/add-galileo-to-crewai)
how-to guide.
> **Note** This tutorial assumes you have completed the [CrewAI quickstart](/en/quickstart).
If you want a completed comprehensive example, see the Galileo
[CrewAI sdk-example repo](https://github.com/rungalileo/sdk-examples/tree/main/python/agent/crew-ai).
### Step 1: Install dependencies
Install the required dependencies for your app.
Create a virtual environment using your preferred method,
then install dependencies inside that environment using your
preferred tool:
```bash
uv add galileo
```
### Step 2: Add to the .env file from the [CrewAI quickstart](/en/quickstart)
```bash
# Your Galileo API key
GALILEO_API_KEY="your-galileo-api-key"
# Your Galileo project name
GALILEO_PROJECT="your-galileo-project-name"
# The name of the Log stream you want to use for logging
GALILEO_LOG_STREAM="your-galileo-log-stream "
```
### Step 3: Add the Galileo event listener
To enable logging with Galileo, you need to create an instance of the `CrewAIEventListener`.
Import the Galileo CrewAI handler package by
adding the following code at the top of your main.py file:
```python
from galileo.handlers.crewai.handler import CrewAIEventListener
```
At the start of your run function, create the event listener:
```python
def run():
# Create the event listener
CrewAIEventListener()
# The rest of your existing code goes here
```
When you create the listener instance, it is automatically
registered with CrewAI.
### Step 4: Run your crew
Run your crew with the CrewAI CLI:
```bash
crewai run
```
### Step 5: View the traces in Galileo
Once your crew has finished, the traces will be flushed and appear in Galileo.
![Galileo trace view](/images/galileo-trace-veiw.png)
## Understanding the Galileo Integration
Galileo integrates with CrewAI by registering an event listener
that captures Crew execution events (e.g., agent actions, tool calls, model responses)
and forwards them to Galileo for observability and evaluation.
### Understanding the event listener
Creating a `CrewAIEventListener()` instance is all thats
required to enable Galileo for a CrewAI run. When instantiated, the listener:
- Automatically registers itself with CrewAI
- Reads Galileo configuration from environment variables
- Logs all run data to the Galileo project and log stream specified by
`GALILEO_PROJECT` and `GALILEO_LOG_STREAM`
No additional configuration or code changes are required.
All data from this run is logged to the Galileo project and
log stream specified by your environment configuration
(for example, GALILEO_PROJECT and GALILEO_LOG_STREAM).

Binary file not shown.

Before

Width:  |  Height:  |  Size: 239 KiB

View File

@@ -1,115 +0,0 @@
---
title: Galileo 갈릴레오
description: CrewAI 추적 및 평가를 위한 Galileo 통합
icon: telescope
mode: "wide"
---
## 개요
이 가이드는 **Galileo**를 **CrewAI**와 통합하는 방법을 보여줍니다.
포괄적인 추적 및 평가 엔지니어링을 위한 것입니다.
이 가이드가 끝나면 CrewAI 에이전트를 추적할 수 있게 됩니다.
성과를 모니터링하고 행동을 평가합니다.
Galileo의 강력한 관측 플랫폼.
> **갈릴레오(Galileo)란 무엇인가요?**[Galileo](https://galileo.ai/)는 AI 평가 및 관찰 가능성입니다.
엔드투엔드 추적, 평가,
AI 애플리케이션 모니터링. 이를 통해 팀은 실제 사실을 포착할 수 있습니다.
견고한 가드레일을 만들고 체계적인 실험을 실행하세요.
내장된 실험 추적 및 성능 분석으로 신뢰성 보장
AI 수명주기 전반에 걸쳐 투명성과 지속적인 개선을 제공합니다.
## 시작하기
이 튜토리얼은 [CrewAI 빠른 시작](/ko/quickstart.mdx)을 따르며 추가하는 방법을 보여줍니다.
갈릴레오의 [CrewAIEventListener](https://v2docs.galileo.ai/sdk-api/python/reference/handlers/crewai/handler),
이벤트 핸들러.
자세한 내용은 갈릴레오 문서를 참고하세요.
[CrewAI 애플리케이션에 Galileo 추가](https://v2docs.galileo.ai/how-to-guides/third-party-integrations/add-galileo-to-crewai/add-galileo-to-crewai)
방법 안내.
> **참고**이 튜토리얼에서는 [CrewAI 빠른 시작](/ko/quickstart.mdx)을 완료했다고 가정합니다.
완전한 포괄적인 예제를 원한다면 Galileo
[CrewAI SDK 예제 저장소](https://github.com/rungalileo/sdk-examples/tree/main/python/agent/crew-ai).
### 1단계: 종속성 설치
앱에 필요한 종속성을 설치합니다.
원하는 방법으로 가상 환경을 생성하고,
그런 다음 다음을 사용하여 해당 환경 내에 종속성을 설치하십시오.
선호하는 도구:
```bash
uv add galileo
```
### 2단계: [CrewAI 빠른 시작](/ko/quickstart.mdx)에서 .env 파일에 추가
```bash
# Your Galileo API key
GALILEO_API_KEY="your-galileo-api-key"
# Your Galileo project name
GALILEO_PROJECT="your-galileo-project-name"
# The name of the Log stream you want to use for logging
GALILEO_LOG_STREAM="your-galileo-log-stream "
```
### 3단계: Galileo 이벤트 리스너 추가
Galileo로 로깅을 활성화하려면 `CrewAIEventListener`의 인스턴스를 생성해야 합니다.
다음을 통해 Galileo CrewAI 핸들러 패키지를 가져옵니다.
main.py 파일 상단에 다음 코드를 추가하세요.
```python
from galileo.handlers.crewai.handler import CrewAIEventListener
```
실행 함수 시작 시 이벤트 리스너를 생성합니다.
```python
def run():
# Create the event listener
CrewAIEventListener()
# The rest of your existing code goes here
```
리스너 인스턴스를 생성하면 자동으로
CrewAI에 등록되었습니다.
### 4단계: Crew Agent 실행
CrewAI CLI를 사용하여 Crew Agent를 실행하세요.
```bash
crewai run
```
### 5단계: Galileo에서 추적 보기
승무원 에이전트가 완료되면 흔적이 플러시되어 Galileo에 나타납니다.
![Galileo trace view](/images/galileo-trace-veiw.png)
## 갈릴레오 통합 이해
Galileo는 이벤트 리스너를 등록하여 CrewAI와 통합됩니다.
승무원 실행 이벤트(예: 에이전트 작업, 도구 호출, 모델 응답)를 캡처합니다.
관찰 가능성과 평가를 위해 이를 갈릴레오에 전달합니다.
### 이벤트 리스너 이해
`CrewAIEventListener()` 인스턴스를 생성하는 것이 전부입니다.
CrewAI 실행을 위해 Galileo를 활성화하는 데 필요합니다. 인스턴스화되면 리스너는 다음을 수행합니다.
-CrewAI에 자동으로 등록됩니다.
-환경 변수에서 Galileo 구성을 읽습니다.
-모든 실행 데이터를 Galileo 프로젝트 및 다음에서 지정한 로그 스트림에 기록합니다.
`GALILEO_PROJECT` 및 `GALILEO_LOG_STREAM`
추가 구성이나 코드 변경이 필요하지 않습니다.
이 실행의 모든 데이터는 Galileo 프로젝트에 기록되며
환경 구성에 따라 지정된 로그 스트림
(예: GALILEO_PROJECT 및 GALILEO_LOG_STREAM)

View File

@@ -1,115 +0,0 @@
---
title: Galileo Galileu
description: Integração Galileo para rastreamento e avaliação CrewAI
icon: telescope
mode: "wide"
---
## Visão geral
Este guia demonstra como integrar o **Galileo**com o **CrewAI**
para rastreamento abrangente e engenharia de avaliação.
Ao final deste guia, você será capaz de rastrear seus agentes CrewAI,
monitorar seu desempenho e avaliar seu comportamento com
A poderosa plataforma de observabilidade do Galileo.
> **O que é Galileo?**[Galileo](https://galileo.ai/) é avaliação e observabilidade de IA
plataforma que oferece rastreamento, avaliação e
e monitoramento de aplicações de IA. Ele permite que as equipes capturem a verdade,
criar grades de proteção robustas e realizar experimentos sistemáticos com
rastreamento de experimentos integrado e análise de desempenho -garantindo confiabilidade,
transparência e melhoria contínua em todo o ciclo de vida da IA.
## Primeiros passos
Este tutorial segue o [CrewAI Quickstart](pt-BR/quickstart) e mostra como adicionar
[CrewAIEventListener] do Galileo(https://v2docs.galileo.ai/sdk-api/python/reference/handlers/crewai/handler),
um manipulador de eventos.
Para mais informações, consulte Galileu
[Adicionar Galileo a um aplicativo CrewAI](https://v2docs.galileo.ai/how-to-guides/third-party-integrations/add-galileo-to-crewai/add-galileo-to-crewai)
guia prático.
> **Observação**Este tutorial pressupõe que você concluiu o [CrewAI Quickstart](pt-BR/quickstart).
Se você quiser um exemplo completo e abrangente, consulte o Galileo
[Repositório de exemplo SDK da CrewAI](https://github.com/rungalileo/sdk-examples/tree/main/python/agent/crew-ai).
### Etapa 1: instalar dependências
Instale as dependências necessárias para seu aplicativo.
Crie um ambiente virtual usando seu método preferido,
em seguida, instale dependências dentro desse ambiente usando seu
ferramenta preferida:
```bash
uv add galileo
```
### Etapa 2: adicione ao arquivo .env do [CrewAI Quickstart](/pt-BR/quickstart)
```bash
# Your Galileo API key
GALILEO_API_KEY="your-galileo-api-key"
# Your Galileo project name
GALILEO_PROJECT="your-galileo-project-name"
# The name of the Log stream you want to use for logging
GALILEO_LOG_STREAM="your-galileo-log-stream "
```
### Etapa 3: adicionar o ouvinte de eventos Galileo
Para habilitar o registro com Galileo, você precisa criar uma instância do `CrewAIEventListener`.
Importe o pacote manipulador Galileo CrewAI por
adicionando o seguinte código no topo do seu arquivo main.py:
```python
from galileo.handlers.crewai.handler import CrewAIEventListener
```
No início da sua função run, crie o ouvinte de evento:
```python
def run():
# Create the event listener
CrewAIEventListener()
# The rest of your existing code goes here
```
Quando você cria a instância do listener, ela é automaticamente
registrado na CrewAI.
### Etapa 4: administre sua Crew
Administre sua Crew com o CrewAI CLI:
```bash
crewai run
```
### Passo 5: Visualize os traços no Galileo
Assim que sua tripulação terminar, os rastros serão eliminados e aparecerão no Galileo.
![Galileo trace view](/images/galileo-trace-veiw.png)
## Compreendendo a integração do Galileo
Galileo se integra ao CrewAI registrando um ouvinte de evento
que captura eventos de execução da tripulação (por exemplo, ações do agente, chamadas de ferramentas, respostas do modelo)
e os encaminha ao Galileo para observabilidade e avaliação.
### Compreendendo o ouvinte de eventos
Criar uma instância `CrewAIEventListener()` é tudo o que você precisa
necessário para habilitar o Galileo para uma execução do CrewAI. Quando instanciado, o ouvinte:
-Registra-se automaticamente no CrewAI
-Lê a configuração do Galileo a partir de variáveis de ambiente
-Registra todos os dados de execução no projeto Galileo e fluxo de log especificado por
`GALILEO_PROJECT` e `GALILEO_LOG_STREAM`
Nenhuma configuração adicional ou alterações de código são necessárias.
Todos os dados desta execução são registados no projecto Galileo e
fluxo de log especificado pela configuração do seu ambiente
(por exemplo, GALILEO_PROJECT e GALILEO_LOG_STREAM).

View File

@@ -12,7 +12,7 @@ dependencies = [
"pytube~=15.0.0",
"requests~=2.32.5",
"docker~=7.1.0",
"crewai==1.8.1",
"crewai==1.8.0",
"lancedb~=0.5.4",
"tiktoken~=0.8.0",
"beautifulsoup4~=4.13.4",

View File

@@ -291,4 +291,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.8.1"
__version__ = "1.8.0"

View File

@@ -49,7 +49,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.8.1",
"crewai-tools==1.8.0",
]
embeddings = [
"tiktoken~=0.8.0"

View File

@@ -40,7 +40,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.8.1"
__version__ = "1.8.0"
_telemetry_submitted = False

View File

@@ -1,10 +1,8 @@
"""Agent-to-Agent (A2A) protocol communication module for CrewAI."""
from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig
from crewai.a2a.config import A2AConfig
__all__ = [
"A2AClientConfig",
"A2AConfig",
"A2AServerConfig",
]

View File

@@ -5,57 +5,45 @@ This module is separate from experimental.a2a to avoid circular imports.
from __future__ import annotations
from importlib.metadata import version
from typing import Any, ClassVar, Literal
from typing import Annotated, Any, ClassVar, Literal
from pydantic import BaseModel, ConfigDict, Field
from typing_extensions import deprecated
from pydantic import (
BaseModel,
BeforeValidator,
ConfigDict,
Field,
HttpUrl,
TypeAdapter,
)
from crewai.a2a.auth.schemas import AuthScheme
from crewai.a2a.types import TransportType, Url
try:
from a2a.types import (
AgentCapabilities,
AgentCardSignature,
AgentInterface,
AgentProvider,
AgentSkill,
SecurityScheme,
)
from crewai.a2a.updates import UpdateConfig
except ImportError:
UpdateConfig = Any
AgentCapabilities = Any
AgentCardSignature = Any
AgentInterface = Any
AgentProvider = Any
SecurityScheme = Any
AgentSkill = Any
UpdateConfig = Any # type: ignore[misc,assignment]
http_url_adapter = TypeAdapter(HttpUrl)
Url = Annotated[
str,
BeforeValidator(
lambda value: str(http_url_adapter.validate_python(value, strict=True))
),
]
def _get_default_update_config() -> UpdateConfig:
from crewai.a2a.updates import StreamingConfig
return StreamingConfig()
@deprecated(
"""
`crewai.a2a.config.A2AConfig` is deprecated and will be removed in v2.0.0,
use `crewai.a2a.config.A2AClientConfig` or `crewai.a2a.config.A2AServerConfig` instead.
""",
category=FutureWarning,
)
class A2AConfig(BaseModel):
"""Configuration for A2A protocol integration.
Deprecated:
Use A2AClientConfig instead. This class will be removed in a future version.
Attributes:
endpoint: A2A agent endpoint URL.
auth: Authentication scheme.
@@ -99,176 +87,3 @@ class A2AConfig(BaseModel):
default="JSONRPC",
description="Specified mode of A2A transport protocol",
)
class A2AClientConfig(BaseModel):
"""Configuration for connecting to remote A2A agents.
Attributes:
endpoint: A2A agent endpoint URL.
auth: Authentication scheme.
timeout: Request timeout in seconds.
max_turns: Maximum conversation turns with A2A agent.
response_model: Optional Pydantic model for structured A2A agent responses.
fail_fast: If True, raise error when agent unreachable; if False, skip and continue.
trust_remote_completion_status: If True, return A2A agent's result directly when completed.
updates: Update mechanism config.
accepted_output_modes: Media types the client can accept in responses.
supported_transports: Ordered list of transport protocols the client supports.
use_client_preference: Whether to prioritize client transport preferences over server.
extensions: Extension URIs the client supports.
"""
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
endpoint: Url = Field(description="A2A agent endpoint URL")
auth: AuthScheme | None = Field(
default=None,
description="Authentication scheme",
)
timeout: int = Field(default=120, description="Request timeout in seconds")
max_turns: int = Field(
default=10, description="Maximum conversation turns with A2A agent"
)
response_model: type[BaseModel] | None = Field(
default=None,
description="Optional Pydantic model for structured A2A agent responses",
)
fail_fast: bool = Field(
default=True,
description="If True, raise error when agent unreachable; if False, skip",
)
trust_remote_completion_status: bool = Field(
default=False,
description="If True, return A2A result directly when completed",
)
updates: UpdateConfig = Field(
default_factory=_get_default_update_config,
description="Update mechanism config",
)
accepted_output_modes: list[str] = Field(
default_factory=lambda: ["application/json"],
description="Media types the client can accept in responses",
)
supported_transports: list[str] = Field(
default_factory=lambda: ["JSONRPC"],
description="Ordered list of transport protocols the client supports",
)
use_client_preference: bool = Field(
default=False,
description="Whether to prioritize client transport preferences over server",
)
extensions: list[str] = Field(
default_factory=list,
description="Extension URIs the client supports",
)
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"] = Field(
default="JSONRPC",
description="Specified mode of A2A transport protocol",
)
class A2AServerConfig(BaseModel):
"""Configuration for exposing a Crew or Agent as an A2A server.
All fields correspond to A2A AgentCard fields. Fields like name, description,
and skills can be auto-derived from the Crew/Agent if not provided.
Attributes:
name: Human-readable name for the agent.
description: Human-readable description of the agent.
version: Version string for the agent card.
skills: List of agent skills/capabilities.
default_input_modes: Default supported input MIME types.
default_output_modes: Default supported output MIME types.
capabilities: Declaration of optional capabilities.
preferred_transport: Transport protocol for the preferred endpoint.
protocol_version: A2A protocol version this agent supports.
provider: Information about the agent's service provider.
documentation_url: URL to the agent's documentation.
icon_url: URL to an icon for the agent.
additional_interfaces: Additional supported interfaces.
security: Security requirement objects for all interactions.
security_schemes: Security schemes available to authorize requests.
supports_authenticated_extended_card: Whether agent provides extended card to authenticated users.
url: Preferred endpoint URL for the agent.
signatures: JSON Web Signatures for the AgentCard.
"""
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
name: str | None = Field(
default=None,
description="Human-readable name for the agent. Auto-derived from Crew/Agent if not provided.",
)
description: str | None = Field(
default=None,
description="Human-readable description of the agent. Auto-derived from Crew/Agent if not provided.",
)
version: str = Field(
default="1.0.0",
description="Version string for the agent card",
)
skills: list[AgentSkill] = Field(
default_factory=list,
description="List of agent skills. Auto-derived from tasks/tools if not provided.",
)
default_input_modes: list[str] = Field(
default_factory=lambda: ["text/plain", "application/json"],
description="Default supported input MIME types",
)
default_output_modes: list[str] = Field(
default_factory=lambda: ["text/plain", "application/json"],
description="Default supported output MIME types",
)
capabilities: AgentCapabilities = Field(
default_factory=lambda: AgentCapabilities(
streaming=True,
push_notifications=False,
),
description="Declaration of optional capabilities supported by the agent",
)
preferred_transport: TransportType = Field(
default="JSONRPC",
description="Transport protocol for the preferred endpoint",
)
protocol_version: str = Field(
default_factory=lambda: version("a2a-sdk"),
description="A2A protocol version this agent supports",
)
provider: AgentProvider | None = Field(
default=None,
description="Information about the agent's service provider",
)
documentation_url: Url | None = Field(
default=None,
description="URL to the agent's documentation",
)
icon_url: Url | None = Field(
default=None,
description="URL to an icon for the agent",
)
additional_interfaces: list[AgentInterface] = Field(
default_factory=list,
description="Additional supported interfaces (transport and URL combinations)",
)
security: list[dict[str, list[str]]] = Field(
default_factory=list,
description="Security requirement objects for all agent interactions",
)
security_schemes: dict[str, SecurityScheme] = Field(
default_factory=dict,
description="Security schemes available to authorize requests",
)
supports_authenticated_extended_card: bool = Field(
default=False,
description="Whether agent provides extended card to authenticated users",
)
url: Url | None = Field(
default=None,
description="Preferred endpoint URL for the agent. Set at runtime if not provided.",
)
signatures: list[AgentCardSignature] = Field(
default_factory=list,
description="JSON Web Signatures for the AgentCard",
)

View File

@@ -1,17 +1,7 @@
"""Type definitions for A2A protocol message parts."""
from __future__ import annotations
from typing import Any, Literal, Protocol, TypedDict, runtime_checkable
from typing import (
Annotated,
Any,
Literal,
Protocol,
TypedDict,
runtime_checkable,
)
from pydantic import BeforeValidator, HttpUrl, TypeAdapter
from typing_extensions import NotRequired
from crewai.a2a.updates import (
@@ -25,18 +15,6 @@ from crewai.a2a.updates import (
)
TransportType = Literal["JSONRPC", "GRPC", "HTTP+JSON"]
http_url_adapter: TypeAdapter[HttpUrl] = TypeAdapter(HttpUrl)
Url = Annotated[
str,
BeforeValidator(
lambda value: str(http_url_adapter.validate_python(value, strict=True))
),
]
@runtime_checkable
class AgentResponseProtocol(Protocol):
"""Protocol for the dynamically created AgentResponse model."""

View File

@@ -1,14 +1,16 @@
"""A2A delegation utilities for executing tasks on remote agents."""
"""Utility functions for A2A (Agent-to-Agent) protocol delegation."""
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator, MutableMapping
from contextlib import asynccontextmanager
from functools import lru_cache
import time
from typing import TYPE_CHECKING, Any, Literal
import uuid
from a2a.client import Client, ClientConfig, ClientFactory
from a2a.client import A2AClientHTTPError, Client, ClientConfig, ClientFactory
from a2a.types import (
AgentCard,
Message,
@@ -17,15 +19,19 @@ from a2a.types import (
Role,
TextPart,
)
from aiocache import cached # type: ignore[import-untyped]
from aiocache.serializers import PickleSerializer # type: ignore[import-untyped]
import httpx
from pydantic import BaseModel
from pydantic import BaseModel, Field, create_model
from crewai.a2a.auth.schemas import APIKeyAuth, HTTPDigestAuth
from crewai.a2a.auth.utils import (
_auth_store,
configure_auth_client,
retry_on_401,
validate_auth_against_agent_card,
)
from crewai.a2a.config import A2AConfig
from crewai.a2a.task_helpers import TaskStateResult
from crewai.a2a.types import (
HANDLER_REGISTRY,
@@ -39,7 +45,6 @@ from crewai.a2a.updates import (
StreamingHandler,
UpdateConfig,
)
from crewai.a2a.utils.agent_card import _afetch_agent_card_cached
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import (
A2AConversationStartedEvent,
@@ -47,6 +52,7 @@ from crewai.events.types.a2a_events import (
A2ADelegationStartedEvent,
A2AMessageSentEvent,
)
from crewai.types.utils import create_literals_from_strings
if TYPE_CHECKING:
@@ -69,6 +75,187 @@ def get_handler(config: UpdateConfig | None) -> HandlerType:
return HANDLER_REGISTRY.get(type(config), StreamingHandler)
@lru_cache()
def _fetch_agent_card_cached(
endpoint: str,
auth_hash: int,
timeout: int,
_ttl_hash: int,
) -> AgentCard:
"""Cached sync version of fetch_agent_card."""
auth = _auth_store.get(auth_hash)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
_afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
)
finally:
loop.close()
def fetch_agent_card(
endpoint: str,
auth: AuthScheme | None = None,
timeout: int = 30,
use_cache: bool = True,
cache_ttl: int = 300,
) -> AgentCard:
"""Fetch AgentCard from an A2A endpoint with optional caching.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL)
auth: Optional AuthScheme for authentication
timeout: Request timeout in seconds
use_cache: Whether to use caching (default True)
cache_ttl: Cache TTL in seconds (default 300 = 5 minutes)
Returns:
AgentCard object with agent capabilities and skills
Raises:
httpx.HTTPStatusError: If the request fails
A2AClientHTTPError: If authentication fails
"""
if use_cache:
if auth:
auth_data = auth.model_dump_json(
exclude={
"_access_token",
"_token_expires_at",
"_refresh_token",
"_authorization_callback",
}
)
auth_hash = hash((type(auth).__name__, auth_data))
else:
auth_hash = 0
_auth_store[auth_hash] = auth
ttl_hash = int(time.time() // cache_ttl)
return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout)
)
finally:
loop.close()
async def afetch_agent_card(
endpoint: str,
auth: AuthScheme | None = None,
timeout: int = 30,
use_cache: bool = True,
) -> AgentCard:
"""Fetch AgentCard from an A2A endpoint asynchronously.
Native async implementation. Use this when running in an async context.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL).
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
use_cache: Whether to use caching (default True).
Returns:
AgentCard object with agent capabilities and skills.
Raises:
httpx.HTTPStatusError: If the request fails.
A2AClientHTTPError: If authentication fails.
"""
if use_cache:
if auth:
auth_data = auth.model_dump_json(
exclude={
"_access_token",
"_token_expires_at",
"_refresh_token",
"_authorization_callback",
}
)
auth_hash = hash((type(auth).__name__, auth_data))
else:
auth_hash = 0
_auth_store[auth_hash] = auth
agent_card: AgentCard = await _afetch_agent_card_cached(
endpoint, auth_hash, timeout
)
return agent_card
return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
@cached(ttl=300, serializer=PickleSerializer()) # type: ignore[untyped-decorator]
async def _afetch_agent_card_cached(
endpoint: str,
auth_hash: int,
timeout: int,
) -> AgentCard:
"""Cached async implementation of AgentCard fetching."""
auth = _auth_store.get(auth_hash)
return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
async def _afetch_agent_card_impl(
endpoint: str,
auth: AuthScheme | None,
timeout: int,
) -> AgentCard:
"""Internal async implementation of AgentCard fetching."""
if "/.well-known/agent-card.json" in endpoint:
base_url = endpoint.replace("/.well-known/agent-card.json", "")
agent_card_path = "/.well-known/agent-card.json"
else:
url_parts = endpoint.split("/", 3)
base_url = f"{url_parts[0]}//{url_parts[2]}"
agent_card_path = f"/{url_parts[3]}" if len(url_parts) > 3 else "/"
headers: MutableMapping[str, str] = {}
if auth:
async with httpx.AsyncClient(timeout=timeout) as temp_auth_client:
if isinstance(auth, (HTTPDigestAuth, APIKeyAuth)):
configure_auth_client(auth, temp_auth_client)
headers = await auth.apply_auth(temp_auth_client, {})
async with httpx.AsyncClient(timeout=timeout, headers=headers) as temp_client:
if auth and isinstance(auth, (HTTPDigestAuth, APIKeyAuth)):
configure_auth_client(auth, temp_client)
agent_card_url = f"{base_url}{agent_card_path}"
async def _fetch_agent_card_request() -> httpx.Response:
return await temp_client.get(agent_card_url)
try:
response = await retry_on_401(
request_func=_fetch_agent_card_request,
auth_scheme=auth,
client=temp_client,
headers=temp_client.headers,
max_retries=2,
)
response.raise_for_status()
return AgentCard.model_validate(response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
error_details = ["Authentication failed"]
www_auth = e.response.headers.get("WWW-Authenticate")
if www_auth:
error_details.append(f"WWW-Authenticate: {www_auth}")
if not auth:
error_details.append("No auth scheme provided")
msg = " | ".join(error_details)
raise A2AClientHTTPError(401, msg) from e
raise
def execute_a2a_delegation(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
@@ -457,18 +644,19 @@ async def _create_a2a_client(
"""Create and configure an A2A client.
Args:
agent_card: The A2A agent card.
transport_protocol: Transport protocol to use.
timeout: Request timeout in seconds.
headers: HTTP headers (already with auth applied).
streaming: Enable streaming responses.
auth: Optional AuthScheme for client configuration.
use_polling: Enable polling mode.
push_notification_config: Optional push notification config.
agent_card: The A2A agent card
transport_protocol: Transport protocol to use
timeout: Request timeout in seconds
headers: HTTP headers (already with auth applied)
streaming: Enable streaming responses
auth: Optional AuthScheme for client configuration
use_polling: Enable polling mode
push_notification_config: Optional push notification config to include in requests
Yields:
Configured A2A client instance.
Configured A2A client instance
"""
async with httpx.AsyncClient(
timeout=timeout,
headers=headers,
@@ -499,3 +687,78 @@ async def _create_a2a_client(
factory = ClientFactory(config)
client = factory.create(agent_card)
yield client
def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel]:
"""Create a dynamic AgentResponse model with Literal types for agent IDs.
Args:
agent_ids: List of available A2A agent IDs
Returns:
Dynamically created Pydantic model with Literal-constrained a2a_ids field
"""
DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806
return create_model(
"AgentResponse",
a2a_ids=(
tuple[DynamicLiteral, ...], # type: ignore[valid-type]
Field(
default_factory=tuple,
max_length=len(agent_ids),
description="A2A agent IDs to delegate to.",
),
),
message=(
str,
Field(
description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation."
),
),
is_a2a=(
bool,
Field(
description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately."
),
),
__base__=BaseModel,
)
def extract_a2a_agent_ids_from_config(
a2a_config: list[A2AConfig] | A2AConfig | None,
) -> tuple[list[A2AConfig], tuple[str, ...]]:
"""Extract A2A agent IDs from A2A configuration.
Args:
a2a_config: A2A configuration
Returns:
List of A2A agent IDs
"""
if a2a_config is None:
return [], ()
if isinstance(a2a_config, A2AConfig):
a2a_agents = [a2a_config]
else:
a2a_agents = a2a_config
return a2a_agents, tuple(config.endpoint for config in a2a_agents)
def get_a2a_agents_and_response_model(
a2a_config: list[A2AConfig] | A2AConfig | None,
) -> tuple[list[A2AConfig], type[BaseModel]]:
"""Get A2A agent IDs and response model.
Args:
a2a_config: A2A configuration
Returns:
Tuple of A2A agent IDs and response model
"""
a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config)
return a2a_agents, create_agent_response_model(agent_ids)

View File

@@ -1 +0,0 @@
"""A2A utility modules for client operations."""

View File

@@ -1,399 +0,0 @@
"""AgentCard utilities for A2A client and server operations."""
from __future__ import annotations
import asyncio
from collections.abc import MutableMapping
from functools import lru_cache
import time
from types import MethodType
from typing import TYPE_CHECKING
from a2a.client.errors import A2AClientHTTPError
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
from aiocache import cached # type: ignore[import-untyped]
from aiocache.serializers import PickleSerializer # type: ignore[import-untyped]
import httpx
from crewai.a2a.auth.schemas import APIKeyAuth, HTTPDigestAuth
from crewai.a2a.auth.utils import (
_auth_store,
configure_auth_client,
retry_on_401,
)
from crewai.a2a.config import A2AServerConfig
from crewai.crew import Crew
if TYPE_CHECKING:
from crewai.a2a.auth.schemas import AuthScheme
from crewai.agent import Agent
from crewai.task import Task
def _get_server_config(agent: Agent) -> A2AServerConfig | None:
"""Get A2AServerConfig from an agent's a2a configuration.
Args:
agent: The Agent instance to check.
Returns:
A2AServerConfig if present, None otherwise.
"""
if agent.a2a is None:
return None
if isinstance(agent.a2a, A2AServerConfig):
return agent.a2a
if isinstance(agent.a2a, list):
for config in agent.a2a:
if isinstance(config, A2AServerConfig):
return config
return None
def fetch_agent_card(
endpoint: str,
auth: AuthScheme | None = None,
timeout: int = 30,
use_cache: bool = True,
cache_ttl: int = 300,
) -> AgentCard:
"""Fetch AgentCard from an A2A endpoint with optional caching.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL).
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
use_cache: Whether to use caching (default True).
cache_ttl: Cache TTL in seconds (default 300 = 5 minutes).
Returns:
AgentCard object with agent capabilities and skills.
Raises:
httpx.HTTPStatusError: If the request fails.
A2AClientHTTPError: If authentication fails.
"""
if use_cache:
if auth:
auth_data = auth.model_dump_json(
exclude={
"_access_token",
"_token_expires_at",
"_refresh_token",
"_authorization_callback",
}
)
auth_hash = hash((type(auth).__name__, auth_data))
else:
auth_hash = 0
_auth_store[auth_hash] = auth
ttl_hash = int(time.time() // cache_ttl)
return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout)
)
finally:
loop.close()
async def afetch_agent_card(
endpoint: str,
auth: AuthScheme | None = None,
timeout: int = 30,
use_cache: bool = True,
) -> AgentCard:
"""Fetch AgentCard from an A2A endpoint asynchronously.
Native async implementation. Use this when running in an async context.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL).
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
use_cache: Whether to use caching (default True).
Returns:
AgentCard object with agent capabilities and skills.
Raises:
httpx.HTTPStatusError: If the request fails.
A2AClientHTTPError: If authentication fails.
"""
if use_cache:
if auth:
auth_data = auth.model_dump_json(
exclude={
"_access_token",
"_token_expires_at",
"_refresh_token",
"_authorization_callback",
}
)
auth_hash = hash((type(auth).__name__, auth_data))
else:
auth_hash = 0
_auth_store[auth_hash] = auth
agent_card: AgentCard = await _afetch_agent_card_cached(
endpoint, auth_hash, timeout
)
return agent_card
return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
@lru_cache()
def _fetch_agent_card_cached(
endpoint: str,
auth_hash: int,
timeout: int,
_ttl_hash: int,
) -> AgentCard:
"""Cached sync version of fetch_agent_card."""
auth = _auth_store.get(auth_hash)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
_afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
)
finally:
loop.close()
@cached(ttl=300, serializer=PickleSerializer()) # type: ignore[untyped-decorator]
async def _afetch_agent_card_cached(
endpoint: str,
auth_hash: int,
timeout: int,
) -> AgentCard:
"""Cached async implementation of AgentCard fetching."""
auth = _auth_store.get(auth_hash)
return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
async def _afetch_agent_card_impl(
endpoint: str,
auth: AuthScheme | None,
timeout: int,
) -> AgentCard:
"""Internal async implementation of AgentCard fetching."""
if "/.well-known/agent-card.json" in endpoint:
base_url = endpoint.replace("/.well-known/agent-card.json", "")
agent_card_path = "/.well-known/agent-card.json"
else:
url_parts = endpoint.split("/", 3)
base_url = f"{url_parts[0]}//{url_parts[2]}"
agent_card_path = f"/{url_parts[3]}" if len(url_parts) > 3 else "/"
headers: MutableMapping[str, str] = {}
if auth:
async with httpx.AsyncClient(timeout=timeout) as temp_auth_client:
if isinstance(auth, (HTTPDigestAuth, APIKeyAuth)):
configure_auth_client(auth, temp_auth_client)
headers = await auth.apply_auth(temp_auth_client, {})
async with httpx.AsyncClient(timeout=timeout, headers=headers) as temp_client:
if auth and isinstance(auth, (HTTPDigestAuth, APIKeyAuth)):
configure_auth_client(auth, temp_client)
agent_card_url = f"{base_url}{agent_card_path}"
async def _fetch_agent_card_request() -> httpx.Response:
return await temp_client.get(agent_card_url)
try:
response = await retry_on_401(
request_func=_fetch_agent_card_request,
auth_scheme=auth,
client=temp_client,
headers=temp_client.headers,
max_retries=2,
)
response.raise_for_status()
return AgentCard.model_validate(response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
error_details = ["Authentication failed"]
www_auth = e.response.headers.get("WWW-Authenticate")
if www_auth:
error_details.append(f"WWW-Authenticate: {www_auth}")
if not auth:
error_details.append("No auth scheme provided")
msg = " | ".join(error_details)
raise A2AClientHTTPError(401, msg) from e
raise
def _task_to_skill(task: Task) -> AgentSkill:
"""Convert a CrewAI Task to an A2A AgentSkill.
Args:
task: The CrewAI Task to convert.
Returns:
AgentSkill representing the task's capability.
"""
task_name = task.name or task.description[:50]
task_id = task_name.lower().replace(" ", "_")
tags: list[str] = []
if task.agent:
tags.append(task.agent.role.lower().replace(" ", "-"))
return AgentSkill(
id=task_id,
name=task_name,
description=task.description,
tags=tags,
examples=[task.expected_output] if task.expected_output else None,
)
def _tool_to_skill(tool_name: str, tool_description: str) -> AgentSkill:
"""Convert an Agent's tool to an A2A AgentSkill.
Args:
tool_name: Name of the tool.
tool_description: Description of what the tool does.
Returns:
AgentSkill representing the tool's capability.
"""
tool_id = tool_name.lower().replace(" ", "_")
return AgentSkill(
id=tool_id,
name=tool_name,
description=tool_description,
tags=[tool_name.lower().replace(" ", "-")],
)
def _crew_to_agent_card(crew: Crew, url: str) -> AgentCard:
"""Generate an A2A AgentCard from a Crew instance.
Args:
crew: The Crew instance to generate a card for.
url: The base URL where this crew will be exposed.
Returns:
AgentCard describing the crew's capabilities.
"""
crew_name = getattr(crew, "name", None) or crew.__class__.__name__
description_parts: list[str] = []
crew_description = getattr(crew, "description", None)
if crew_description:
description_parts.append(crew_description)
else:
agent_roles = [agent.role for agent in crew.agents]
description_parts.append(
f"A crew of {len(crew.agents)} agents: {', '.join(agent_roles)}"
)
skills = [_task_to_skill(task) for task in crew.tasks]
return AgentCard(
name=crew_name,
description=" ".join(description_parts),
url=url,
version="1.0.0",
capabilities=AgentCapabilities(
streaming=True,
push_notifications=True,
),
default_input_modes=["text/plain", "application/json"],
default_output_modes=["text/plain", "application/json"],
skills=skills,
)
def _agent_to_agent_card(agent: Agent, url: str) -> AgentCard:
"""Generate an A2A AgentCard from an Agent instance.
Uses A2AServerConfig values when available, falling back to agent properties.
Args:
agent: The Agent instance to generate a card for.
url: The base URL where this agent will be exposed.
Returns:
AgentCard describing the agent's capabilities.
"""
server_config = _get_server_config(agent) or A2AServerConfig()
name = server_config.name or agent.role
description_parts = [agent.goal]
if agent.backstory:
description_parts.append(agent.backstory)
description = server_config.description or " ".join(description_parts)
skills: list[AgentSkill] = (
server_config.skills.copy() if server_config.skills else []
)
if not skills:
if agent.tools:
for tool in agent.tools:
tool_name = getattr(tool, "name", None) or tool.__class__.__name__
tool_desc = getattr(tool, "description", None) or f"Tool: {tool_name}"
skills.append(_tool_to_skill(tool_name, tool_desc))
if not skills:
skills.append(
AgentSkill(
id=agent.role.lower().replace(" ", "_"),
name=agent.role,
description=agent.goal,
tags=[agent.role.lower().replace(" ", "-")],
)
)
return AgentCard(
name=name,
description=description,
url=server_config.url or url,
version=server_config.version,
capabilities=server_config.capabilities,
default_input_modes=server_config.default_input_modes,
default_output_modes=server_config.default_output_modes,
skills=skills,
protocol_version=server_config.protocol_version,
provider=server_config.provider,
documentation_url=server_config.documentation_url,
icon_url=server_config.icon_url,
additional_interfaces=server_config.additional_interfaces,
security=server_config.security,
security_schemes=server_config.security_schemes,
supports_authenticated_extended_card=server_config.supports_authenticated_extended_card,
signatures=server_config.signatures,
)
def inject_a2a_server_methods(agent: Agent) -> None:
"""Inject A2A server methods onto an Agent instance.
Adds a `to_agent_card(url: str) -> AgentCard` method to the agent
that generates an A2A-compliant AgentCard.
Only injects if the agent has an A2AServerConfig.
Args:
agent: The Agent instance to inject methods onto.
"""
if _get_server_config(agent) is None:
return
def _to_agent_card(self: Agent, url: str) -> AgentCard:
return _agent_to_agent_card(self, url)
object.__setattr__(agent, "to_agent_card", MethodType(_to_agent_card, agent))

View File

@@ -1,101 +0,0 @@
"""Response model utilities for A2A agent interactions."""
from __future__ import annotations
from typing import TypeAlias
from pydantic import BaseModel, Field, create_model
from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig
from crewai.types.utils import create_literals_from_strings
A2AConfigTypes: TypeAlias = A2AConfig | A2AServerConfig | A2AClientConfig
A2AClientConfigTypes: TypeAlias = A2AConfig | A2AClientConfig
def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel] | None:
"""Create a dynamic AgentResponse model with Literal types for agent IDs.
Args:
agent_ids: List of available A2A agent IDs.
Returns:
Dynamically created Pydantic model with Literal-constrained a2a_ids field,
or None if agent_ids is empty.
"""
if not agent_ids:
return None
DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806
return create_model(
"AgentResponse",
a2a_ids=(
tuple[DynamicLiteral, ...], # type: ignore[valid-type]
Field(
default_factory=tuple,
max_length=len(agent_ids),
description="A2A agent IDs to delegate to.",
),
),
message=(
str,
Field(
description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation."
),
),
is_a2a=(
bool,
Field(
description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately."
),
),
__base__=BaseModel,
)
def extract_a2a_agent_ids_from_config(
a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
) -> tuple[list[A2AClientConfigTypes], tuple[str, ...]]:
"""Extract A2A agent IDs from A2A configuration.
Filters out A2AServerConfig since it doesn't have an endpoint for delegation.
Args:
a2a_config: A2A configuration (any type).
Returns:
Tuple of client A2A configs list and agent endpoint IDs.
"""
if a2a_config is None:
return [], ()
configs: list[A2AConfigTypes]
if isinstance(a2a_config, (A2AConfig, A2AClientConfig, A2AServerConfig)):
configs = [a2a_config]
else:
configs = a2a_config
# Filter to only client configs (those with endpoint)
client_configs: list[A2AClientConfigTypes] = [
config for config in configs if isinstance(config, (A2AConfig, A2AClientConfig))
]
return client_configs, tuple(config.endpoint for config in client_configs)
def get_a2a_agents_and_response_model(
a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
) -> tuple[list[A2AClientConfigTypes], type[BaseModel] | None]:
"""Get A2A agent configs and response model.
Args:
a2a_config: A2A configuration (any type).
Returns:
Tuple of client A2A configs and response model.
"""
a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config)
return a2a_agents, create_agent_response_model(agent_ids)

View File

@@ -1,284 +0,0 @@
"""A2A task utilities for server-side task management."""
from __future__ import annotations
import asyncio
from collections.abc import Callable, Coroutine
from functools import wraps
import logging
import os
from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar, cast
from a2a.server.agent_execution import RequestContext
from a2a.server.events import EventQueue
from a2a.types import (
InternalError,
InvalidParamsError,
Message,
Task as A2ATask,
TaskState,
TaskStatus,
TaskStatusUpdateEvent,
)
from a2a.utils import new_agent_text_message, new_text_artifact
from a2a.utils.errors import ServerError
from aiocache import SimpleMemoryCache, caches # type: ignore[import-untyped]
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import (
A2AServerTaskCanceledEvent,
A2AServerTaskCompletedEvent,
A2AServerTaskFailedEvent,
A2AServerTaskStartedEvent,
)
from crewai.task import Task
if TYPE_CHECKING:
from crewai.agent import Agent
logger = logging.getLogger(__name__)
P = ParamSpec("P")
T = TypeVar("T")
def _parse_redis_url(url: str) -> dict[str, Any]:
from urllib.parse import urlparse
parsed = urlparse(url)
config: dict[str, Any] = {
"cache": "aiocache.RedisCache",
"endpoint": parsed.hostname or "localhost",
"port": parsed.port or 6379,
}
if parsed.path and parsed.path != "/":
try:
config["db"] = int(parsed.path.lstrip("/"))
except ValueError:
pass
if parsed.password:
config["password"] = parsed.password
return config
_redis_url = os.environ.get("REDIS_URL")
caches.set_config(
{
"default": _parse_redis_url(_redis_url)
if _redis_url
else {
"cache": "aiocache.SimpleMemoryCache",
}
}
)
def cancellable(
fn: Callable[P, Coroutine[Any, Any, T]],
) -> Callable[P, Coroutine[Any, Any, T]]:
"""Decorator that enables cancellation for A2A task execution.
Runs a cancellation watcher concurrently with the wrapped function.
When a cancel event is published, the execution is cancelled.
Args:
fn: The async function to wrap.
Returns:
Wrapped function with cancellation support.
"""
@wraps(fn)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
"""Wrap function with cancellation monitoring."""
context: RequestContext | None = None
for arg in args:
if isinstance(arg, RequestContext):
context = arg
break
if context is None:
context = cast(RequestContext | None, kwargs.get("context"))
if context is None:
return await fn(*args, **kwargs)
task_id = context.task_id
cache = caches.get("default")
async def poll_for_cancel() -> bool:
"""Poll cache for cancellation flag."""
while True:
if await cache.get(f"cancel:{task_id}"):
return True
await asyncio.sleep(0.1)
async def watch_for_cancel() -> bool:
"""Watch for cancellation events via pub/sub or polling."""
if isinstance(cache, SimpleMemoryCache):
return await poll_for_cancel()
try:
client = cache.client
pubsub = client.pubsub()
await pubsub.subscribe(f"cancel:{task_id}")
async for message in pubsub.listen():
if message["type"] == "message":
return True
except Exception as e:
logger.warning("Cancel watcher error for task_id=%s: %s", task_id, e)
return await poll_for_cancel()
return False
execute_task = asyncio.create_task(fn(*args, **kwargs))
cancel_watch = asyncio.create_task(watch_for_cancel())
try:
done, _ = await asyncio.wait(
[execute_task, cancel_watch],
return_when=asyncio.FIRST_COMPLETED,
)
if cancel_watch in done:
execute_task.cancel()
try:
await execute_task
except asyncio.CancelledError:
pass
raise asyncio.CancelledError(f"Task {task_id} was cancelled")
cancel_watch.cancel()
return execute_task.result()
finally:
await cache.delete(f"cancel:{task_id}")
return wrapper
@cancellable
async def execute(
agent: Agent,
context: RequestContext,
event_queue: EventQueue,
) -> None:
"""Execute an A2A task using a CrewAI agent.
Args:
agent: The CrewAI agent to execute the task.
context: The A2A request context containing the user's message.
event_queue: The event queue for sending responses back.
TODOs:
* need to impl both of structured output and file inputs, depends on `file_inputs` for
`crewai.task.Task`, pass the below two to Task. both utils in `a2a.utils.parts`
* structured outputs ingestion, `structured_inputs = get_data_parts(parts=context.message.parts)`
* file inputs ingestion, `file_inputs = get_file_parts(parts=context.message.parts)`
"""
user_message = context.get_user_input()
task_id = context.task_id
context_id = context.context_id
if task_id is None or context_id is None:
msg = "task_id and context_id are required"
crewai_event_bus.emit(
agent,
A2AServerTaskFailedEvent(a2a_task_id="", a2a_context_id="", error=msg),
)
raise ServerError(InvalidParamsError(message=msg)) from None
task = Task(
description=user_message,
expected_output="Response to the user's request",
agent=agent,
)
crewai_event_bus.emit(
agent,
A2AServerTaskStartedEvent(a2a_task_id=task_id, a2a_context_id=context_id),
)
try:
result = await agent.aexecute_task(task=task, tools=agent.tools)
result_str = str(result)
history: list[Message] = [context.message] if context.message else []
history.append(new_agent_text_message(result_str, context_id, task_id))
await event_queue.enqueue_event(
A2ATask(
id=task_id,
context_id=context_id,
status=TaskStatus(state=TaskState.input_required),
artifacts=[new_text_artifact(result_str, f"result_{task_id}")],
history=history,
)
)
crewai_event_bus.emit(
agent,
A2AServerTaskCompletedEvent(
a2a_task_id=task_id, a2a_context_id=context_id, result=str(result)
),
)
except asyncio.CancelledError:
crewai_event_bus.emit(
agent,
A2AServerTaskCanceledEvent(a2a_task_id=task_id, a2a_context_id=context_id),
)
raise
except Exception as e:
crewai_event_bus.emit(
agent,
A2AServerTaskFailedEvent(
a2a_task_id=task_id, a2a_context_id=context_id, error=str(e)
),
)
raise ServerError(
error=InternalError(message=f"Task execution failed: {e}")
) from e
async def cancel(
context: RequestContext,
event_queue: EventQueue,
) -> A2ATask | None:
"""Cancel an A2A task.
Publishes a cancel event that the cancellable decorator listens for.
Args:
context: The A2A request context containing task information.
event_queue: The event queue for sending the cancellation status.
Returns:
The canceled task with updated status.
"""
task_id = context.task_id
context_id = context.context_id
if task_id is None or context_id is None:
raise ServerError(InvalidParamsError(message="task_id and context_id required"))
if context.current_task and context.current_task.status.state in (
TaskState.completed,
TaskState.failed,
TaskState.canceled,
):
return context.current_task
cache = caches.get("default")
await cache.set(f"cancel:{task_id}", True, ttl=3600)
if not isinstance(cache, SimpleMemoryCache):
await cache.client.publish(f"cancel:{task_id}", "cancel")
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
task_id=task_id,
context_id=context_id,
status=TaskStatus(state=TaskState.canceled),
final=True,
)
)
if context.current_task:
context.current_task.status = TaskStatus(state=TaskState.canceled)
return context.current_task
return None

View File

@@ -15,7 +15,7 @@ from typing import TYPE_CHECKING, Any
from a2a.types import Role, TaskState
from pydantic import BaseModel, ValidationError
from crewai.a2a.config import A2AClientConfig, A2AConfig
from crewai.a2a.config import A2AConfig
from crewai.a2a.extensions.base import ExtensionRegistry
from crewai.a2a.task_helpers import TaskStateResult
from crewai.a2a.templates import (
@@ -26,16 +26,13 @@ from crewai.a2a.templates import (
UNAVAILABLE_AGENTS_NOTICE_TEMPLATE,
)
from crewai.a2a.types import AgentResponseProtocol
from crewai.a2a.utils.agent_card import (
afetch_agent_card,
fetch_agent_card,
inject_a2a_server_methods,
)
from crewai.a2a.utils.delegation import (
from crewai.a2a.utils import (
aexecute_a2a_delegation,
afetch_agent_card,
execute_a2a_delegation,
fetch_agent_card,
get_a2a_agents_and_response_model,
)
from crewai.a2a.utils.response_model import get_a2a_agents_and_response_model
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import (
A2AConversationCompletedEvent,
@@ -125,12 +122,10 @@ def wrap_agent_with_a2a_instance(
agent, "aexecute_task", MethodType(aexecute_task_with_a2a, agent)
)
inject_a2a_server_methods(agent)
def _fetch_card_from_config(
config: A2AConfig | A2AClientConfig,
) -> tuple[A2AConfig | A2AClientConfig, AgentCard | Exception]:
config: A2AConfig,
) -> tuple[A2AConfig, AgentCard | Exception]:
"""Fetch agent card from A2A config.
Args:
@@ -151,7 +146,7 @@ def _fetch_card_from_config(
def _fetch_agent_cards_concurrently(
a2a_agents: list[A2AConfig | A2AClientConfig],
a2a_agents: list[A2AConfig],
) -> tuple[dict[str, AgentCard], dict[str, str]]:
"""Fetch agent cards concurrently for multiple A2A agents.
@@ -186,7 +181,7 @@ def _fetch_agent_cards_concurrently(
def _execute_task_with_a2a(
self: Agent,
a2a_agents: list[A2AConfig | A2AClientConfig],
a2a_agents: list[A2AConfig],
original_fn: Callable[..., str],
task: Task,
agent_response_model: type[BaseModel],
@@ -275,7 +270,7 @@ def _execute_task_with_a2a(
def _augment_prompt_with_a2a(
a2a_agents: list[A2AConfig | A2AClientConfig],
a2a_agents: list[A2AConfig],
task_description: str,
agent_cards: dict[str, AgentCard],
conversation_history: list[Message] | None = None,
@@ -528,11 +523,11 @@ def _prepare_delegation_context(
task: Task,
original_task_description: str | None,
) -> tuple[
list[A2AConfig | A2AClientConfig],
list[A2AConfig],
type[BaseModel],
str,
str,
A2AConfig | A2AClientConfig,
A2AConfig,
str | None,
str | None,
dict[str, Any] | None,
@@ -596,7 +591,7 @@ def _handle_task_completion(
task: Task,
task_id_config: str | None,
reference_task_ids: list[str],
agent_config: A2AConfig | A2AClientConfig,
agent_config: A2AConfig,
turn_num: int,
) -> tuple[str | None, str | None, list[str]]:
"""Handle task completion state including reference task updates.
@@ -636,7 +631,7 @@ def _handle_agent_response_and_continue(
a2a_result: TaskStateResult,
agent_id: str,
agent_cards: dict[str, AgentCard] | None,
a2a_agents: list[A2AConfig | A2AClientConfig],
a2a_agents: list[A2AConfig],
original_task_description: str,
conversation_history: list[Message],
turn_num: int,
@@ -873,8 +868,8 @@ def _delegate_to_a2a(
async def _afetch_card_from_config(
config: A2AConfig | A2AClientConfig,
) -> tuple[A2AConfig | A2AClientConfig, AgentCard | Exception]:
config: A2AConfig,
) -> tuple[A2AConfig, AgentCard | Exception]:
"""Fetch agent card from A2A config asynchronously."""
try:
card = await afetch_agent_card(
@@ -888,7 +883,7 @@ async def _afetch_card_from_config(
async def _afetch_agent_cards_concurrently(
a2a_agents: list[A2AConfig | A2AClientConfig],
a2a_agents: list[A2AConfig],
) -> tuple[dict[str, AgentCard], dict[str, str]]:
"""Fetch agent cards concurrently for multiple A2A agents using asyncio."""
agent_cards: dict[str, AgentCard] = {}
@@ -913,7 +908,7 @@ async def _afetch_agent_cards_concurrently(
async def _aexecute_task_with_a2a(
self: Agent,
a2a_agents: list[A2AConfig | A2AClientConfig],
a2a_agents: list[A2AConfig],
original_fn: Callable[..., Coroutine[Any, Any, str]],
task: Task,
agent_response_model: type[BaseModel],
@@ -992,7 +987,7 @@ async def _ahandle_agent_response_and_continue(
a2a_result: TaskStateResult,
agent_id: str,
agent_cards: dict[str, AgentCard] | None,
a2a_agents: list[A2AConfig | A2AClientConfig],
a2a_agents: list[A2AConfig],
original_task_description: str,
conversation_history: list[Message],
turn_num: int,

View File

@@ -17,6 +17,7 @@ from urllib.parse import urlparse
from pydantic import BaseModel, Field, InstanceOf, PrivateAttr, model_validator
from typing_extensions import Self
from crewai.a2a.config import A2AConfig
from crewai.agent.utils import (
ahandle_knowledge_retrieval,
apply_training_data,
@@ -72,19 +73,11 @@ from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_F
from crewai.utilities.converter import Converter
from crewai.utilities.guardrail_types import GuardrailType
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.prompts import Prompts, StandardPromptResult, SystemPromptResult
from crewai.utilities.prompts import Prompts
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
try:
from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig
except ImportError:
A2AClientConfig = Any
A2AConfig = Any
A2AServerConfig = Any
if TYPE_CHECKING:
from crewai_tools import CodeInterpreterTool
@@ -225,18 +218,9 @@ class Agent(BaseAgent):
guardrail_max_retries: int = Field(
default=3, description="Maximum number of retries when guardrail fails"
)
a2a: (
list[A2AConfig | A2AServerConfig | A2AClientConfig]
| A2AConfig
| A2AServerConfig
| A2AClientConfig
| None
) = Field(
a2a: list[A2AConfig] | A2AConfig | None = Field(
default=None,
description="""
A2A (Agent-to-Agent) configuration for delegating tasks to remote agents.
Can be a single A2AConfig/A2AClientConfig/A2AServerConfig, or a list of any number of A2AConfig/A2AClientConfig with a single A2AServerConfig.
""",
description="A2A (Agent-to-Agent) configuration for delegating tasks to remote agents. Can be a single A2AConfig or a dict mapping agent IDs to configs.",
)
executor_class: type[CrewAgentExecutor] | type[CrewAgentExecutorFlow] = Field(
default=CrewAgentExecutor,
@@ -749,7 +733,7 @@ class Agent(BaseAgent):
if self.agent_executor is not None:
self._update_executor_parameters(
task=task,
tools=parsed_tools, # type: ignore[arg-type]
tools=parsed_tools,
raw_tools=raw_tools,
prompt=prompt,
stop_words=stop_words,
@@ -758,7 +742,7 @@ class Agent(BaseAgent):
else:
self.agent_executor = self.executor_class(
llm=cast(BaseLLM, self.llm),
task=task, # type: ignore[arg-type]
task=task,
i18n=self.i18n,
agent=self,
crew=self.crew,
@@ -781,11 +765,11 @@ class Agent(BaseAgent):
def _update_executor_parameters(
self,
task: Task | None,
tools: list[BaseTool],
tools: list,
raw_tools: list[BaseTool],
prompt: SystemPromptResult | StandardPromptResult,
prompt: dict,
stop_words: list[str],
rpm_limit_fn: Callable | None, # type: ignore[type-arg]
rpm_limit_fn: Callable | None,
) -> None:
"""Update executor parameters without recreating instance.

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.8.1"
"crewai[tools]==1.8.0"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.8.1"
"crewai[tools]==1.8.0"
]
[project.scripts]

View File

@@ -1,20 +1,3 @@
from crewai.events.types.a2a_events import (
A2AConversationCompletedEvent,
A2AConversationStartedEvent,
A2ADelegationCompletedEvent,
A2ADelegationStartedEvent,
A2AMessageSentEvent,
A2APollingStartedEvent,
A2APollingStatusEvent,
A2APushNotificationReceivedEvent,
A2APushNotificationRegisteredEvent,
A2APushNotificationTimeoutEvent,
A2AResponseReceivedEvent,
A2AServerTaskCanceledEvent,
A2AServerTaskCompletedEvent,
A2AServerTaskFailedEvent,
A2AServerTaskStartedEvent,
)
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
@@ -93,22 +76,7 @@ from crewai.events.types.tool_usage_events import (
EventTypes = (
A2AConversationCompletedEvent
| A2AConversationStartedEvent
| A2ADelegationCompletedEvent
| A2ADelegationStartedEvent
| A2AMessageSentEvent
| A2APollingStartedEvent
| A2APollingStatusEvent
| A2APushNotificationReceivedEvent
| A2APushNotificationRegisteredEvent
| A2APushNotificationTimeoutEvent
| A2AResponseReceivedEvent
| A2AServerTaskCanceledEvent
| A2AServerTaskCompletedEvent
| A2AServerTaskFailedEvent
| A2AServerTaskStartedEvent
| CrewKickoffStartedEvent
CrewKickoffStartedEvent
| CrewKickoffCompletedEvent
| CrewKickoffFailedEvent
| CrewTestStartedEvent

View File

@@ -210,37 +210,3 @@ class A2APushNotificationTimeoutEvent(A2AEventBase):
type: str = "a2a_push_notification_timeout"
task_id: str
timeout_seconds: float
class A2AServerTaskStartedEvent(A2AEventBase):
"""Event emitted when an A2A server task execution starts."""
type: str = "a2a_server_task_started"
a2a_task_id: str
a2a_context_id: str
class A2AServerTaskCompletedEvent(A2AEventBase):
"""Event emitted when an A2A server task execution completes."""
type: str = "a2a_server_task_completed"
a2a_task_id: str
a2a_context_id: str
result: str
class A2AServerTaskCanceledEvent(A2AEventBase):
"""Event emitted when an A2A server task execution is canceled."""
type: str = "a2a_server_task_canceled"
a2a_task_id: str
a2a_context_id: str
class A2AServerTaskFailedEvent(A2AEventBase):
"""Event emitted when an A2A server task execution fails."""
type: str = "a2a_server_task_failed"
a2a_task_id: str
a2a_context_id: str
error: str

View File

@@ -1,4 +1,12 @@
from crewai.experimental.crew_agent_executor_flow import CrewAgentExecutorFlow
from crewai.experimental.environment_tools import (
BaseEnvironmentTool,
EnvironmentTools,
FileReadTool,
FileSearchTool,
GrepTool,
ListDirTool,
)
from crewai.experimental.evaluation import (
AgentEvaluationResult,
AgentEvaluator,
@@ -23,14 +31,20 @@ from crewai.experimental.evaluation import (
__all__ = [
"AgentEvaluationResult",
"AgentEvaluator",
"BaseEnvironmentTool",
"BaseEvaluator",
"CrewAgentExecutorFlow",
"EnvironmentTools",
"EvaluationScore",
"EvaluationTraceCallback",
"ExperimentResult",
"ExperimentResults",
"ExperimentRunner",
"FileReadTool",
"FileSearchTool",
"GoalAlignmentEvaluator",
"GrepTool",
"ListDirTool",
"MetricCategory",
"ParameterExtractionEvaluator",
"ReasoningEfficiencyEvaluator",

View File

@@ -0,0 +1,24 @@
"""Environment tools for file system operations.
These tools provide agents with the ability to explore and read from
the filesystem for context engineering purposes.
"""
from crewai.experimental.environment_tools.base_environment_tool import (
BaseEnvironmentTool,
)
from crewai.experimental.environment_tools.environment_tools import EnvironmentTools
from crewai.experimental.environment_tools.file_read_tool import FileReadTool
from crewai.experimental.environment_tools.file_search_tool import FileSearchTool
from crewai.experimental.environment_tools.grep_tool import GrepTool
from crewai.experimental.environment_tools.list_dir_tool import ListDirTool
__all__ = [
"BaseEnvironmentTool",
"EnvironmentTools",
"FileReadTool",
"FileSearchTool",
"GrepTool",
"ListDirTool",
]

View File

@@ -0,0 +1,84 @@
"""Base class for environment tools with path security."""
from __future__ import annotations
from pathlib import Path
from typing import Any
from pydantic import Field
from crewai.tools.base_tool import BaseTool
class BaseEnvironmentTool(BaseTool):
"""Base class for environment/file system tools with path security.
Provides path validation to restrict file operations to allowed directories.
This prevents path traversal attacks and enforces security sandboxing.
Attributes:
allowed_paths: List of paths that operations are restricted to.
Empty list means allow all paths (no restrictions).
"""
allowed_paths: list[str] = Field(
default_factory=lambda: ["."],
description="Restrict operations to these paths. Defaults to current directory.",
)
def _validate_path(self, path: str) -> tuple[bool, Path | str]:
"""Validate and resolve a path against allowed_paths whitelist.
Args:
path: The path to validate.
Returns:
A tuple of (is_valid, result) where:
- If valid: (True, resolved_path as Path)
- If invalid: (False, error_message as str)
"""
try:
resolved = Path(path).resolve()
# If no restrictions, allow all paths
if not self.allowed_paths:
return True, resolved
# Check if path is within any allowed path
for allowed in self.allowed_paths:
allowed_resolved = Path(allowed).resolve()
try:
# This will raise ValueError if resolved is not relative to allowed_resolved
resolved.relative_to(allowed_resolved)
return True, resolved
except ValueError:
continue
return (
False,
f"Path '{path}' is outside allowed paths: {self.allowed_paths}",
)
except Exception as e:
return False, f"Invalid path '{path}': {e}"
def _format_size(self, size: int) -> str:
"""Format file size in human-readable format.
Args:
size: Size in bytes.
Returns:
Human-readable size string (e.g., "1.5KB", "2.3MB").
"""
if size < 1024:
return f"{size}B"
if size < 1024 * 1024:
return f"{size / 1024:.1f}KB"
if size < 1024 * 1024 * 1024:
return f"{size / (1024 * 1024):.1f}MB"
return f"{size / (1024 * 1024 * 1024):.1f}GB"
def _run(self, *args: Any, **kwargs: Any) -> Any:
"""Subclasses must implement this method."""
raise NotImplementedError("Subclasses must implement _run method")

View File

@@ -0,0 +1,77 @@
"""Manager class for environment tools."""
from __future__ import annotations
from typing import TYPE_CHECKING
from crewai.experimental.environment_tools.file_read_tool import FileReadTool
from crewai.experimental.environment_tools.file_search_tool import FileSearchTool
from crewai.experimental.environment_tools.grep_tool import GrepTool
from crewai.experimental.environment_tools.list_dir_tool import ListDirTool
if TYPE_CHECKING:
from crewai.tools.base_tool import BaseTool
class EnvironmentTools:
"""Manager class for file system/environment tools.
Provides a convenient way to create a set of file system tools
with shared security configuration (allowed_paths).
Similar to AgentTools but for file system operations. Use this to
give agents the ability to explore and read files for context engineering.
Example:
from crewai.experimental import EnvironmentTools
# Create tools with security sandbox
env_tools = EnvironmentTools(
allowed_paths=["./src", "./docs"],
)
# Use with an agent
agent = Agent(
role="Code Analyst",
tools=env_tools.tools(),
)
"""
def __init__(
self,
allowed_paths: list[str] | None = None,
include_grep: bool = True,
include_search: bool = True,
) -> None:
"""Initialize EnvironmentTools.
Args:
allowed_paths: List of paths to restrict operations to.
Defaults to current directory ["."] if None.
Pass empty list [] to allow all paths (not recommended).
include_grep: Whether to include GrepTool (requires grep installed).
include_search: Whether to include FileSearchTool.
"""
self.allowed_paths = allowed_paths if allowed_paths is not None else ["."]
self.include_grep = include_grep
self.include_search = include_search
def tools(self) -> list[BaseTool]:
"""Get all configured environment tools.
Returns:
List of BaseTool instances with shared allowed_paths configuration.
"""
tool_list: list[BaseTool] = [
FileReadTool(allowed_paths=self.allowed_paths),
ListDirTool(allowed_paths=self.allowed_paths),
]
if self.include_grep:
tool_list.append(GrepTool(allowed_paths=self.allowed_paths))
if self.include_search:
tool_list.append(FileSearchTool(allowed_paths=self.allowed_paths))
return tool_list

View File

@@ -0,0 +1,124 @@
"""Tool for reading file contents."""
from __future__ import annotations
from pathlib import Path
from pydantic import BaseModel, Field
from crewai.experimental.environment_tools.base_environment_tool import (
BaseEnvironmentTool,
)
class FileReadInput(BaseModel):
"""Input schema for reading files."""
path: str = Field(..., description="Path to the file to read")
start_line: int | None = Field(
default=None,
description="Line to start reading from (1-indexed). If None, starts from beginning.",
)
line_count: int | None = Field(
default=None,
description="Number of lines to read. If None, reads to end of file.",
)
class FileReadTool(BaseEnvironmentTool):
"""Read contents of text files with optional line ranges.
Use this tool to:
- Read configuration files, source code, logs
- Inspect file contents before making decisions
- Load reference documentation or data files
Supports reading entire files or specific line ranges for efficiency.
"""
name: str = "read_file"
description: str = """Read the contents of a text file.
Use this to read configuration files, source code, logs, or any text file.
You can optionally specify start_line and line_count to read specific portions.
Examples:
- Read entire file: path="config.yaml"
- Read lines 100-149: path="large.log", start_line=100, line_count=50
"""
args_schema: type[BaseModel] = FileReadInput
def _run(
self,
path: str,
start_line: int | None = None,
line_count: int | None = None,
) -> str:
"""Read file contents with optional line range.
Args:
path: Path to the file to read.
start_line: Line to start reading from (1-indexed).
line_count: Number of lines to read.
Returns:
File contents with metadata header, or error message.
"""
# Validate path against allowed_paths
valid, result = self._validate_path(path)
if not valid:
return f"Error: {result}"
assert isinstance(result, Path) # noqa: S101
file_path = result
# Check file exists and is a file
if not file_path.exists():
return f"Error: File not found: {path}"
if not file_path.is_file():
return f"Error: Not a file: {path}"
try:
with open(file_path, "r", encoding="utf-8") as f:
if start_line is None and line_count is None:
# Read entire file
content = f.read()
else:
# Read specific line range
lines = f.readlines()
start_idx = (start_line or 1) - 1 # Convert to 0-indexed
start_idx = max(0, start_idx) # Ensure non-negative
if line_count is not None:
end_idx = start_idx + line_count
else:
end_idx = len(lines)
content = "".join(lines[start_idx:end_idx])
# Get file metadata
stat = file_path.stat()
total_lines = content.count("\n") + (
1 if content and not content.endswith("\n") else 0
)
# Format output with metadata header
header = f"File: {path}\n"
header += f"Size: {self._format_size(stat.st_size)} | Lines: {total_lines}"
if start_line is not None or line_count is not None:
header += (
f" | Range: {start_line or 1}-{(start_line or 1) + total_lines - 1}"
)
header += "\n" + "=" * 60 + "\n"
return header + content
except UnicodeDecodeError:
return f"Error: File is not a text file or has encoding issues: {path}"
except PermissionError:
return f"Error: Permission denied: {path}"
except Exception as e:
return f"Error reading file: {e}"

View File

@@ -0,0 +1,127 @@
"""Tool for finding files by name pattern."""
from __future__ import annotations
from typing import Literal
from pydantic import BaseModel, Field
from crewai.experimental.environment_tools.base_environment_tool import (
BaseEnvironmentTool,
)
class FileSearchInput(BaseModel):
"""Input schema for file search."""
pattern: str = Field(
...,
description="Filename pattern to search for (glob syntax, e.g., '*.py', 'test_*.py')",
)
path: str = Field(
default=".",
description="Directory to search in",
)
file_type: Literal["file", "dir", "all"] | None = Field(
default="all",
description="Filter by type: 'file' for files only, 'dir' for directories only, 'all' for both",
)
class FileSearchTool(BaseEnvironmentTool):
"""Find files by name pattern.
Use this tool to:
- Find specific files in a codebase
- Locate configuration files
- Search for files matching a pattern
"""
name: str = "find_files"
description: str = """Find files by name pattern using glob syntax.
Searches recursively through directories to find matching files.
Examples:
- Find Python files: pattern="*.py", path="src/"
- Find test files: pattern="test_*.py"
- Find configs: pattern="*.yaml", path="."
- Find directories only: pattern="*", file_type="dir"
"""
args_schema: type[BaseModel] = FileSearchInput
def _run(
self,
pattern: str,
path: str = ".",
file_type: Literal["file", "dir", "all"] | None = "all",
) -> str:
"""Find files matching a pattern.
Args:
pattern: Glob pattern for filenames.
path: Directory to search in.
file_type: Filter by type ('file', 'dir', or 'all').
Returns:
List of matching files or error message.
"""
# Validate path against allowed_paths
valid, result = self._validate_path(path)
if not valid:
return f"Error: {result}"
search_path = result
# Check directory exists
if not search_path.exists():
return f"Error: Directory not found: {path}"
if not search_path.is_dir():
return f"Error: Not a directory: {path}"
try:
# Find matching entries recursively
matches = list(search_path.rglob(pattern))
# Filter by type
if file_type == "file":
matches = [m for m in matches if m.is_file()]
elif file_type == "dir":
matches = [m for m in matches if m.is_dir()]
# Filter out hidden files
matches = [
m for m in matches if not any(part.startswith(".") for part in m.parts)
]
# Sort alphabetically
matches.sort(key=lambda x: str(x).lower())
if not matches:
return f"No {file_type if file_type != 'all' else 'files'} matching '{pattern}' found in {path}"
# Format output
result_lines = [f"Found {len(matches)} matches for '{pattern}' in {path}:"]
result_lines.append("=" * 60)
for match in matches:
# Get relative path from search directory
rel_path = match.relative_to(search_path)
if match.is_dir():
result_lines.append(f"📁 {rel_path}/")
else:
try:
size = match.stat().st_size
except (OSError, PermissionError):
continue # Skip files we can't stat
size_str = self._format_size(size)
result_lines.append(f"📄 {rel_path} ({size_str})")
return "\n".join(result_lines)
except PermissionError:
return f"Error: Permission denied: {path}"
except Exception as e:
return f"Error searching files: {e}"

View File

@@ -0,0 +1,149 @@
"""Tool for searching patterns in files using grep."""
from __future__ import annotations
import subprocess
from pydantic import BaseModel, Field
from crewai.experimental.environment_tools.base_environment_tool import (
BaseEnvironmentTool,
)
class GrepInput(BaseModel):
"""Input schema for grep search."""
pattern: str = Field(..., description="Search pattern (supports regex)")
path: str = Field(..., description="File or directory to search in")
recursive: bool = Field(
default=True,
description="Search recursively in directories",
)
ignore_case: bool = Field(
default=False,
description="Case-insensitive search",
)
context_lines: int = Field(
default=2,
description="Number of context lines to show before/after matches",
)
class GrepTool(BaseEnvironmentTool):
"""Search for text patterns in files using grep.
Use this tool to:
- Find where a function or class is defined
- Search for error messages in logs
- Locate configuration values
- Find TODO comments or specific patterns
"""
name: str = "grep_search"
description: str = """Search for text patterns in files using grep.
Supports regex patterns. Returns matching lines with context.
Examples:
- Find function: pattern="def process_data", path="src/"
- Search logs: pattern="ERROR", path="logs/app.log"
- Case-insensitive: pattern="todo", path=".", ignore_case=True
"""
args_schema: type[BaseModel] = GrepInput
def _run(
self,
pattern: str,
path: str,
recursive: bool = True,
ignore_case: bool = False,
context_lines: int = 2,
) -> str:
"""Search for patterns in files.
Args:
pattern: Search pattern (regex supported).
path: File or directory to search in.
recursive: Whether to search recursively.
ignore_case: Whether to ignore case.
context_lines: Lines of context around matches.
Returns:
Search results or error message.
"""
# Validate path against allowed_paths
valid, result = self._validate_path(path)
if not valid:
return f"Error: {result}"
search_path = result
# Check path exists
if not search_path.exists():
return f"Error: Path not found: {path}"
try:
# Build grep command safely
cmd = ["grep", "--color=never"]
# Add recursive flag if searching directory
if recursive and search_path.is_dir():
cmd.append("-r")
# Case insensitive
if ignore_case:
cmd.append("-i")
# Context lines
if context_lines > 0:
cmd.extend(["-C", str(context_lines)])
# Show line numbers
cmd.append("-n")
# Use -- to prevent pattern from being interpreted as option
cmd.append("--")
cmd.append(pattern)
cmd.append(str(search_path))
# Execute with timeout
# Security: cmd is a list (no shell injection), path is validated above
result = subprocess.run( # noqa: S603
cmd,
capture_output=True,
text=True,
timeout=30,
)
if result.returncode == 0:
# Found matches
output = result.stdout
# Count actual match lines (not context lines)
match_lines = [
line
for line in output.split("\n")
if line and not line.startswith("--")
]
match_count = len(match_lines)
header = f"Found {match_count} matches for '{pattern}' in {path}\n"
header += "=" * 60 + "\n"
return header + output
if result.returncode == 1:
# No matches found (grep returns 1 for no matches)
return f"No matches found for '{pattern}' in {path}"
# Error occurred
error_msg = result.stderr.strip() if result.stderr else "Unknown error"
return f"Error: {error_msg}"
except subprocess.TimeoutExpired:
return "Error: Search timed out (>30s). Try narrowing the search path."
except FileNotFoundError:
return (
"Error: grep command not found. Ensure grep is installed on the system."
)
except Exception as e:
return f"Error during search: {e}"

View File

@@ -0,0 +1,147 @@
"""Tool for listing directory contents."""
from __future__ import annotations
from pathlib import Path
from pydantic import BaseModel, Field
from crewai.experimental.environment_tools.base_environment_tool import (
BaseEnvironmentTool,
)
class ListDirInput(BaseModel):
"""Input schema for listing directories."""
path: str = Field(default=".", description="Directory path to list")
pattern: str | None = Field(
default=None,
description="Glob pattern to filter entries (e.g., '*.py', '*.md')",
)
recursive: bool = Field(
default=False,
description="If True, list contents recursively including subdirectories",
)
class ListDirTool(BaseEnvironmentTool):
"""List contents of a directory with optional filtering.
Use this tool to:
- Explore project structure
- Find specific file types
- Check what files exist in a directory
- Navigate the file system
"""
name: str = "list_directory"
description: str = """List contents of a directory.
Use this to explore directories and find files. You can filter by pattern
and optionally list recursively.
Examples:
- List current dir: path="."
- List src folder: path="src/"
- Find Python files: path=".", pattern="*.py"
- Recursive listing: path="src/", recursive=True
"""
args_schema: type[BaseModel] = ListDirInput
def _run(
self,
path: str = ".",
pattern: str | None = None,
recursive: bool = False,
) -> str:
"""List directory contents.
Args:
path: Directory path to list.
pattern: Glob pattern to filter entries.
recursive: Whether to list recursively.
Returns:
Formatted directory listing or error message.
"""
# Validate path against allowed_paths
valid, result = self._validate_path(path)
if not valid:
return f"Error: {result}"
assert isinstance(result, Path) # noqa: S101
dir_path = result
# Check directory exists
if not dir_path.exists():
return f"Error: Directory not found: {path}"
if not dir_path.is_dir():
return f"Error: Not a directory: {path}"
try:
# Get entries based on pattern and recursive flag
if pattern:
if recursive:
entries = list(dir_path.rglob(pattern))
else:
entries = list(dir_path.glob(pattern))
else:
if recursive:
entries = list(dir_path.rglob("*"))
else:
entries = list(dir_path.iterdir())
# Filter out hidden files (starting with .)
entries = [e for e in entries if not e.name.startswith(".")]
# Sort: directories first, then files, alphabetically
entries.sort(key=lambda x: (not x.is_dir(), x.name.lower()))
if not entries:
if pattern:
return f"No entries matching '{pattern}' in {path}"
return f"Directory is empty: {path}"
# Format output
result_lines = [f"Contents of {path}:"]
result_lines.append("=" * 60)
dirs = []
files = []
for entry in entries:
# Get relative path for recursive listings
if recursive:
display_name = str(entry.relative_to(dir_path))
else:
display_name = entry.name
if entry.is_dir():
dirs.append(f"📁 {display_name}/")
else:
try:
size = entry.stat().st_size
except (OSError, PermissionError):
continue # Skip files we can't stat
size_str = self._format_size(size)
files.append(f"📄 {display_name} ({size_str})")
# Output directories first, then files
if dirs:
result_lines.extend(dirs)
if files:
if dirs:
result_lines.append("") # Blank line between dirs and files
result_lines.extend(files)
result_lines.append("")
result_lines.append(f"Total: {len(dirs)} directories, {len(files)} files")
return "\n".join(result_lines)
except PermissionError:
return f"Error: Permission denied: {path}"
except Exception as e:
return f"Error listing directory: {e}"

View File

@@ -1,5 +1,4 @@
import inspect
from typing import Any
from pydantic import BaseModel, Field, InstanceOf, model_validator
from typing_extensions import Self
@@ -15,14 +14,14 @@ class FlowTrackable(BaseModel):
inspecting the call stack.
"""
parent_flow: InstanceOf[Flow[Any]] | None = Field(
parent_flow: InstanceOf[Flow] | None = Field(
default=None,
description="The parent flow of the instance, if it was created inside a flow.",
)
@model_validator(mode="after")
def _set_parent_flow(self) -> Self:
max_depth = 8
max_depth = 5
frame = inspect.currentframe()
try:

View File

@@ -443,7 +443,7 @@ class AzureCompletion(BaseLLM):
params["presence_penalty"] = self.presence_penalty
if self.max_tokens is not None:
params["max_tokens"] = self.max_tokens
if self.stop and self.supports_stop_words():
if self.stop:
params["stop"] = self.stop
# Handle tools/functions for Azure OpenAI models
@@ -931,28 +931,8 @@ class AzureCompletion(BaseLLM):
return self.is_openai_model
def supports_stop_words(self) -> bool:
"""Check if the model supports stop words.
Models using the Responses API (GPT-5 family, o-series reasoning models,
computer-use-preview) do not support stop sequences.
See: https://learn.microsoft.com/en-us/azure/ai-foundry/foundry-models/concepts/models-sold-directly-by-azure
"""
model_lower = self.model.lower() if self.model else ""
if "gpt-5" in model_lower:
return False
o_series_models = ["o1", "o3", "o4", "o1-mini", "o3-mini", "o4-mini"]
responses_api_models = ["computer-use-preview"]
unsupported_stop_models = o_series_models + responses_api_models
for unsupported in unsupported_stop_models:
if unsupported in model_lower:
return False
return True
"""Check if the model supports stop words."""
return True # Most Azure models support stop sequences
def get_context_window_size(self) -> int:
"""Get the context window size for the model."""

View File

@@ -1,6 +1,8 @@
"""Utilities for creating and manipulating types."""
from typing import Annotated, Final, Literal, cast
from typing import Annotated, Final, Literal
from typing_extensions import TypeAliasType
_DYNAMIC_LITERAL_ALIAS: Final[Literal["DynamicLiteral"]] = "DynamicLiteral"
@@ -18,11 +20,6 @@ def create_literals_from_strings(
Returns:
Literal type for each A2A agent ID
Raises:
ValueError: If values is empty (Literal requires at least one value)
"""
unique_values: tuple[str, ...] = tuple(dict.fromkeys(values))
if not unique_values:
raise ValueError("Cannot create Literal type from empty values")
return cast(type, Literal.__getitem__(unique_values))
return Literal.__getitem__(unique_values)

View File

@@ -1,325 +0,0 @@
"""Tests for A2A agent card utilities."""
from __future__ import annotations
from a2a.types import AgentCard, AgentSkill
from crewai import Agent
from crewai.a2a.config import A2AClientConfig, A2AServerConfig
from crewai.a2a.utils.agent_card import inject_a2a_server_methods
class TestInjectA2AServerMethods:
"""Tests for inject_a2a_server_methods function."""
def test_agent_with_server_config_gets_to_agent_card_method(self) -> None:
"""Agent with A2AServerConfig should have to_agent_card method injected."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
assert hasattr(agent, "to_agent_card")
assert callable(agent.to_agent_card)
def test_agent_without_server_config_no_injection(self) -> None:
"""Agent without A2AServerConfig should not get to_agent_card method."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AClientConfig(endpoint="http://example.com"),
)
assert not hasattr(agent, "to_agent_card")
def test_agent_without_a2a_no_injection(self) -> None:
"""Agent without any a2a config should not get to_agent_card method."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
)
assert not hasattr(agent, "to_agent_card")
def test_agent_with_mixed_configs_gets_injection(self) -> None:
"""Agent with list containing A2AServerConfig should get to_agent_card."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=[
A2AClientConfig(endpoint="http://example.com"),
A2AServerConfig(name="My Agent"),
],
)
assert hasattr(agent, "to_agent_card")
assert callable(agent.to_agent_card)
def test_manual_injection_on_plain_agent(self) -> None:
"""inject_a2a_server_methods should work when called manually."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
)
# Manually set server config and inject
object.__setattr__(agent, "a2a", A2AServerConfig())
inject_a2a_server_methods(agent)
assert hasattr(agent, "to_agent_card")
assert callable(agent.to_agent_card)
class TestToAgentCard:
"""Tests for the injected to_agent_card method."""
def test_returns_agent_card(self) -> None:
"""to_agent_card should return an AgentCard instance."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
assert isinstance(card, AgentCard)
def test_uses_agent_role_as_name(self) -> None:
"""AgentCard name should default to agent role."""
agent = Agent(
role="Data Analyst",
goal="Analyze data",
backstory="Expert analyst",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
assert card.name == "Data Analyst"
def test_uses_server_config_name(self) -> None:
"""AgentCard name should prefer A2AServerConfig.name over role."""
agent = Agent(
role="Data Analyst",
goal="Analyze data",
backstory="Expert analyst",
a2a=A2AServerConfig(name="Custom Agent Name"),
)
card = agent.to_agent_card("http://localhost:8000")
assert card.name == "Custom Agent Name"
def test_uses_goal_as_description(self) -> None:
"""AgentCard description should include agent goal."""
agent = Agent(
role="Test Agent",
goal="Accomplish important tasks",
backstory="Has extensive experience",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
assert "Accomplish important tasks" in card.description
def test_uses_server_config_description(self) -> None:
"""AgentCard description should prefer A2AServerConfig.description."""
agent = Agent(
role="Test Agent",
goal="Accomplish important tasks",
backstory="Has extensive experience",
a2a=A2AServerConfig(description="Custom description"),
)
card = agent.to_agent_card("http://localhost:8000")
assert card.description == "Custom description"
def test_uses_provided_url(self) -> None:
"""AgentCard url should use the provided URL."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://my-server.com:9000")
assert card.url == "http://my-server.com:9000"
def test_uses_server_config_url(self) -> None:
"""AgentCard url should prefer A2AServerConfig.url over provided URL."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(url="http://configured-url.com"),
)
card = agent.to_agent_card("http://fallback-url.com")
assert card.url == "http://configured-url.com/"
def test_generates_default_skill(self) -> None:
"""AgentCard should have at least one skill based on agent role."""
agent = Agent(
role="Research Assistant",
goal="Help with research",
backstory="Skilled researcher",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
assert len(card.skills) >= 1
skill = card.skills[0]
assert skill.name == "Research Assistant"
assert skill.description == "Help with research"
def test_uses_server_config_skills(self) -> None:
"""AgentCard skills should prefer A2AServerConfig.skills."""
custom_skill = AgentSkill(
id="custom-skill",
name="Custom Skill",
description="A custom skill",
tags=["custom"],
)
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(skills=[custom_skill]),
)
card = agent.to_agent_card("http://localhost:8000")
assert len(card.skills) == 1
assert card.skills[0].id == "custom-skill"
assert card.skills[0].name == "Custom Skill"
def test_includes_custom_version(self) -> None:
"""AgentCard should include version from A2AServerConfig."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(version="2.0.0"),
)
card = agent.to_agent_card("http://localhost:8000")
assert card.version == "2.0.0"
def test_default_version(self) -> None:
"""AgentCard should have default version 1.0.0."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
assert card.version == "1.0.0"
class TestAgentCardJsonStructure:
"""Tests for the JSON structure of AgentCard."""
def test_json_has_required_fields(self) -> None:
"""AgentCard JSON should contain all required A2A protocol fields."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
json_data = card.model_dump()
assert "name" in json_data
assert "description" in json_data
assert "url" in json_data
assert "version" in json_data
assert "skills" in json_data
assert "capabilities" in json_data
assert "defaultInputModes" in json_data
assert "defaultOutputModes" in json_data
def test_json_skills_structure(self) -> None:
"""Each skill in JSON should have required fields."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
json_data = card.model_dump()
assert len(json_data["skills"]) >= 1
skill = json_data["skills"][0]
assert "id" in skill
assert "name" in skill
assert "description" in skill
assert "tags" in skill
def test_json_capabilities_structure(self) -> None:
"""Capabilities in JSON should have expected fields."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
json_data = card.model_dump()
capabilities = json_data["capabilities"]
assert "streaming" in capabilities
assert "pushNotifications" in capabilities
def test_json_serializable(self) -> None:
"""AgentCard should be JSON serializable."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
json_str = card.model_dump_json()
assert isinstance(json_str, str)
assert "Test Agent" in json_str
assert "http://localhost:8000" in json_str
def test_json_excludes_none_values(self) -> None:
"""AgentCard JSON with exclude_none should omit None fields."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
json_data = card.model_dump(exclude_none=True)
assert "provider" not in json_data
assert "documentationUrl" not in json_data
assert "iconUrl" not in json_data

View File

@@ -1,370 +0,0 @@
"""Tests for A2A task utilities."""
from __future__ import annotations
import asyncio
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import pytest_asyncio
from a2a.server.agent_execution import RequestContext
from a2a.server.events import EventQueue
from a2a.types import Message, Task as A2ATask, TaskState, TaskStatus
from crewai.a2a.utils.task import cancel, cancellable, execute
@pytest.fixture
def mock_agent() -> MagicMock:
"""Create a mock CrewAI agent."""
agent = MagicMock()
agent.role = "Test Agent"
agent.tools = []
agent.aexecute_task = AsyncMock(return_value="Task completed successfully")
return agent
@pytest.fixture
def mock_task() -> MagicMock:
"""Create a mock Task."""
return MagicMock()
@pytest.fixture
def mock_context() -> MagicMock:
"""Create a mock RequestContext."""
context = MagicMock(spec=RequestContext)
context.task_id = "test-task-123"
context.context_id = "test-context-456"
context.get_user_input.return_value = "Test user message"
context.message = MagicMock(spec=Message)
context.current_task = None
return context
@pytest.fixture
def mock_event_queue() -> AsyncMock:
"""Create a mock EventQueue."""
queue = AsyncMock(spec=EventQueue)
queue.enqueue_event = AsyncMock()
return queue
@pytest_asyncio.fixture(autouse=True)
async def clear_cache(mock_context: MagicMock) -> None:
"""Clear cancel flag from cache before each test."""
from aiocache import caches
cache = caches.get("default")
await cache.delete(f"cancel:{mock_context.task_id}")
class TestCancellableDecorator:
"""Tests for the cancellable decorator."""
@pytest.mark.asyncio
async def test_executes_function_without_context(self) -> None:
"""Function executes normally when no RequestContext is provided."""
call_count = 0
@cancellable
async def my_func(value: int) -> int:
nonlocal call_count
call_count += 1
return value * 2
result = await my_func(5)
assert result == 10
assert call_count == 1
@pytest.mark.asyncio
async def test_executes_function_with_context(self, mock_context: MagicMock) -> None:
"""Function executes normally with RequestContext when not cancelled."""
@cancellable
async def my_func(context: RequestContext) -> str:
await asyncio.sleep(0.01)
return "completed"
result = await my_func(mock_context)
assert result == "completed"
@pytest.mark.asyncio
async def test_cancellation_raises_cancelled_error(
self, mock_context: MagicMock
) -> None:
"""Function raises CancelledError when cancel flag is set."""
from aiocache import caches
cache = caches.get("default")
@cancellable
async def slow_func(context: RequestContext) -> str:
await asyncio.sleep(1.0)
return "should not reach"
await cache.set(f"cancel:{mock_context.task_id}", True)
with pytest.raises(asyncio.CancelledError):
await slow_func(mock_context)
@pytest.mark.asyncio
async def test_cleanup_removes_cancel_flag(self, mock_context: MagicMock) -> None:
"""Cancel flag is cleaned up after execution."""
from aiocache import caches
cache = caches.get("default")
@cancellable
async def quick_func(context: RequestContext) -> str:
return "done"
await quick_func(mock_context)
flag = await cache.get(f"cancel:{mock_context.task_id}")
assert flag is None
@pytest.mark.asyncio
async def test_extracts_context_from_kwargs(self, mock_context: MagicMock) -> None:
"""Context can be passed as keyword argument."""
@cancellable
async def my_func(value: int, context: RequestContext | None = None) -> int:
return value + 1
result = await my_func(10, context=mock_context)
assert result == 11
class TestExecute:
"""Tests for the execute function."""
@pytest.mark.asyncio
async def test_successful_execution(
self,
mock_agent: MagicMock,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
mock_task: MagicMock,
) -> None:
"""Execute completes successfully and enqueues completed task."""
with (
patch("crewai.a2a.utils.task.Task", return_value=mock_task),
patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus,
):
await execute(mock_agent, mock_context, mock_event_queue)
mock_agent.aexecute_task.assert_called_once()
mock_event_queue.enqueue_event.assert_called_once()
assert mock_bus.emit.call_count == 2
@pytest.mark.asyncio
async def test_emits_started_event(
self,
mock_agent: MagicMock,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
mock_task: MagicMock,
) -> None:
"""Execute emits A2AServerTaskStartedEvent."""
with (
patch("crewai.a2a.utils.task.Task", return_value=mock_task),
patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus,
):
await execute(mock_agent, mock_context, mock_event_queue)
first_call = mock_bus.emit.call_args_list[0]
event = first_call[0][1]
assert event.type == "a2a_server_task_started"
assert event.a2a_task_id == mock_context.task_id
assert event.a2a_context_id == mock_context.context_id
@pytest.mark.asyncio
async def test_emits_completed_event(
self,
mock_agent: MagicMock,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
mock_task: MagicMock,
) -> None:
"""Execute emits A2AServerTaskCompletedEvent on success."""
with (
patch("crewai.a2a.utils.task.Task", return_value=mock_task),
patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus,
):
await execute(mock_agent, mock_context, mock_event_queue)
second_call = mock_bus.emit.call_args_list[1]
event = second_call[0][1]
assert event.type == "a2a_server_task_completed"
assert event.a2a_task_id == mock_context.task_id
assert event.result == "Task completed successfully"
@pytest.mark.asyncio
async def test_emits_failed_event_on_exception(
self,
mock_agent: MagicMock,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
mock_task: MagicMock,
) -> None:
"""Execute emits A2AServerTaskFailedEvent on exception."""
mock_agent.aexecute_task = AsyncMock(side_effect=ValueError("Test error"))
with (
patch("crewai.a2a.utils.task.Task", return_value=mock_task),
patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus,
):
with pytest.raises(Exception):
await execute(mock_agent, mock_context, mock_event_queue)
failed_call = mock_bus.emit.call_args_list[1]
event = failed_call[0][1]
assert event.type == "a2a_server_task_failed"
assert "Test error" in event.error
@pytest.mark.asyncio
async def test_emits_canceled_event_on_cancellation(
self,
mock_agent: MagicMock,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
mock_task: MagicMock,
) -> None:
"""Execute emits A2AServerTaskCanceledEvent on CancelledError."""
mock_agent.aexecute_task = AsyncMock(side_effect=asyncio.CancelledError())
with (
patch("crewai.a2a.utils.task.Task", return_value=mock_task),
patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus,
):
with pytest.raises(asyncio.CancelledError):
await execute(mock_agent, mock_context, mock_event_queue)
canceled_call = mock_bus.emit.call_args_list[1]
event = canceled_call[0][1]
assert event.type == "a2a_server_task_canceled"
assert event.a2a_task_id == mock_context.task_id
class TestCancel:
"""Tests for the cancel function."""
@pytest.mark.asyncio
async def test_sets_cancel_flag_in_cache(
self,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
) -> None:
"""Cancel sets the cancel flag in cache."""
from aiocache import caches
cache = caches.get("default")
await cancel(mock_context, mock_event_queue)
flag = await cache.get(f"cancel:{mock_context.task_id}")
assert flag is True
@pytest.mark.asyncio
async def test_enqueues_task_status_update_event(
self,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
) -> None:
"""Cancel enqueues TaskStatusUpdateEvent with canceled state."""
await cancel(mock_context, mock_event_queue)
mock_event_queue.enqueue_event.assert_called_once()
event = mock_event_queue.enqueue_event.call_args[0][0]
assert event.task_id == mock_context.task_id
assert event.context_id == mock_context.context_id
assert event.status.state == TaskState.canceled
assert event.final is True
@pytest.mark.asyncio
async def test_returns_none_when_no_current_task(
self,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
) -> None:
"""Cancel returns None when context has no current_task."""
mock_context.current_task = None
result = await cancel(mock_context, mock_event_queue)
assert result is None
@pytest.mark.asyncio
async def test_returns_updated_task_when_current_task_exists(
self,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
) -> None:
"""Cancel returns updated task when context has current_task."""
current_task = MagicMock(spec=A2ATask)
current_task.status = TaskStatus(state=TaskState.working)
mock_context.current_task = current_task
result = await cancel(mock_context, mock_event_queue)
assert result is current_task
assert result.status.state == TaskState.canceled
@pytest.mark.asyncio
async def test_cleanup_after_cancel(
self,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
) -> None:
"""Cancel flag persists for cancellable decorator to detect."""
from aiocache import caches
cache = caches.get("default")
await cancel(mock_context, mock_event_queue)
flag = await cache.get(f"cancel:{mock_context.task_id}")
assert flag is True
await cache.delete(f"cancel:{mock_context.task_id}")
class TestExecuteAndCancelIntegration:
"""Integration tests for execute and cancel working together."""
@pytest.mark.asyncio
async def test_cancel_stops_running_execute(
self,
mock_agent: MagicMock,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
mock_task: MagicMock,
) -> None:
"""Calling cancel stops a running execute."""
async def slow_task(**kwargs: Any) -> str:
await asyncio.sleep(2.0)
return "should not complete"
mock_agent.aexecute_task = slow_task
with (
patch("crewai.a2a.utils.task.Task", return_value=mock_task),
patch("crewai.a2a.utils.task.crewai_event_bus"),
):
execute_task = asyncio.create_task(
execute(mock_agent, mock_context, mock_event_queue)
)
await asyncio.sleep(0.1)
await cancel(mock_context, mock_event_queue)
with pytest.raises(asyncio.CancelledError):
await execute_task

View File

@@ -0,0 +1,408 @@
"""Tests for experimental environment tools."""
from __future__ import annotations
import os
import tempfile
from collections.abc import Generator
from pathlib import Path
import pytest
from crewai.experimental.environment_tools import (
BaseEnvironmentTool,
EnvironmentTools,
FileReadTool,
FileSearchTool,
GrepTool,
ListDirTool,
)
# ============================================================================
# Fixtures
# ============================================================================
@pytest.fixture
def temp_dir() -> Generator[str, None, None]:
"""Create a temporary directory with test files."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create test files
test_file = Path(tmpdir) / "test.txt"
test_file.write_text("Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n")
python_file = Path(tmpdir) / "example.py"
python_file.write_text("def hello():\n print('Hello World')\n")
# Create subdirectory with files
subdir = Path(tmpdir) / "subdir"
subdir.mkdir()
(subdir / "nested.txt").write_text("Nested content\n")
(subdir / "another.py").write_text("# Another Python file\n")
yield tmpdir
@pytest.fixture
def restricted_temp_dir() -> Generator[tuple[str, str], None, None]:
"""Create two directories - one allowed, one not."""
with tempfile.TemporaryDirectory() as allowed_dir:
with tempfile.TemporaryDirectory() as forbidden_dir:
# Create files in both
(Path(allowed_dir) / "allowed.txt").write_text("Allowed content\n")
(Path(forbidden_dir) / "forbidden.txt").write_text("Forbidden content\n")
yield allowed_dir, forbidden_dir
# ============================================================================
# BaseEnvironmentTool Tests
# ============================================================================
class TestBaseEnvironmentTool:
"""Tests for BaseEnvironmentTool path validation."""
def test_default_allowed_paths_is_current_directory(self) -> None:
"""Default allowed_paths should be current directory for security."""
tool = FileReadTool()
assert tool.allowed_paths == ["."]
def test_validate_path_explicit_no_restrictions(self, temp_dir: str) -> None:
"""With explicit empty allowed_paths, all paths should be allowed."""
tool = FileReadTool(allowed_paths=[])
valid, result = tool._validate_path(temp_dir)
assert valid is True
assert isinstance(result, Path)
def test_validate_path_within_allowed(self, temp_dir: str) -> None:
"""Paths within allowed_paths should be valid."""
tool = FileReadTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
valid, result = tool._validate_path(test_file)
assert valid is True
assert isinstance(result, Path)
def test_validate_path_outside_allowed(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Paths outside allowed_paths should be rejected."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = FileReadTool(allowed_paths=[allowed_dir])
forbidden_file = os.path.join(forbidden_dir, "forbidden.txt")
valid, result = tool._validate_path(forbidden_file)
assert valid is False
assert isinstance(result, str)
assert "outside allowed paths" in result
def test_format_size(self) -> None:
"""Test human-readable size formatting."""
tool = FileReadTool()
assert tool._format_size(500) == "500B"
assert tool._format_size(1024) == "1.0KB"
assert tool._format_size(1536) == "1.5KB"
assert tool._format_size(1024 * 1024) == "1.0MB"
assert tool._format_size(1024 * 1024 * 1024) == "1.0GB"
# ============================================================================
# FileReadTool Tests
# ============================================================================
class TestFileReadTool:
"""Tests for FileReadTool."""
def test_read_entire_file(self, temp_dir: str) -> None:
"""Should read entire file contents."""
tool = FileReadTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(path=test_file)
assert "Line 1" in result
assert "Line 2" in result
assert "Line 5" in result
assert "File:" in result # Metadata header
def test_read_with_line_range(self, temp_dir: str) -> None:
"""Should read specific line range."""
tool = FileReadTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(path=test_file, start_line=2, line_count=2)
assert "Line 2" in result
assert "Line 3" in result
# Should not include lines outside range
assert "Line 1" not in result.split("=" * 60)[-1] # Check content after header
def test_read_file_not_found(self, temp_dir: str) -> None:
"""Should return error for missing file."""
tool = FileReadTool(allowed_paths=[temp_dir])
missing_file = os.path.join(temp_dir, "nonexistent.txt")
result = tool._run(path=missing_file)
assert "Error: File not found" in result
def test_read_file_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Should reject paths outside allowed_paths."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = FileReadTool(allowed_paths=[allowed_dir])
forbidden_file = os.path.join(forbidden_dir, "forbidden.txt")
result = tool._run(path=forbidden_file)
assert "Error:" in result
assert "outside allowed paths" in result
# ============================================================================
# ListDirTool Tests
# ============================================================================
class TestListDirTool:
"""Tests for ListDirTool."""
def test_list_directory(self, temp_dir: str) -> None:
"""Should list directory contents."""
tool = ListDirTool(allowed_paths=[temp_dir])
result = tool._run(path=temp_dir)
assert "test.txt" in result
assert "example.py" in result
assert "subdir" in result
assert "Total:" in result
def test_list_with_pattern(self, temp_dir: str) -> None:
"""Should filter by pattern."""
tool = ListDirTool(allowed_paths=[temp_dir])
result = tool._run(path=temp_dir, pattern="*.py")
assert "example.py" in result
assert "test.txt" not in result
def test_list_recursive(self, temp_dir: str) -> None:
"""Should list recursively when enabled."""
tool = ListDirTool(allowed_paths=[temp_dir])
result = tool._run(path=temp_dir, recursive=True)
assert "nested.txt" in result
assert "another.py" in result
def test_list_nonexistent_directory(self, temp_dir: str) -> None:
"""Should return error for missing directory."""
tool = ListDirTool(allowed_paths=[temp_dir])
result = tool._run(path=os.path.join(temp_dir, "nonexistent"))
assert "Error: Directory not found" in result
def test_list_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Should reject paths outside allowed_paths."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = ListDirTool(allowed_paths=[allowed_dir])
result = tool._run(path=forbidden_dir)
assert "Error:" in result
assert "outside allowed paths" in result
# ============================================================================
# GrepTool Tests
# ============================================================================
class TestGrepTool:
"""Tests for GrepTool."""
def test_grep_finds_pattern(self, temp_dir: str) -> None:
"""Should find matching patterns."""
tool = GrepTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(pattern="Line 2", path=test_file)
assert "Line 2" in result
assert "matches" in result.lower() or "found" in result.lower()
def test_grep_no_matches(self, temp_dir: str) -> None:
"""Should report when no matches found."""
tool = GrepTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(pattern="nonexistent pattern xyz", path=test_file)
assert "No matches found" in result
def test_grep_recursive(self, temp_dir: str) -> None:
"""Should search recursively in directories."""
tool = GrepTool(allowed_paths=[temp_dir])
result = tool._run(pattern="Nested", path=temp_dir, recursive=True)
assert "Nested" in result
def test_grep_case_insensitive(self, temp_dir: str) -> None:
"""Should support case-insensitive search."""
tool = GrepTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(pattern="LINE", path=test_file, ignore_case=True)
assert "Line" in result or "matches" in result.lower()
def test_grep_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Should reject paths outside allowed_paths."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = GrepTool(allowed_paths=[allowed_dir])
result = tool._run(pattern="test", path=forbidden_dir)
assert "Error:" in result
assert "outside allowed paths" in result
# ============================================================================
# FileSearchTool Tests
# ============================================================================
class TestFileSearchTool:
"""Tests for FileSearchTool."""
def test_find_files_by_pattern(self, temp_dir: str) -> None:
"""Should find files matching pattern."""
tool = FileSearchTool(allowed_paths=[temp_dir])
result = tool._run(pattern="*.py", path=temp_dir)
assert "example.py" in result
assert "another.py" in result
def test_find_no_matches(self, temp_dir: str) -> None:
"""Should report when no files match."""
tool = FileSearchTool(allowed_paths=[temp_dir])
result = tool._run(pattern="*.xyz", path=temp_dir)
assert "No" in result and "found" in result
def test_find_files_only(self, temp_dir: str) -> None:
"""Should filter to files only."""
tool = FileSearchTool(allowed_paths=[temp_dir])
result = tool._run(pattern="*", path=temp_dir, file_type="file")
# Should include files
assert "test.txt" in result or "example.py" in result
# Directories should have trailing slash in output
# Check that subdir is not listed as a file
def test_find_dirs_only(self, temp_dir: str) -> None:
"""Should filter to directories only."""
tool = FileSearchTool(allowed_paths=[temp_dir])
result = tool._run(pattern="*", path=temp_dir, file_type="dir")
assert "subdir" in result
def test_find_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Should reject paths outside allowed_paths."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = FileSearchTool(allowed_paths=[allowed_dir])
result = tool._run(pattern="*", path=forbidden_dir)
assert "Error:" in result
assert "outside allowed paths" in result
# ============================================================================
# EnvironmentTools Manager Tests
# ============================================================================
class TestEnvironmentTools:
"""Tests for EnvironmentTools manager class."""
def test_default_allowed_paths_is_current_directory(self) -> None:
"""Default should restrict to current directory for security."""
env_tools = EnvironmentTools()
tools = env_tools.tools()
# All tools should default to current directory
for tool in tools:
assert isinstance(tool, BaseEnvironmentTool)
assert tool.allowed_paths == ["."]
def test_explicit_empty_allowed_paths_allows_all(self) -> None:
"""Passing empty list should allow all paths."""
env_tools = EnvironmentTools(allowed_paths=[])
tools = env_tools.tools()
for tool in tools:
assert isinstance(tool, BaseEnvironmentTool)
assert tool.allowed_paths == []
def test_returns_all_tools_by_default(self) -> None:
"""Should return all four tools by default."""
env_tools = EnvironmentTools()
tools = env_tools.tools()
assert len(tools) == 4
tool_names = [t.name for t in tools]
assert "read_file" in tool_names
assert "list_directory" in tool_names
assert "grep_search" in tool_names
assert "find_files" in tool_names
def test_exclude_grep(self) -> None:
"""Should exclude grep tool when disabled."""
env_tools = EnvironmentTools(include_grep=False)
tools = env_tools.tools()
assert len(tools) == 3
tool_names = [t.name for t in tools]
assert "grep_search" not in tool_names
def test_exclude_search(self) -> None:
"""Should exclude search tool when disabled."""
env_tools = EnvironmentTools(include_search=False)
tools = env_tools.tools()
assert len(tools) == 3
tool_names = [t.name for t in tools]
assert "find_files" not in tool_names
def test_allowed_paths_propagated(self, temp_dir: str) -> None:
"""Should propagate allowed_paths to all tools."""
env_tools = EnvironmentTools(allowed_paths=[temp_dir])
tools = env_tools.tools()
for tool in tools:
assert isinstance(tool, BaseEnvironmentTool)
assert tool.allowed_paths == [temp_dir]
def test_tools_are_base_tool_instances(self) -> None:
"""All returned tools should be BaseTool instances."""
from crewai.tools.base_tool import BaseTool
env_tools = EnvironmentTools()
tools = env_tools.tools()
for tool in tools:
assert isinstance(tool, BaseTool)

View File

@@ -515,94 +515,6 @@ def test_azure_supports_stop_words():
assert llm.supports_stop_words() == True
def test_azure_gpt5_models_do_not_support_stop_words():
"""
Test that GPT-5 family models do not support stop words.
GPT-5 models use the Responses API which doesn't support stop sequences.
See: https://learn.microsoft.com/en-us/azure/ai-foundry/foundry-models/concepts/models-sold-directly-by-azure
"""
# GPT-5 base models
gpt5_models = [
"azure/gpt-5",
"azure/gpt-5-mini",
"azure/gpt-5-nano",
"azure/gpt-5-chat",
# GPT-5.1 series
"azure/gpt-5.1",
"azure/gpt-5.1-chat",
"azure/gpt-5.1-codex",
"azure/gpt-5.1-codex-mini",
# GPT-5.2 series
"azure/gpt-5.2",
"azure/gpt-5.2-chat",
]
for model_name in gpt5_models:
llm = LLM(model=model_name)
assert llm.supports_stop_words() == False, f"Expected {model_name} to NOT support stop words"
def test_azure_o_series_models_do_not_support_stop_words():
"""
Test that o-series reasoning models do not support stop words.
"""
o_series_models = [
"azure/o1",
"azure/o1-mini",
"azure/o3",
"azure/o3-mini",
"azure/o4",
"azure/o4-mini",
]
for model_name in o_series_models:
llm = LLM(model=model_name)
assert llm.supports_stop_words() == False, f"Expected {model_name} to NOT support stop words"
def test_azure_responses_api_models_do_not_support_stop_words():
"""
Test that models using the Responses API do not support stop words.
"""
responses_api_models = [
"azure/computer-use-preview",
]
for model_name in responses_api_models:
llm = LLM(model=model_name)
assert llm.supports_stop_words() == False, f"Expected {model_name} to NOT support stop words"
def test_azure_stop_words_not_included_for_unsupported_models():
"""
Test that stop words are not included in completion params for models that don't support them.
"""
with patch.dict(os.environ, {
"AZURE_API_KEY": "test-key",
"AZURE_ENDPOINT": "https://models.inference.ai.azure.com"
}):
# Test GPT-5 model - stop should NOT be included even if set
llm_gpt5 = LLM(
model="azure/gpt-5-nano",
stop=["STOP", "END"]
)
params = llm_gpt5._prepare_completion_params(
messages=[{"role": "user", "content": "test"}]
)
assert "stop" not in params, "stop should not be included for GPT-5 models"
# Test regular model - stop SHOULD be included
llm_gpt4 = LLM(
model="azure/gpt-4",
stop=["STOP", "END"]
)
params = llm_gpt4._prepare_completion_params(
messages=[{"role": "user", "content": "test"}]
)
assert "stop" in params, "stop should be included for GPT-4 models"
assert params["stop"] == ["STOP", "END"]
def test_azure_context_window_size():
"""
Test that Azure models return correct context window sizes

View File

@@ -4500,71 +4500,6 @@ def test_crew_copy_with_memory():
pytest.fail(f"Copying crew raised an unexpected exception: {e}")
def test_sets_parent_flow_when_using_crewbase_pattern_inside_flow():
@CrewBase
class TestCrew:
agents_config = None
tasks_config = None
agents: list[BaseAgent]
tasks: list[Task]
@agent
def researcher(self) -> Agent:
return Agent(
role="Researcher",
goal="Research things",
backstory="Expert researcher",
)
@agent
def writer_agent(self) -> Agent:
return Agent(
role="Writer",
goal="Write things",
backstory="Expert writer",
)
@task
def research_task(self) -> Task:
return Task(
description="Test task for researcher",
expected_output="output",
agent=self.researcher(),
)
@task
def write_task(self) -> Task:
return Task(
description="Test task for writer",
expected_output="output",
agent=self.writer_agent(),
)
@crew
def crew(self) -> Crew:
return Crew(
agents=self.agents,
tasks=self.tasks,
process=Process.sequential,
)
captured_crew = None
class MyFlow(Flow):
@start()
def start_method(self):
nonlocal captured_crew
captured_crew = TestCrew().crew()
return captured_crew
flow = MyFlow()
flow.kickoff()
assert captured_crew is not None
assert captured_crew.parent_flow is flow
def test_sets_parent_flow_when_outside_flow(researcher, writer):
crew = Crew(
agents=[researcher, writer],

View File

@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.8.1"
__version__ = "1.8.0"

View File

@@ -117,7 +117,7 @@ show_error_codes = true
warn_unused_ignores = true
python_version = "3.12"
exclude = "(?x)(^lib/crewai/src/crewai/cli/templates/ | ^lib/crewai/tests/ | ^lib/crewai-tools/tests/)"
plugins = ["pydantic.mypy"]
plugins = ["pydantic.mypy", "crewai.mypy"]
[tool.bandit]