Compare commits

..

3 Commits

Author SHA1 Message Date
Iris Clawd
71c7293197 docs: remove --public flag references (public tools no longer supported)
Remove all references to --public visibility flag since public tools
have been removed from the platform (crewai-plus#2380, ENG-1453).
Tools are now organization-scoped (private) by default.

Co-authored-by: Diego Nogues <diego@crewai.com>
2026-05-14 09:52:10 -03:00
Iris Clawd
288568110f docs: add Platform Tools CLI guide for crewai tool create/publish/install
Add documentation for the CLI commands referenced in the Create Tool
modal on the platform (crewai tool create, crewai tool publish,
crewai tool install). These commands manage tools on the CrewAI
platform registry — distinct from PyPI publishing.

Changes:
- New guide: docs/en/guides/tools/platform-tools-cli.mdx
  Full lifecycle: create → implement → publish → install
  Covers visibility flags (--public/--private/--force)
  Includes platform vs PyPI comparison
- Updated create-custom-tools.mdx tip to cross-reference both guides
- Added new page to docs.json navigation (all versions)

Resolves EPD-76

Co-authored-by: Diego Nogues <diego@crewai.com>
2026-05-14 09:52:10 -03:00
iris-clawd
c36827b45b fix(docs/pt-BR): replace untranslated code block placeholders (#5781)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
* fix(docs/pt-BR): replace untranslated code block placeholders

Replace all `# (O código não é traduzido)` and `# código não traduzido`
placeholder comments in the PT-BR docs with the actual code from the
English source files.

Files fixed:
- docs/pt-BR/concepts/flows.mdx (~15 placeholders → real code)
- docs/pt-BR/guides/flows/mastering-flow-state.mdx (~17 placeholders → real code)

Code itself is kept in English per i18n conventions. Inline # comments
within code blocks have been translated to Portuguese.

* fix(docs/pt-BR): address CodeRabbit review comments

- flows.mdx: add missing load_dotenv() call after imports
- mastering-flow-state.mdx: fix PersistentCounterFlow second-run example
  to pass inputs={"id": flow1.state.id} to kickoff(), matching the
  documented resume pattern; update comment accordingly
2026-05-13 12:23:18 -03:00
17 changed files with 4016 additions and 2015 deletions

View File

@@ -29,6 +29,7 @@ from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion
load_dotenv()
class ExampleFlow(Flow):
model = "gpt-4o-mini"

File diff suppressed because it is too large Load Diff

View File

@@ -29,6 +29,7 @@ from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion
load_dotenv()
class ExampleFlow(Flow):
model = "gpt-4o-mini"

View File

@@ -313,9 +313,9 @@ flow1 = PersistentCounterFlow()
result1 = flow1.kickoff()
print(f"First run result: {result1}")
# Second run - state is automatically loaded
# Second run - pass the ID to load the persisted state
flow2 = PersistentCounterFlow()
result2 = flow2.kickoff()
result2 = flow2.kickoff(inputs={"id": flow1.state.id})
print(f"Second run result: {result2}") # Will be higher due to persisted state
```

View File

@@ -0,0 +1,131 @@
---
title: Platform Tools CLI
description: Create, publish, and install custom tools on the CrewAI platform using the CLI.
icon: terminal
mode: "wide"
---
## Overview
The CrewAI CLI provides commands to manage custom tools on the **CrewAI platform** — a hosted tool registry that lets you share tools within your organization without publishing to PyPI.
| Command | Purpose |
|---------|---------|
| `crewai tool create <handle>` | Scaffold a new tool project |
| `crewai tool publish` | Publish the tool to the CrewAI platform |
| `crewai tool install <handle>` | Install a platform tool into your crew project |
<Note type="info" title="Platform vs PyPI">
These commands manage tools on the **CrewAI platform registry**. If you want to publish a standalone Python package to PyPI instead, see the [Publish Custom Tools to PyPI](/en/guides/tools/publish-custom-tools) guide.
</Note>
## Prerequisites
- **CrewAI CLI** installed (`pip install crewai`)
- **Authenticated** with the platform — run `crewai login` first
---
## Step 1: Create a Tool Project
Scaffold a new tool project:
```bash
crewai tool create my_custom_tool
```
This generates a project structure with the boilerplate you need to start building your tool.
<Tip>
The `handle` is the unique identifier for your tool on the platform. Choose something descriptive and specific to what the tool does.
</Tip>
### Implement Your Tool
Edit the generated tool file to add your logic. The tool follows the standard CrewAI tools contract — you can subclass `BaseTool` or use the `@tool` decorator:
```python
from crewai.tools import BaseTool
class MyCustomTool(BaseTool):
name: str = "My Custom Tool"
description: str = "Description of what this tool does — be specific so agents know when to use it."
def _run(self, argument: str) -> str:
# Your tool logic here
return "result"
```
For the full tools API reference (input schemas, caching, async support, error handling), see the [Create Custom Tools](/en/learn/create-custom-tools) guide.
---
## Step 2: Publish to the Platform
From your tool project directory, publish it to the CrewAI platform:
```bash
crewai tool publish
```
### Options
| Flag | Description |
|------|-------------|
| `--force` | Bypass Git remote validations |
Tools are published privately to your organization by default.
---
## Step 3: Install a Platform Tool
To install a tool that's been published to the platform:
```bash
crewai tool install my_custom_tool
```
Once installed, you can use the tool in your crew like any other tool — assign it to an agent via the `tools` parameter.
---
## Full Lifecycle Example
```bash
# 1. Authenticate with the platform
crewai login
# 2. Scaffold a new tool
crewai tool create weather_lookup
# 3. Implement your logic in the generated project
cd weather_lookup
# ... edit the tool file ...
# 4. Publish to the platform
crewai tool publish
# 5. In another project, install and use it
crewai tool install weather_lookup
```
---
## Platform Tools vs PyPI Packages
| | Platform Tools | PyPI Packages |
|---|---|---|
| **Publish** | `crewai tool publish` | `uv build` + `uv publish` |
| **Registry** | CrewAI platform | PyPI |
| **Install** | `crewai tool install <handle>` | `pip install <package>` |
| **Auth** | `crewai login` | PyPI account + token |
| **Visibility** | Organization-scoped (private) | Always public |
| **Guide** | This page | [Publish Custom Tools](/en/guides/tools/publish-custom-tools) |
---
## Related
- [Create Custom Tools](/en/learn/create-custom-tools) — Python API reference for building tools (BaseTool, @tool decorator)
- [Publish Custom Tools to PyPI](/en/guides/tools/publish-custom-tools) — package and distribute tools as standalone Python libraries

View File

@@ -12,7 +12,9 @@ incorporating the latest functionalities such as tool delegation, error handling
enabling agents to perform a wide range of actions.
<Tip>
**Want to publish your tool for the community?** If you're building a tool that others could benefit from, check out the [Publish Custom Tools](/en/guides/tools/publish-custom-tools) guide to learn how to package and distribute your tool on PyPI.
**Want to publish your tool to the CrewAI platform?** Use the CLI to scaffold, publish, and share tools directly on the platform — see the [Platform Tools CLI](/en/guides/tools/platform-tools-cli) guide.
**Prefer publishing to PyPI?** Check out the [Publish Custom Tools](/en/guides/tools/publish-custom-tools) guide to package and distribute your tool as a standalone Python library.
</Tip>
### Subclassing `BaseTool`

View File

@@ -29,6 +29,7 @@ from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion
load_dotenv()
class ExampleFlow(Flow):
model = "gpt-4o-mini"

View File

@@ -24,7 +24,63 @@ Os flows permitem que você crie fluxos de trabalho estruturados e orientados po
Vamos criar um Flow simples no qual você usará a OpenAI para gerar uma cidade aleatória em uma tarefa e, em seguida, usará essa cidade para gerar uma curiosidade em outra tarefa.
```python Code
# (O código não é traduzido)
from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion
load_dotenv()
class ExampleFlow(Flow):
model = "gpt-4o-mini"
@start()
def generate_city(self):
print("Starting flow")
# Cada estado do flow recebe automaticamente um ID único
print(f"Flow State ID: {self.state['id']}")
response = completion(
model=self.model,
messages=[
{
"role": "user",
"content": "Return the name of a random city in the world.",
},
],
)
random_city = response["choices"][0]["message"]["content"]
# Armazena a cidade no nosso estado
self.state["city"] = random_city
print(f"Random City: {random_city}")
return random_city
@listen(generate_city)
def generate_fun_fact(self, random_city):
response = completion(
model=self.model,
messages=[
{
"role": "user",
"content": f"Tell me a fun fact about {random_city}",
},
],
)
fun_fact = response["choices"][0]["message"]["content"]
# Armazena a curiosidade no nosso estado
self.state["fun_fact"] = fun_fact
return fun_fact
flow = ExampleFlow()
flow.plot()
result = flow.kickoff()
print(f"Generated fun fact: {result}")
```
Na ilustração acima, criamos um Flow simples que gera uma cidade aleatória usando a OpenAI e depois cria uma curiosidade sobre essa cidade. O Flow consiste em duas tarefas: `generate_city` e `generate_fun_fact`. A tarefa `generate_city` é o ponto de início do Flow, enquanto a tarefa `generate_fun_fact` fica escutando o resultado da tarefa `generate_city`.
@@ -56,12 +112,16 @@ O decorador `@listen()` pode ser usado de várias formas:
1. **Escutando um Método pelo Nome**: Você pode passar o nome do método ao qual deseja escutar como string. Quando esse método concluir, o método ouvinte será chamado.
```python Code
# (O código não é traduzido)
@listen("generate_city")
def generate_fun_fact(self, random_city):
# Implementação
```
2. **Escutando um Método Diretamente**: Você pode passar o próprio método. Quando esse método concluir, o método ouvinte será chamado.
```python Code
# (O código não é traduzido)
@listen(generate_city)
def generate_fun_fact(self, random_city):
# Implementação
```
### Saída de um Flow
@@ -76,7 +136,24 @@ Veja como acessar a saída final:
<CodeGroup>
```python Code
# (O código não é traduzido)
from crewai.flow.flow import Flow, listen, start
class OutputExampleFlow(Flow):
@start()
def first_method(self):
return "Output from first_method"
@listen(first_method)
def second_method(self, first_output):
return f"Second method received: {first_output}"
flow = OutputExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()
print("---- Final Output ----")
print(final_output)
```
```text Output
@@ -97,8 +174,34 @@ Além de recuperar a saída final, você pode acessar e atualizar o estado dentr
Veja um exemplo de como atualizar e acessar o estado:
<CodeGroup>
```python Code
# (O código não é traduzido)
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
class ExampleState(BaseModel):
counter: int = 0
message: str = ""
class StateExampleFlow(Flow[ExampleState]):
@start()
def first_method(self):
self.state.message = "Hello from first_method"
self.state.counter += 1
@listen(first_method)
def second_method(self):
self.state.message += " - updated by second_method"
self.state.counter += 1
return self.state.message
flow = StateExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()
print(f"Final Output: {final_output}")
print("Final State:")
print(flow.state)
```
```text Output
@@ -128,7 +231,33 @@ Essa abordagem oferece flexibilidade, permitindo que o desenvolvedor adicione ou
Mesmo com estados não estruturados, os flows do CrewAI geram e mantêm automaticamente um identificador único (UUID) para cada instância de estado.
```python Code
# (O código não é traduzido)
from crewai.flow.flow import Flow, listen, start
class UnstructuredExampleFlow(Flow):
@start()
def first_method(self):
# O estado inclui automaticamente um campo 'id'
print(f"State ID: {self.state['id']}")
self.state['counter'] = 0
self.state['message'] = "Hello from structured flow"
@listen(first_method)
def second_method(self):
self.state['counter'] += 1
self.state['message'] += " - updated"
@listen(second_method)
def third_method(self):
self.state['counter'] += 1
self.state['message'] += " - updated again"
print(f"State after third_method: {self.state}")
flow = UnstructuredExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()
```
![Flow Visual image](/images/crewai-flow-3.png)
@@ -148,7 +277,39 @@ Ao usar modelos como o `BaseModel` da Pydantic, os desenvolvedores podem definir
Cada estado nos flows do CrewAI recebe automaticamente um identificador único (UUID) para ajudar no rastreamento e gerenciamento. Esse ID é gerado e mantido automaticamente pelo sistema de flows.
```python Code
# (O código não é traduzido)
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
class ExampleState(BaseModel):
# Nota: o campo 'id' é adicionado automaticamente a todos os estados
counter: int = 0
message: str = ""
class StructuredExampleFlow(Flow[ExampleState]):
@start()
def first_method(self):
# Acesse o ID gerado automaticamente, se necessário
print(f"State ID: {self.state.id}")
self.state.message = "Hello from structured flow"
@listen(first_method)
def second_method(self):
self.state.counter += 1
self.state.message += " - updated"
@listen(second_method)
def third_method(self):
self.state.counter += 1
self.state.message += " - updated again"
print(f"State after third_method: {self.state}")
flow = StructuredExampleFlow()
flow.kickoff()
```
![Flow Visual image](/images/crewai-flow-3.png)
@@ -182,7 +343,19 @@ O decorador @persist permite a persistência automática do estado nos flows do
Quando aplicado no nível da classe, o decorador @persist garante a persistência automática de todos os estados dos métodos do flow:
```python
# (O código não é traduzido)
@persist # Usa SQLiteFlowPersistence por padrão
class MyFlow(Flow[MyState]):
@start()
def initialize_flow(self):
# Este método terá seu estado persistido automaticamente
self.state.counter = 1
print("Initialized flow. State ID:", self.state.id)
@listen(initialize_flow)
def next_step(self):
# O estado (incluindo self.state.id) é recarregado automaticamente
self.state.counter += 1
print("Flow state is persisted. Counter:", self.state.counter)
```
### Persistência no Nível de Método
@@ -190,7 +363,14 @@ Quando aplicado no nível da classe, o decorador @persist garante a persistênci
Para um controle mais granular, você pode aplicar @persist em métodos específicos:
```python
# (O código não é traduzido)
class AnotherFlow(Flow[dict]):
@persist # Persiste apenas o estado deste método
@start()
def begin(self):
if "runs" not in self.state:
self.state["runs"] = 0
self.state["runs"] += 1
print("Method-level persisted runs:", self.state["runs"])
```
### Forking de Estado Persistido
@@ -282,8 +462,29 @@ A arquitetura de persistência enfatiza precisão técnica e opções de persona
A função `or_` nos flows permite escutar múltiplos métodos e acionar o método ouvinte quando qualquer um dos métodos especificados gerar uma saída.
<CodeGroup>
```python Code
# (O código não é traduzido)
from crewai.flow.flow import Flow, listen, or_, start
class OrExampleFlow(Flow):
@start()
def start_method(self):
return "Hello from the start method"
@listen(start_method)
def second_method(self):
return "Hello from the second method"
@listen(or_(start_method, second_method))
def logger(self, result):
print(f"Logger: {result}")
flow = OrExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()
```
```text Output
@@ -302,8 +503,28 @@ A função `or_` serve para escutar vários métodos e disparar o método ouvint
A função `and_` nos flows permite escutar múltiplos métodos e acionar o método ouvinte apenas quando todos os métodos especificados emitirem uma saída.
<CodeGroup>
```python Code
# (O código não é traduzido)
from crewai.flow.flow import Flow, and_, listen, start
class AndExampleFlow(Flow):
@start()
def start_method(self):
self.state["greeting"] = "Hello from the start method"
@listen(start_method)
def second_method(self):
self.state["joke"] = "What do computers eat? Microchips."
@listen(and_(start_method, second_method))
def logger(self):
print("---- Logger ----")
print(self.state)
flow = AndExampleFlow()
flow.plot()
flow.kickoff()
```
```text Output
@@ -323,8 +544,42 @@ O decorador `@router()` nos flows permite definir lógica de roteamento condicio
Você pode especificar diferentes rotas conforme a saída do método, permitindo controlar o fluxo de execução de forma dinâmica.
<CodeGroup>
```python Code
# (O código não é traduzido)
import random
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel
class ExampleState(BaseModel):
success_flag: bool = False
class RouterFlow(Flow[ExampleState]):
@start()
def start_method(self):
print("Starting the structured flow")
random_boolean = random.choice([True, False])
self.state.success_flag = random_boolean
@router(start_method)
def second_method(self):
if self.state.success_flag:
return "success"
else:
return "failed"
@listen("success")
def third_method(self):
print("Third method running")
@listen("failed")
def fourth_method(self):
print("Fourth method running")
flow = RouterFlow()
flow.plot("my_flow_plot")
flow.kickoff()
```
```text Output
@@ -401,7 +656,105 @@ Para um guia completo sobre feedback humano em flows, incluindo feedback assínc
Os agentes podem ser integrados facilmente aos seus flows, oferecendo uma alternativa leve às crews completas quando você precisar executar tarefas simples e focadas. Veja um exemplo de como utilizar um agente em um flow para realizar uma pesquisa de mercado:
```python
# (O código não é traduzido)
import asyncio
from typing import Any, Dict, List
from crewai_tools import SerperDevTool
from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.flow.flow import Flow, listen, start
# Define um formato de saída estruturado
class MarketAnalysis(BaseModel):
key_trends: List[str] = Field(description="List of identified market trends")
market_size: str = Field(description="Estimated market size")
competitors: List[str] = Field(description="Major competitors in the space")
# Define o estado do flow
class MarketResearchState(BaseModel):
product: str = ""
analysis: MarketAnalysis | None = None
# Cria uma classe de flow
class MarketResearchFlow(Flow[MarketResearchState]):
@start()
def initialize_research(self) -> Dict[str, Any]:
print(f"Starting market research for {self.state.product}")
return {"product": self.state.product}
@listen(initialize_research)
async def analyze_market(self) -> Dict[str, Any]:
# Cria um agente para pesquisa de mercado
analyst = Agent(
role="Market Research Analyst",
goal=f"Analyze the market for {self.state.product}",
backstory="You are an experienced market analyst with expertise in "
"identifying market trends and opportunities.",
tools=[SerperDevTool()],
verbose=True,
)
# Define a consulta de pesquisa
query = f"""
Research the market for {self.state.product}. Include:
1. Key market trends
2. Market size
3. Major competitors
Format your response according to the specified structure.
"""
# Executa a análise com formato de saída estruturado
result = await analyst.kickoff_async(query, response_format=MarketAnalysis)
if result.pydantic:
print("result", result.pydantic)
else:
print("result", result)
# Retorna a análise para atualizar o estado
return {"analysis": result.pydantic}
@listen(analyze_market)
def present_results(self, analysis) -> None:
print("\nMarket Analysis Results")
print("=====================")
if isinstance(analysis, dict):
# Se recebemos um dict com a chave 'analysis', extrai o objeto de análise real
market_analysis = analysis.get("analysis")
else:
market_analysis = analysis
if market_analysis and isinstance(market_analysis, MarketAnalysis):
print("\nKey Market Trends:")
for trend in market_analysis.key_trends:
print(f"- {trend}")
print(f"\nMarket Size: {market_analysis.market_size}")
print("\nMajor Competitors:")
for competitor in market_analysis.competitors:
print(f"- {competitor}")
else:
print("No structured analysis data available.")
print("Raw analysis:", analysis)
# Exemplo de uso
async def run_flow():
flow = MarketResearchFlow()
flow.plot("MarketResearchFlowPlot")
result = await flow.kickoff_async(inputs={"product": "AI-powered chatbots"})
return result
# Executa o flow
if __name__ == "__main__":
asyncio.run(run_flow())
```
![Flow Visual image](/images/crewai-flow-7.png)
@@ -463,7 +816,50 @@ No arquivo `main.py`, você cria seu flow e conecta as crews. É possível defin
Veja um exemplo de como conectar a `poem_crew` no arquivo `main.py`:
```python Code
# (O código não é traduzido)
#!/usr/bin/env python
from random import randint
from pydantic import BaseModel
from crewai.flow.flow import Flow, listen, start
from .crews.poem_crew.poem_crew import PoemCrew
class PoemState(BaseModel):
sentence_count: int = 1
poem: str = ""
class PoemFlow(Flow[PoemState]):
@start()
def generate_sentence_count(self):
print("Generating sentence count")
self.state.sentence_count = randint(1, 5)
@listen(generate_sentence_count)
def generate_poem(self):
print("Generating poem")
result = PoemCrew().crew().kickoff(inputs={"sentence_count": self.state.sentence_count})
print("Poem generated", result.raw)
self.state.poem = result.raw
@listen(generate_poem)
def save_poem(self):
print("Saving poem")
with open("poem.txt", "w") as f:
f.write(self.state.poem)
def kickoff():
poem_flow = PoemFlow()
poem_flow.kickoff()
def plot():
poem_flow = PoemFlow()
poem_flow.plot("PoemFlowPlot")
if __name__ == "__main__":
kickoff()
plot()
```
Neste exemplo, a classe `PoemFlow` define um fluxo que gera a quantidade de frases, usa a `PoemCrew` para gerar um poema e, depois, salva o poema em um arquivo. O flow inicia com o método `kickoff()`, e o gráfico é gerado pelo método `plot()`.
@@ -515,7 +911,8 @@ O CrewAI oferece duas formas práticas de gerar plots dos seus flows:
Se estiver trabalhando diretamente com uma instância do flow, basta chamar o método `plot()` do objeto. Isso criará um arquivo HTML com o plot interativo do seu flow.
```python Code
# (O código não é traduzido)
# Considerando que você já tem uma instância do flow
flow.plot("my_flow_plot")
```
Esse comando gera um arquivo chamado `my_flow_plot.html` no diretório atual. Abra esse arquivo em um navegador para visualizar o plot interativo.

View File

@@ -63,7 +63,60 @@ Com estado não estruturado:
Veja um exemplo simples de gerenciamento de estado não estruturado:
```python
# código não traduzido
from crewai.flow.flow import Flow, listen, start
class UnstructuredStateFlow(Flow):
@start()
def initialize_data(self):
print("Initializing flow data")
# Adiciona pares chave-valor ao estado
self.state["user_name"] = "Alex"
self.state["preferences"] = {
"theme": "dark",
"language": "English"
}
self.state["items"] = []
# O estado do flow recebe automaticamente um ID único
print(f"Flow ID: {self.state['id']}")
return "Initialized"
@listen(initialize_data)
def process_data(self, previous_result):
print(f"Previous step returned: {previous_result}")
# Acessa e modifica o estado
user = self.state["user_name"]
print(f"Processing data for {user}")
# Adiciona itens a uma lista no estado
self.state["items"].append("item1")
self.state["items"].append("item2")
# Adiciona um novo par chave-valor
self.state["processed"] = True
return "Processed"
@listen(process_data)
def generate_summary(self, previous_result):
# Acessa múltiplos valores do estado
user = self.state["user_name"]
theme = self.state["preferences"]["theme"]
items = self.state["items"]
processed = self.state.get("processed", False)
summary = f"User {user} has {len(items)} items with {theme} theme. "
summary += "Data is processed." if processed else "Data is not processed."
return summary
# Executa o flow
flow = UnstructuredStateFlow()
result = flow.kickoff()
print(f"Final result: {result}")
print(f"Final state: {flow.state}")
```
### Quando Usar Estado Não Estruturado
@@ -94,7 +147,63 @@ Ao utilizar estado estruturado:
Veja como implementar o gerenciamento de estado estruturado:
```python
# código não traduzido
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel, Field
from typing import List, Dict, Optional
# Define o modelo de estado
class UserPreferences(BaseModel):
theme: str = "light"
language: str = "English"
class AppState(BaseModel):
user_name: str = ""
preferences: UserPreferences = UserPreferences()
items: List[str] = []
processed: bool = False
completion_percentage: float = 0.0
# Cria um flow com estado tipado
class StructuredStateFlow(Flow[AppState]):
@start()
def initialize_data(self):
print("Initializing flow data")
# Define valores do estado (com checagem de tipo)
self.state.user_name = "Taylor"
self.state.preferences.theme = "dark"
# O campo ID está disponível automaticamente
print(f"Flow ID: {self.state.id}")
return "Initialized"
@listen(initialize_data)
def process_data(self, previous_result):
print(f"Processing data for {self.state.user_name}")
# Modifica o estado (com checagem de tipo)
self.state.items.append("item1")
self.state.items.append("item2")
self.state.processed = True
self.state.completion_percentage = 50.0
return "Processed"
@listen(process_data)
def generate_summary(self, previous_result):
# Acessa o estado (com autocompletar)
summary = f"User {self.state.user_name} has {len(self.state.items)} items "
summary += f"with {self.state.preferences.theme} theme. "
summary += "Data is processed." if self.state.processed else "Data is not processed."
summary += f" Completion: {self.state.completion_percentage}%"
return summary
# Executa o flow
flow = StructuredStateFlow()
result = flow.kickoff()
print(f"Final result: {result}")
print(f"Final state: {flow.state}")
```
### Benefícios do Estado Estruturado
@@ -138,7 +247,29 @@ Independente de você usar estado estruturado ou não estruturado, é possível
Métodos do flow podem retornar valores que serão passados como argumento para métodos listeners:
```python
# código não traduzido
from crewai.flow.flow import Flow, listen, start
class DataPassingFlow(Flow):
@start()
def generate_data(self):
# Este valor de retorno será passado para os métodos listeners
return "Generated data"
@listen(generate_data)
def process_data(self, data_from_previous_step):
print(f"Received: {data_from_previous_step}")
# Você pode modificar os dados e repassá-los adiante
processed_data = f"{data_from_previous_step} - processed"
# Também atualiza o estado
self.state["last_processed"] = processed_data
return processed_data
@listen(process_data)
def finalize_data(self, processed_data):
print(f"Received processed data: {processed_data}")
# Acessa tanto os dados passados quanto o estado
last_processed = self.state.get("last_processed", "")
return f"Final: {processed_data} (from state: {last_processed})"
```
Esse padrão permite combinar passagem de dados direta com atualizações de estado para obter máxima flexibilidade.
@@ -156,7 +287,36 @@ O decorador `@persist()` automatiza a persistência de estado, salvando o estado
Ao aplicar em nível de classe, `@persist()` salva o estado após cada execução de método:
```python
# código não traduzido
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
from pydantic import BaseModel
class CounterState(BaseModel):
value: int = 0
@persist() # Aplica à classe inteira do flow
class PersistentCounterFlow(Flow[CounterState]):
@start()
def increment(self):
self.state.value += 1
print(f"Incremented to {self.state.value}")
return self.state.value
@listen(increment)
def double(self, value):
self.state.value = value * 2
print(f"Doubled to {self.state.value}")
return self.state.value
# Primeira execução
flow1 = PersistentCounterFlow()
result1 = flow1.kickoff()
print(f"First run result: {result1}")
# Segunda execução - passa o ID para carregar o estado persistido
flow2 = PersistentCounterFlow()
result2 = flow2.kickoff(inputs={"id": flow1.state.id})
print(f"Second run result: {result2}") # Será maior devido ao estado persistido
```
#### Persistência em Nível de Método
@@ -164,7 +324,26 @@ Ao aplicar em nível de classe, `@persist()` salva o estado após cada execuçã
Para mais controle, você pode aplicar `@persist()` em métodos específicos:
```python
# código não traduzido
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
class SelectivePersistFlow(Flow):
@start()
def first_step(self):
self.state["count"] = 1
return "First step"
@persist() # Persiste apenas após este método
@listen(first_step)
def important_step(self, prev_result):
self.state["count"] += 1
self.state["important_data"] = "This will be persisted"
return "Important step completed"
@listen(important_step)
def final_step(self, prev_result):
self.state["count"] += 1
return f"Complete with count {self.state['count']}"
```
#### Forking de Estado Persistido
@@ -216,7 +395,45 @@ Notas sobre o comportamento:
Você pode usar o estado para implementar lógicas condicionais complexas em seus flows:
```python
# código não traduzido
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel
class PaymentState(BaseModel):
amount: float = 0.0
is_approved: bool = False
retry_count: int = 0
class PaymentFlow(Flow[PaymentState]):
@start()
def process_payment(self):
# Simula o processamento do pagamento
self.state.amount = 100.0
self.state.is_approved = self.state.amount < 1000
return "Payment processed"
@router(process_payment)
def check_approval(self, previous_result):
if self.state.is_approved:
return "approved"
elif self.state.retry_count < 3:
return "retry"
else:
return "rejected"
@listen("approved")
def handle_approval(self):
return f"Payment of ${self.state.amount} approved!"
@listen("retry")
def handle_retry(self):
self.state.retry_count += 1
print(f"Retrying payment (attempt {self.state.retry_count})...")
# Aqui poderia ser implementada a lógica de retry
return "Retry initiated"
@listen("rejected")
def handle_rejection(self):
return f"Payment of ${self.state.amount} rejected after {self.state.retry_count} retries."
```
### Manipulações Complexas de Estado
@@ -224,7 +441,60 @@ Você pode usar o estado para implementar lógicas condicionais complexas em seu
Para transformar estados complexos, você pode criar métodos dedicados:
```python
# código não traduzido
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
from typing import List, Dict
class UserData(BaseModel):
name: str
active: bool = True
login_count: int = 0
class ComplexState(BaseModel):
users: Dict[str, UserData] = {}
active_user_count: int = 0
class TransformationFlow(Flow[ComplexState]):
@start()
def initialize(self):
# Adiciona alguns usuários
self.add_user("alice", "Alice")
self.add_user("bob", "Bob")
self.add_user("charlie", "Charlie")
return "Initialized"
@listen(initialize)
def process_users(self, _):
# Incrementa contagens de login
for user_id in self.state.users:
self.increment_login(user_id)
# Desativa um usuário
self.deactivate_user("bob")
# Atualiza a contagem de ativos
self.update_active_count()
return f"Processed {len(self.state.users)} users"
# Métodos auxiliares para transformações de estado
def add_user(self, user_id: str, name: str):
self.state.users[user_id] = UserData(name=name)
self.update_active_count()
def increment_login(self, user_id: str):
if user_id in self.state.users:
self.state.users[user_id].login_count += 1
def deactivate_user(self, user_id: str):
if user_id in self.state.users:
self.state.users[user_id].active = False
self.update_active_count()
def update_active_count(self):
self.state.active_user_count = sum(
1 for user in self.state.users.values() if user.active
)
```
Esse padrão de criar métodos auxiliares mantém seus métodos de flow limpos, enquanto permite manipulações complexas de estado.
@@ -238,7 +508,71 @@ Um dos padrões mais poderosos na CrewAI é combinar o gerenciamento de estado d
Você pode usar o estado do flow para parametrizar crews:
```python
# código não traduzido
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Process, Task
from pydantic import BaseModel
class ResearchState(BaseModel):
topic: str = ""
depth: str = "medium"
results: str = ""
class ResearchFlow(Flow[ResearchState]):
@start()
def get_parameters(self):
# Em uma aplicação real, isso pode vir da entrada do usuário
self.state.topic = "Artificial Intelligence Ethics"
self.state.depth = "deep"
return "Parameters set"
@listen(get_parameters)
def execute_research(self, _):
# Cria os agentes
researcher = Agent(
role="Research Specialist",
goal=f"Research {self.state.topic} in {self.state.depth} detail",
backstory="You are an expert researcher with a talent for finding accurate information."
)
writer = Agent(
role="Content Writer",
goal="Transform research into clear, engaging content",
backstory="You excel at communicating complex ideas clearly and concisely."
)
# Cria as tarefas
research_task = Task(
description=f"Research {self.state.topic} with {self.state.depth} analysis",
expected_output="Comprehensive research notes in markdown format",
agent=researcher
)
writing_task = Task(
description=f"Create a summary on {self.state.topic} based on the research",
expected_output="Well-written article in markdown format",
agent=writer,
context=[research_task]
)
# Cria e executa a crew
research_crew = Crew(
agents=[researcher, writer],
tasks=[research_task, writing_task],
process=Process.sequential,
verbose=True
)
# Executa a crew e armazena o resultado no estado
result = research_crew.kickoff()
self.state.results = result.raw
return "Research completed"
@listen(execute_research)
def summarize_results(self, _):
# Acessa os resultados armazenados
result_length = len(self.state.results)
return f"Research on {self.state.topic} completed with {result_length} characters of results."
```
### Manipulando Saídas de Crews no Estado
@@ -246,7 +580,21 @@ Você pode usar o estado do flow para parametrizar crews:
Quando um crew finaliza, é possível processar sua saída e armazená-la no estado do flow:
```python
# código não traduzido
@listen(execute_crew)
def process_crew_results(self, _):
# Faz parsing dos resultados brutos (assumindo saída em JSON)
import json
try:
results_dict = json.loads(self.state.raw_results)
self.state.processed_results = {
"title": results_dict.get("title", ""),
"main_points": results_dict.get("main_points", []),
"conclusion": results_dict.get("conclusion", "")
}
return "Results processed successfully"
except json.JSONDecodeError:
self.state.error = "Failed to parse crew results as JSON"
return "Error processing results"
```
## Boas Práticas para Gerenciamento de Estado
@@ -256,7 +604,19 @@ Quando um crew finaliza, é possível processar sua saída e armazená-la no est
Projete seu estado para conter somente o necessário:
```python
# Exemplo não traduzido
# Abrangente demais
class BloatedState(BaseModel):
user_data: Dict = {}
system_settings: Dict = {}
temporary_calculations: List = []
debug_info: Dict = {}
# ...muitos outros campos
# Melhor: estado focado
class FocusedState(BaseModel):
user_id: str
preferences: Dict[str, str]
completion_status: Dict[str, bool]
```
### 2. Use Estado Estruturado em Flows Complexos
@@ -264,7 +624,23 @@ Projete seu estado para conter somente o necessário:
À medida que seus flows evoluem em complexidade, o estado estruturado se torna cada vez mais valioso:
```python
# Exemplo não traduzido
# Flow simples pode usar estado não estruturado
class SimpleGreetingFlow(Flow):
@start()
def greet(self):
self.state["name"] = "World"
return f"Hello, {self.state['name']}!"
# Flow complexo se beneficia de estado estruturado
class UserRegistrationState(BaseModel):
username: str
email: str
verification_status: bool = False
registration_date: datetime = Field(default_factory=datetime.now)
last_login: Optional[datetime] = None
class RegistrationFlow(Flow[UserRegistrationState]):
# Métodos com acesso ao estado fortemente tipado
```
### 3. Documente Transições de Estado
@@ -272,7 +648,18 @@ Projete seu estado para conter somente o necessário:
Para flows complexos, documente como o estado muda ao longo da execução:
```python
# Exemplo não traduzido
@start()
def initialize_order(self):
"""
Initialize order state with empty values.
State before: {}
State after: {order_id: str, items: [], status: 'new'}
"""
self.state.order_id = str(uuid.uuid4())
self.state.items = []
self.state.status = "new"
return "Order initialized"
```
### 4. Trate Erros de Estado de Forma Elegante
@@ -280,7 +667,18 @@ Para flows complexos, documente como o estado muda ao longo da execução:
Implemente tratamento de erros ao acessar o estado:
```python
# Exemplo não traduzido
@listen(previous_step)
def process_data(self, _):
try:
# Tenta acessar um valor que pode não existir
user_preference = self.state.preferences.get("theme", "default")
except (AttributeError, KeyError):
# Trata o erro de forma elegante
self.state.errors = self.state.get("errors", [])
self.state.errors.append("Failed to access preferences")
user_preference = "default"
return f"Used preference: {user_preference}"
```
### 5. Use o Estado Para Acompanhar o Progresso
@@ -288,7 +686,30 @@ Implemente tratamento de erros ao acessar o estado:
Aproveite o estado para monitorar o progresso em flows de longa duração:
```python
# Exemplo não traduzido
class ProgressTrackingFlow(Flow):
@start()
def initialize(self):
self.state["total_steps"] = 3
self.state["current_step"] = 0
self.state["progress"] = 0.0
self.update_progress()
return "Initialized"
def update_progress(self):
"""Helper method to calculate and update progress"""
if self.state.get("total_steps", 0) > 0:
self.state["progress"] = (self.state.get("current_step", 0) /
self.state["total_steps"]) * 100
print(f"Progress: {self.state['progress']:.1f}%")
@listen(initialize)
def step_one(self, _):
# Realiza o trabalho...
self.state["current_step"] = 1
self.update_progress()
return "Step 1 complete"
# Etapas adicionais...
```
### 6. Prefira Operações Imutáveis Quando Possível
@@ -296,7 +717,22 @@ Aproveite o estado para monitorar o progresso em flows de longa duração:
Especialmente com estado estruturado, prefira operações imutáveis para maior clareza:
```python
# Exemplo não traduzido
# Em vez de modificar listas no local:
self.state.items.append(new_item) # Operação mutável
# Considere criar um novo estado:
from pydantic import BaseModel
from typing import List
class ItemState(BaseModel):
items: List[str] = []
class ImmutableFlow(Flow[ItemState]):
@start()
def add_item(self):
# Cria uma nova lista com o item adicionado
self.state.items = [*self.state.items, "new item"]
return "Item added"
```
## Depurando o Estado do Flow
@@ -306,7 +742,24 @@ Especialmente com estado estruturado, prefira operações imutáveis para maior
Ao desenvolver, adicione logs para acompanhar mudanças no estado:
```python
# Exemplo não traduzido
import logging
logging.basicConfig(level=logging.INFO)
class LoggingFlow(Flow):
def log_state(self, step_name):
logging.info(f"State after {step_name}: {self.state}")
@start()
def initialize(self):
self.state["counter"] = 0
self.log_state("initialize")
return "Initialized"
@listen(initialize)
def increment(self, _):
self.state["counter"] += 1
self.log_state("increment")
return f"Incremented to {self.state['counter']}"
```
### Visualizando o Estado
@@ -314,7 +767,30 @@ Ao desenvolver, adicione logs para acompanhar mudanças no estado:
Você pode adicionar métodos para visualizar seu estado durante o debug:
```python
# Exemplo não traduzido
def visualize_state(self):
"""Create a simple visualization of the current state"""
import json
from rich.console import Console
from rich.panel import Panel
console = Console()
if hasattr(self.state, "model_dump"):
# Pydantic v2
state_dict = self.state.model_dump()
elif hasattr(self.state, "dict"):
# Pydantic v1
state_dict = self.state.dict()
else:
# Estado não estruturado
state_dict = dict(self.state)
# Remove o id para uma saída mais limpa
if "id" in state_dict:
state_dict.pop("id")
state_json = json.dumps(state_dict, indent=2, default=str)
console.print(Panel(state_json, title="Current Flow State"))
```
## Conclusão

View File

@@ -1,20 +1,23 @@
"""String templates for A2A (Agent-to-Agent) delegation prompts."""
"""String templates for A2A (Agent-to-Agent) protocol messaging and status."""
from string import Template
from typing import Final
AVAILABLE_AGENTS_TEMPLATE: Final[Template] = Template(
"\n<AVAILABLE_A2A_AGENTS>\n"
"You can delegate to remote agents using the delegate_to_* tools below. "
"Each tool's description lists the remote agent's capabilities — call the "
"tool whose capabilities best match the task. Pass the question or sub-task "
"to the remote agent via the tool's `message` argument; the tool returns "
"the remote agent's response, which you should incorporate into your final "
"answer. If the available agents are not a good fit, answer directly "
"without calling a delegation tool.\n\n"
" $available_a2a_agents"
"\n</AVAILABLE_A2A_AGENTS>\n"
"\n<AVAILABLE_A2A_AGENTS>\n $available_a2a_agents\n</AVAILABLE_A2A_AGENTS>\n"
)
PREVIOUS_A2A_CONVERSATION_TEMPLATE: Final[Template] = Template(
"\n<PREVIOUS_A2A_CONVERSATION>\n"
" $previous_a2a_conversation"
"\n</PREVIOUS_A2A_CONVERSATION>\n"
)
CONVERSATION_TURN_INFO_TEMPLATE: Final[Template] = Template(
"\n<CONVERSATION_PROGRESS>\n"
' turn="$turn_count"\n'
' max_turns="$max_turns"\n'
" $warning"
"\n</CONVERSATION_PROGRESS>\n"
)
UNAVAILABLE_AGENTS_NOTICE_TEMPLATE: Final[Template] = Template(
"\n<A2A_AGENTS_STATUS>\n"
@@ -24,3 +27,29 @@ UNAVAILABLE_AGENTS_NOTICE_TEMPLATE: Final[Template] = Template(
" $unavailable_agents"
"\n</A2A_AGENTS_STATUS>\n"
)
REMOTE_AGENT_COMPLETED_NOTICE: Final[str] = """
<REMOTE_AGENT_STATUS>
STATUS: COMPLETED
The remote agent has finished processing your request. Their response is in the conversation history above.
You MUST now:
1. Extract the answer from the conversation history
2. Set is_a2a=false
3. Return the answer as your final message
DO NOT send another request - the task is already done.
</REMOTE_AGENT_STATUS>
"""
REMOTE_AGENT_RESPONSE_NOTICE: Final[str] = """
<REMOTE_AGENT_STATUS>
STATUS: RESPONSE_RECEIVED
The remote agent has responded. Their response is in the conversation history above.
You MUST now:
1. Set is_a2a=false (the remote task is complete and cannot receive more messages)
2. Provide YOUR OWN response to the original task based on the information received
IMPORTANT: Your response should be addressed to the USER who gave you the original task.
Report what the remote agent told you in THIRD PERSON (e.g., "The remote agent said..." or "I learned that...").
Do NOT address the remote agent directly or use "you" to refer to them.
</REMOTE_AGENT_STATUS>
"""

View File

@@ -1,394 +0,0 @@
"""Tool-based A2A delegation.
Each remote A2A agent is exposed to the local LLM as a BaseTool. The local
agent's normal tool-call loop drives multi-turn delegation: each tool call is
one turn against the remote agent. Per-endpoint conversation state lives in
``A2ADelegationState`` and is shared across the tools built for a single task.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
from a2a.types import Role, TaskState
from pydantic import BaseModel, Field, PrivateAttr
from crewai.a2a.config import A2AClientConfig, A2AConfig
from crewai.a2a.extensions.base import (
A2AExtension,
ConversationState,
ExtensionRegistry,
)
from crewai.a2a.task_helpers import TaskStateResult
from crewai.a2a.utils.delegation import aexecute_a2a_delegation, execute_a2a_delegation
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import A2AConversationCompletedEvent
from crewai.tools.base_tool import BaseTool
from crewai.utilities.string_utils import sanitize_tool_name
if TYPE_CHECKING:
from a2a.types import AgentCard, Message
from crewai.task import Task
_DELEGATE_PREFIX = "delegate_to_"
@dataclass
class _EndpointState:
"""Mutable per-endpoint conversation state across tool calls."""
conversation_history: list[Message] = field(default_factory=list)
context_id: str | None = None
task_id: str | None = None
reference_task_ids: list[str] = field(default_factory=list)
turn_count: int = 0
@dataclass
class A2ADelegationState:
"""State shared across all A2A delegation tools for a single task execution."""
agent: Any
task: Task
extension_registry: ExtensionRegistry | None = None
_per_endpoint: dict[str, _EndpointState] = field(default_factory=dict)
def _state_for(self, endpoint: str) -> _EndpointState:
return self._per_endpoint.setdefault(endpoint, _EndpointState())
def _initial_ids_from_task(self, state: _EndpointState) -> None:
if state.turn_count > 0:
return
task_config = self.task.config or {}
if state.context_id is None:
state.context_id = task_config.get("context_id")
if state.task_id is None:
state.task_id = task_config.get("task_id")
if not state.reference_task_ids:
state.reference_task_ids = list(task_config.get("reference_task_ids", []))
def delegate(
self,
config: A2AConfig | A2AClientConfig,
agent_card: AgentCard | None,
message: str,
) -> str:
"""Run one delegation turn against ``config.endpoint``.
Returns the remote agent's response text, suitable for handing back to
the local LLM as a tool result.
"""
return _run_delegation(self, config, agent_card, message, sync=True)
async def adelegate(
self,
config: A2AConfig | A2AClientConfig,
agent_card: AgentCard | None,
message: str,
) -> str:
"""Async variant of :meth:`delegate`."""
return await _run_delegation_async(self, config, agent_card, message)
class _A2ADelegationArgs(BaseModel):
"""Argument schema for A2A delegation tools."""
message: str = Field(
...,
description=(
"The question or task to send to the remote agent. Be specific and "
"self-contained: the remote agent does not see your other tools or "
"your prior reasoning."
),
)
class A2ADelegationTool(BaseTool):
"""BaseTool that delegates one turn of conversation to a remote A2A agent.
Each instance is bound to a specific A2A endpoint via ``_config``. Calling
``_run`` or ``_arun`` advances that endpoint's conversation by one turn and
returns the remote agent's response text.
"""
args_schema: type[BaseModel] = _A2ADelegationArgs
_config: A2AConfig | A2AClientConfig = PrivateAttr()
_agent_card: AgentCard | None = PrivateAttr(default=None)
_state: A2ADelegationState = PrivateAttr()
def _run(self, message: str) -> str:
return self._state.delegate(self._config, self._agent_card, message)
async def _arun(self, message: str) -> str:
return await self._state.adelegate(self._config, self._agent_card, message)
def build_a2a_tools(
a2a_agents: list[A2AConfig | A2AClientConfig],
agent_cards: dict[str, AgentCard],
state: A2ADelegationState,
) -> list[BaseTool]:
"""Build one ``A2ADelegationTool`` per available A2A agent.
Tool names collide-disambiguate with a numeric suffix; agents whose cards
failed to fetch are skipped.
"""
tools: list[BaseTool] = []
used_names: set[str] = set()
for config in a2a_agents:
card = agent_cards.get(config.endpoint)
if card is None:
continue
name = _build_tool_name(card.name or "remote_agent", used_names)
used_names.add(name)
tool = A2ADelegationTool(
name=name,
description=_build_tool_description(card),
max_usage_count=config.max_turns,
)
tool._config = config
tool._agent_card = card
tool._state = state
tools.append(tool)
return tools
def _build_tool_name(card_name: str, used: set[str]) -> str:
base = sanitize_tool_name(f"{_DELEGATE_PREFIX}{card_name}")
if base not in used:
return base
for i in range(2, 1000):
candidate = sanitize_tool_name(f"{base}_{i}")
if candidate not in used:
return candidate
raise ValueError(f"Could not generate unique tool name for {card_name!r}")
def _build_tool_description(card: AgentCard) -> str:
lines: list[str] = [f"Delegate a task to the remote A2A agent {card.name!r}."]
if card.description:
lines.append(card.description.strip())
if card.skills:
skill_names = ", ".join(s.name for s in card.skills if s.name)
if skill_names:
lines.append(f"Capabilities: {skill_names}.")
lines.append(
"Use this tool only when the question matches the agent's capabilities. "
"After receiving a response, prefer answering directly unless you need "
"another round-trip."
)
return "\n".join(lines)
def _run_delegation(
state: A2ADelegationState,
config: A2AConfig | A2AClientConfig,
agent_card: AgentCard | None,
message: str,
*,
sync: bool,
) -> str:
endpoint_state = state._state_for(config.endpoint)
state._initial_ids_from_task(endpoint_state)
extension_states = _extract_extension_states(state, endpoint_state)
metadata = _merged_metadata(state, endpoint_state, extension_states)
agent_branch, accepted_output_modes = _turn_context(config)
a2a_result = execute_a2a_delegation(
endpoint=config.endpoint,
auth=config.auth,
timeout=config.timeout,
task_description=message,
context_id=endpoint_state.context_id,
task_id=endpoint_state.task_id,
reference_task_ids=endpoint_state.reference_task_ids,
metadata=metadata or None,
extensions=(state.task.config or {}).get("extensions"),
conversation_history=endpoint_state.conversation_history,
agent_id=config.endpoint,
agent_role=Role.user,
agent_branch=agent_branch,
response_model=config.response_model,
turn_number=endpoint_state.turn_count + 1,
updates=config.updates,
transport=config.transport,
from_task=state.task,
from_agent=state.agent,
client_extensions=getattr(config, "extensions", None),
accepted_output_modes=accepted_output_modes,
input_files=state.task.input_files,
)
return _finalize_turn(
state, endpoint_state, config, agent_card, a2a_result, extension_states
)
async def _run_delegation_async(
state: A2ADelegationState,
config: A2AConfig | A2AClientConfig,
agent_card: AgentCard | None,
message: str,
) -> str:
endpoint_state = state._state_for(config.endpoint)
state._initial_ids_from_task(endpoint_state)
extension_states = _extract_extension_states(state, endpoint_state)
metadata = _merged_metadata(state, endpoint_state, extension_states)
agent_branch, accepted_output_modes = _turn_context(config)
a2a_result = await aexecute_a2a_delegation(
endpoint=config.endpoint,
auth=config.auth,
timeout=config.timeout,
task_description=message,
context_id=endpoint_state.context_id,
task_id=endpoint_state.task_id,
reference_task_ids=endpoint_state.reference_task_ids,
metadata=metadata or None,
extensions=(state.task.config or {}).get("extensions"),
conversation_history=endpoint_state.conversation_history,
agent_id=config.endpoint,
agent_role=Role.user,
agent_branch=agent_branch,
response_model=config.response_model,
turn_number=endpoint_state.turn_count + 1,
updates=config.updates,
transport=config.transport,
from_task=state.task,
from_agent=state.agent,
client_extensions=getattr(config, "extensions", None),
accepted_output_modes=accepted_output_modes,
input_files=state.task.input_files,
)
return _finalize_turn(
state, endpoint_state, config, agent_card, a2a_result, extension_states
)
def _extract_extension_states(
state: A2ADelegationState,
endpoint_state: _EndpointState,
) -> dict[type[A2AExtension], ConversationState]:
if state.extension_registry and endpoint_state.conversation_history:
return state.extension_registry.extract_all_states(
endpoint_state.conversation_history
)
return {}
def _merged_metadata(
state: A2ADelegationState,
endpoint_state: _EndpointState,
extension_states: dict[type[A2AExtension], ConversationState],
) -> dict[str, Any]:
task_config = state.task.config or {}
metadata: dict[str, Any] = dict(task_config.get("metadata") or {})
if state.extension_registry and extension_states:
metadata.update(state.extension_registry.prepare_all_metadata(extension_states))
return metadata
def _turn_context(
config: A2AConfig | A2AClientConfig,
) -> tuple[Any | None, list[str] | None]:
console_formatter = getattr(crewai_event_bus, "_console", None)
agent_branch = None
if console_formatter:
agent_branch = getattr(
console_formatter, "current_agent_branch", None
) or getattr(console_formatter, "current_task_branch", None)
accepted_output_modes = None
if isinstance(config, A2AClientConfig):
accepted_output_modes = config.accepted_output_modes
return agent_branch, accepted_output_modes
def _finalize_turn(
state: A2ADelegationState,
endpoint_state: _EndpointState,
config: A2AConfig | A2AClientConfig,
agent_card: AgentCard | None,
a2a_result: TaskStateResult,
extension_states: dict[type[A2AExtension], ConversationState],
) -> str:
endpoint_state.conversation_history = list(a2a_result.get("history", []))
if endpoint_state.conversation_history:
latest = endpoint_state.conversation_history[-1]
if latest.task_id is not None:
endpoint_state.task_id = latest.task_id
if latest.context_id is not None:
endpoint_state.context_id = latest.context_id
endpoint_state.turn_count += 1
status = a2a_result.get("status")
if status == TaskState.completed:
if (
endpoint_state.task_id is not None
and endpoint_state.task_id not in endpoint_state.reference_task_ids
):
endpoint_state.reference_task_ids.append(endpoint_state.task_id)
if state.task.config is None:
state.task.config = {}
state.task.config["reference_task_ids"] = list(
endpoint_state.reference_task_ids
)
endpoint_state.task_id = None
result_text = str(a2a_result.get("result", ""))
crewai_event_bus.emit(
None,
A2AConversationCompletedEvent(
status="completed",
final_result=result_text,
error=None,
total_turns=endpoint_state.turn_count,
from_task=state.task,
from_agent=state.agent,
endpoint=config.endpoint,
a2a_agent_name=agent_card.name if agent_card else None,
agent_card=agent_card.model_dump() if agent_card else None,
),
)
return _apply_response_extensions(state, result_text, extension_states)
if status == TaskState.input_required:
result_text = str(a2a_result.get("result", ""))
return _apply_response_extensions(state, result_text, extension_states)
error_msg = a2a_result.get("error", "Unknown error")
crewai_event_bus.emit(
None,
A2AConversationCompletedEvent(
status="failed",
final_result=None,
error=error_msg,
total_turns=endpoint_state.turn_count,
from_task=state.task,
from_agent=state.agent,
endpoint=config.endpoint,
a2a_agent_name=agent_card.name if agent_card else None,
agent_card=agent_card.model_dump() if agent_card else None,
),
)
return f"Remote agent error: {error_msg}"
def _apply_response_extensions(
state: A2ADelegationState,
response_text: str,
extension_states: dict[type[A2AExtension], ConversationState],
) -> str:
if not state.extension_registry:
return response_text
processed = state.extension_registry.process_response_with_all(
response_text, extension_states
)
return processed if isinstance(processed, str) else str(processed)

View File

@@ -6,6 +6,8 @@ from typing import (
Annotated,
Any,
Literal,
Protocol,
runtime_checkable,
)
from pydantic import BeforeValidator, HttpUrl, TypeAdapter
@@ -55,6 +57,15 @@ Url = Annotated[
]
@runtime_checkable
class AgentResponseProtocol(Protocol):
"""Protocol for the dynamically created AgentResponse model."""
a2a_ids: tuple[str, ...]
message: str
is_a2a: bool
class PartsMetadataDict(TypedDict, total=False):
"""Metadata for A2A message parts.

View File

@@ -1,25 +1,75 @@
"""Helpers for extracting A2A client configurations."""
"""Response model utilities for A2A agent interactions."""
from __future__ import annotations
from typing import TypeAlias
from pydantic import BaseModel, Field, create_model
from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig
from crewai.types.utils import create_literals_from_strings
A2AConfigTypes: TypeAlias = A2AConfig | A2AServerConfig | A2AClientConfig
A2AClientConfigTypes: TypeAlias = A2AConfig | A2AClientConfig
def extract_a2a_client_configs(
a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
) -> list[A2AClientConfigTypes]:
"""Return the client-side A2A configs from a possibly-mixed config list.
def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel] | None:
"""Create a dynamic AgentResponse model with Literal types for agent IDs.
Filters out :class:`A2AServerConfig`, which has no endpoint to delegate to.
Args:
agent_ids: List of available A2A agent IDs.
Returns:
Dynamically created Pydantic model with Literal-constrained a2a_ids field,
or None if agent_ids is empty.
"""
if not agent_ids:
return None
DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806
return create_model(
"AgentResponse",
a2a_ids=(
tuple[DynamicLiteral, ...], # type: ignore[valid-type]
Field(
default_factory=tuple,
max_length=len(agent_ids),
description="A2A agent IDs to delegate to.",
),
),
message=(
str,
Field(
description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation."
),
),
is_a2a=(
bool,
Field(
description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately."
),
),
__base__=BaseModel,
)
def extract_a2a_agent_ids_from_config(
a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
) -> tuple[list[A2AClientConfigTypes], tuple[str, ...]]:
"""Extract A2A agent IDs from A2A configuration.
Filters out A2AServerConfig since it doesn't have an endpoint for delegation.
Args:
a2a_config: A2A configuration (any type).
Returns:
Tuple of client A2A configs list and agent endpoint IDs.
"""
if a2a_config is None:
return []
return [], ()
configs: list[A2AConfigTypes]
if isinstance(a2a_config, (A2AConfig, A2AClientConfig, A2AServerConfig)):
@@ -27,6 +77,24 @@ def extract_a2a_client_configs(
else:
configs = a2a_config
return [
client_configs: list[A2AClientConfigTypes] = [
config for config in configs if isinstance(config, (A2AConfig, A2AClientConfig))
]
return client_configs, tuple(config.endpoint for config in client_configs)
def get_a2a_agents_and_response_model(
a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
) -> tuple[list[A2AClientConfigTypes], type[BaseModel] | None]:
"""Get A2A agent configs and response model.
Args:
a2a_config: A2A configuration (any type).
Returns:
Tuple of client A2A configs and response model.
"""
a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config)
return a2a_agents, create_agent_response_model(agent_ids)

File diff suppressed because it is too large Load Diff

View File

@@ -111,6 +111,12 @@ from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
try:
from crewai.a2a.types import AgentResponseProtocol
except ImportError:
AgentResponseProtocol = None # type: ignore[assignment, misc]
if TYPE_CHECKING:
from crewai_files import FileInput
@@ -626,7 +632,15 @@ class Agent(BaseAgent):
result = process_tool_results(self, result)
output_for_event = result if isinstance(result, str) else str(result)
output_for_event = result
if (
AgentResponseProtocol is not None
and isinstance(result, BaseModel)
and isinstance(result, AgentResponseProtocol)
):
output_for_event = str(result.message)
elif not isinstance(result, str):
output_for_event = str(result)
crewai_event_bus.emit(
self,

View File

@@ -121,11 +121,11 @@ def _kickoff_with_a2a_support(
Returns:
LiteAgentOutput from either local execution or A2A delegation.
"""
from crewai.a2a.utils.response_model import extract_a2a_client_configs
from crewai.a2a.utils.response_model import get_a2a_agents_and_response_model
from crewai.a2a.wrapper import _execute_task_with_a2a
from crewai.task import Task
a2a_agents = extract_a2a_client_configs(agent.a2a)
a2a_agents, agent_response_model = get_a2a_agents_and_response_model(agent.a2a)
if not a2a_agents:
return original_kickoff(messages, response_format, input_files)
@@ -160,6 +160,7 @@ def _kickoff_with_a2a_support(
a2a_agents=a2a_agents,
original_fn=task_to_kickoff_adapter,
task=fake_task,
agent_response_model=agent_response_model,
context=None,
tools=None,
extension_registry=extension_registry,

View File

@@ -1,14 +1,13 @@
"""Tests for A2A delegation tool behavior, including trust_remote_completion_status."""
"""Test trust_remote_completion_status flag in A2A wrapper."""
from unittest.mock import MagicMock, patch
import pytest
from crewai.a2a.config import A2AClientConfig, A2AConfig
from crewai.a2a.config import A2AConfig
try:
from a2a.types import TaskState # noqa: F401
from a2a.types import Message, Role
A2A_SDK_INSTALLED = True
except ImportError:
@@ -16,126 +15,141 @@ except ImportError:
def _create_mock_agent_card(name: str = "Test", url: str = "http://test-endpoint.com/"):
"""Create a mock agent card with the attributes A2ADelegationTool reads."""
"""Create a mock agent card with proper model_dump behavior."""
mock_card = MagicMock()
mock_card.name = name
mock_card.url = url
mock_card.description = "A test agent"
mock_card.skills = []
mock_card.model_dump.return_value = {"name": name, "url": url}
mock_card.model_dump_json.return_value = f'{{"name": "{name}", "url": "{url}"}}'
return mock_card
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_delegation_tool_returns_remote_result_on_completion():
"""A successful remote completion is returned to the local LLM as the tool result."""
from a2a.types import TaskState
def test_trust_remote_completion_status_true_returns_directly():
"""When trust_remote_completion_status=True and A2A returns completed, return result directly."""
from crewai.a2a.wrapper import _delegate_to_a2a
from crewai.a2a.types import AgentResponseProtocol
from crewai import Agent, Task
from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
config = A2AClientConfig(endpoint="http://test-endpoint.com")
agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
trust_remote_completion_status=True,
)
agent = Agent(
role="test manager",
goal="coordinate",
backstory="test",
a2a=a2a_config,
)
task = Task(description="test", expected_output="test", agent=agent)
card = _create_mock_agent_card()
state = A2ADelegationState(agent=agent, task=task)
tools = build_a2a_tools([config], {config.endpoint: card}, state)
assert len(tools) == 1
tool = tools[0]
class MockResponse:
is_a2a = True
message = "Please help"
a2a_ids = ["http://test-endpoint.com/"]
with patch("crewai.a2a.tools.execute_a2a_delegation") as mock_execute:
with (
patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute,
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
):
mock_card = _create_mock_agent_card()
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
# A2A returns completed
mock_execute.return_value = {
"status": TaskState.completed,
"status": "completed",
"result": "Done by remote",
"history": [],
}
result = tool._run(message="Please help")
assert result == "Done by remote"
assert mock_execute.call_count == 1
# This should return directly without checking LLM response
result = _delegate_to_a2a(
self=agent,
agent_response=MockResponse(),
task=task,
original_fn=lambda *args, **kwargs: "fallback",
context=None,
tools=None,
agent_cards={"http://test-endpoint.com/": mock_card},
original_task_description="test",
)
assert result == "Done by remote"
assert mock_execute.call_count == 1
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_delegation_tool_records_completed_task_in_references():
"""When a remote task completes with a task_id, it goes into reference_task_ids."""
from a2a.types import TaskState
def test_trust_remote_completion_status_false_continues_conversation():
"""When trust_remote_completion_status=False and A2A returns completed, ask server agent."""
from crewai.a2a.wrapper import _delegate_to_a2a
from crewai import Agent, Task
from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
config = A2AClientConfig(endpoint="http://test-endpoint.com")
agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
trust_remote_completion_status=False,
)
agent = Agent(
role="test manager",
goal="coordinate",
backstory="test",
a2a=a2a_config,
)
task = Task(description="test", expected_output="test", agent=agent)
card = _create_mock_agent_card()
state = A2ADelegationState(agent=agent, task=task)
[tool] = build_a2a_tools([config], {config.endpoint: card}, state)
class MockResponse:
is_a2a = True
message = "Please help"
a2a_ids = ["http://test-endpoint.com/"]
history_msg = MagicMock()
history_msg.task_id = "remote-task-1"
history_msg.context_id = "ctx-1"
call_count = 0
with patch("crewai.a2a.tools.execute_a2a_delegation") as mock_execute:
def mock_original_fn(self, task, context, tools):
nonlocal call_count
call_count += 1
if call_count == 1:
# Server decides to finish
return '{"is_a2a": false, "message": "Server final answer", "a2a_ids": []}'
return "unexpected"
with (
patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute,
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
):
mock_card = _create_mock_agent_card()
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
# A2A returns completed
mock_execute.return_value = {
"status": TaskState.completed,
"result": "Done",
"history": [history_msg],
}
tool._run(message="Please help")
endpoint_state = state._per_endpoint[config.endpoint]
assert "remote-task-1" in endpoint_state.reference_task_ids
assert endpoint_state.task_id is None
assert task.config is not None
assert task.config["reference_task_ids"] == ["remote-task-1"]
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_delegation_tool_returns_error_message_on_failure():
"""A non-completed/non-input-required status surfaces as a readable error string."""
from a2a.types import TaskState
from crewai import Agent, Task
from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
config = A2AClientConfig(endpoint="http://test-endpoint.com")
agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
task = Task(description="test", expected_output="test", agent=agent)
card = _create_mock_agent_card()
state = A2ADelegationState(agent=agent, task=task)
[tool] = build_a2a_tools([config], {config.endpoint: card}, state)
with patch("crewai.a2a.tools.execute_a2a_delegation") as mock_execute:
mock_execute.return_value = {
"status": TaskState.failed,
"error": "remote agent unreachable",
"status": "completed",
"result": "Done by remote",
"history": [],
}
result = tool._run(message="Please help")
assert "remote agent unreachable" in result
result = _delegate_to_a2a(
self=agent,
agent_response=MockResponse(),
task=task,
original_fn=mock_original_fn,
context=None,
tools=None,
agent_cards={"http://test-endpoint.com/": mock_card},
original_task_description="test",
)
# Should call original_fn to get server response
assert call_count >= 1
assert result == "Server final answer"
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_delegation_tool_respects_max_turns_via_usage_count():
"""A2AConfig.max_turns wires through to BaseTool.max_usage_count."""
from crewai import Agent, Task
from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
config = A2AClientConfig(endpoint="http://test-endpoint.com", max_turns=2)
agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
task = Task(description="test", expected_output="test", agent=agent)
card = _create_mock_agent_card()
state = A2ADelegationState(agent=agent, task=task)
[tool] = build_a2a_tools([config], {config.endpoint: card}, state)
assert tool.max_usage_count == 2
def test_default_trust_remote_completion_status_is_false():
"""Verify that default value of trust_remote_completion_status is False."""
a2a_config = A2AConfig(endpoint="http://test-endpoint.com")
a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
)
assert a2a_config.trust_remote_completion_status is False