mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-05-15 05:58:10 +00:00
Compare commits
3 Commits
gl/refacto
...
docs/custo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
71c7293197 | ||
|
|
288568110f | ||
|
|
c36827b45b |
@@ -29,6 +29,7 @@ from crewai.flow.flow import Flow, listen, start
|
||||
from dotenv import load_dotenv
|
||||
from litellm import completion
|
||||
|
||||
load_dotenv()
|
||||
|
||||
class ExampleFlow(Flow):
|
||||
model = "gpt-4o-mini"
|
||||
|
||||
2445
docs/docs.json
2445
docs/docs.json
File diff suppressed because it is too large
Load Diff
@@ -29,6 +29,7 @@ from crewai.flow.flow import Flow, listen, start
|
||||
from dotenv import load_dotenv
|
||||
from litellm import completion
|
||||
|
||||
load_dotenv()
|
||||
|
||||
class ExampleFlow(Flow):
|
||||
model = "gpt-4o-mini"
|
||||
|
||||
@@ -313,9 +313,9 @@ flow1 = PersistentCounterFlow()
|
||||
result1 = flow1.kickoff()
|
||||
print(f"First run result: {result1}")
|
||||
|
||||
# Second run - state is automatically loaded
|
||||
# Second run - pass the ID to load the persisted state
|
||||
flow2 = PersistentCounterFlow()
|
||||
result2 = flow2.kickoff()
|
||||
result2 = flow2.kickoff(inputs={"id": flow1.state.id})
|
||||
print(f"Second run result: {result2}") # Will be higher due to persisted state
|
||||
```
|
||||
|
||||
|
||||
131
docs/en/guides/tools/platform-tools-cli.mdx
Normal file
131
docs/en/guides/tools/platform-tools-cli.mdx
Normal file
@@ -0,0 +1,131 @@
|
||||
---
|
||||
title: Platform Tools CLI
|
||||
description: Create, publish, and install custom tools on the CrewAI platform using the CLI.
|
||||
icon: terminal
|
||||
mode: "wide"
|
||||
---
|
||||
|
||||
## Overview
|
||||
|
||||
The CrewAI CLI provides commands to manage custom tools on the **CrewAI platform** — a hosted tool registry that lets you share tools within your organization without publishing to PyPI.
|
||||
|
||||
| Command | Purpose |
|
||||
|---------|---------|
|
||||
| `crewai tool create <handle>` | Scaffold a new tool project |
|
||||
| `crewai tool publish` | Publish the tool to the CrewAI platform |
|
||||
| `crewai tool install <handle>` | Install a platform tool into your crew project |
|
||||
|
||||
<Note type="info" title="Platform vs PyPI">
|
||||
These commands manage tools on the **CrewAI platform registry**. If you want to publish a standalone Python package to PyPI instead, see the [Publish Custom Tools to PyPI](/en/guides/tools/publish-custom-tools) guide.
|
||||
</Note>
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- **CrewAI CLI** installed (`pip install crewai`)
|
||||
- **Authenticated** with the platform — run `crewai login` first
|
||||
|
||||
---
|
||||
|
||||
## Step 1: Create a Tool Project
|
||||
|
||||
Scaffold a new tool project:
|
||||
|
||||
```bash
|
||||
crewai tool create my_custom_tool
|
||||
```
|
||||
|
||||
This generates a project structure with the boilerplate you need to start building your tool.
|
||||
|
||||
<Tip>
|
||||
The `handle` is the unique identifier for your tool on the platform. Choose something descriptive and specific to what the tool does.
|
||||
</Tip>
|
||||
|
||||
### Implement Your Tool
|
||||
|
||||
Edit the generated tool file to add your logic. The tool follows the standard CrewAI tools contract — you can subclass `BaseTool` or use the `@tool` decorator:
|
||||
|
||||
```python
|
||||
from crewai.tools import BaseTool
|
||||
|
||||
class MyCustomTool(BaseTool):
|
||||
name: str = "My Custom Tool"
|
||||
description: str = "Description of what this tool does — be specific so agents know when to use it."
|
||||
|
||||
def _run(self, argument: str) -> str:
|
||||
# Your tool logic here
|
||||
return "result"
|
||||
```
|
||||
|
||||
For the full tools API reference (input schemas, caching, async support, error handling), see the [Create Custom Tools](/en/learn/create-custom-tools) guide.
|
||||
|
||||
---
|
||||
|
||||
## Step 2: Publish to the Platform
|
||||
|
||||
From your tool project directory, publish it to the CrewAI platform:
|
||||
|
||||
```bash
|
||||
crewai tool publish
|
||||
```
|
||||
|
||||
### Options
|
||||
|
||||
| Flag | Description |
|
||||
|------|-------------|
|
||||
| `--force` | Bypass Git remote validations |
|
||||
|
||||
Tools are published privately to your organization by default.
|
||||
|
||||
---
|
||||
|
||||
## Step 3: Install a Platform Tool
|
||||
|
||||
To install a tool that's been published to the platform:
|
||||
|
||||
```bash
|
||||
crewai tool install my_custom_tool
|
||||
```
|
||||
|
||||
Once installed, you can use the tool in your crew like any other tool — assign it to an agent via the `tools` parameter.
|
||||
|
||||
---
|
||||
|
||||
## Full Lifecycle Example
|
||||
|
||||
```bash
|
||||
# 1. Authenticate with the platform
|
||||
crewai login
|
||||
|
||||
# 2. Scaffold a new tool
|
||||
crewai tool create weather_lookup
|
||||
|
||||
# 3. Implement your logic in the generated project
|
||||
cd weather_lookup
|
||||
# ... edit the tool file ...
|
||||
|
||||
# 4. Publish to the platform
|
||||
crewai tool publish
|
||||
|
||||
# 5. In another project, install and use it
|
||||
crewai tool install weather_lookup
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Platform Tools vs PyPI Packages
|
||||
|
||||
| | Platform Tools | PyPI Packages |
|
||||
|---|---|---|
|
||||
| **Publish** | `crewai tool publish` | `uv build` + `uv publish` |
|
||||
| **Registry** | CrewAI platform | PyPI |
|
||||
| **Install** | `crewai tool install <handle>` | `pip install <package>` |
|
||||
| **Auth** | `crewai login` | PyPI account + token |
|
||||
| **Visibility** | Organization-scoped (private) | Always public |
|
||||
| **Guide** | This page | [Publish Custom Tools](/en/guides/tools/publish-custom-tools) |
|
||||
|
||||
---
|
||||
|
||||
## Related
|
||||
|
||||
- [Create Custom Tools](/en/learn/create-custom-tools) — Python API reference for building tools (BaseTool, @tool decorator)
|
||||
- [Publish Custom Tools to PyPI](/en/guides/tools/publish-custom-tools) — package and distribute tools as standalone Python libraries
|
||||
@@ -12,7 +12,9 @@ incorporating the latest functionalities such as tool delegation, error handling
|
||||
enabling agents to perform a wide range of actions.
|
||||
|
||||
<Tip>
|
||||
**Want to publish your tool for the community?** If you're building a tool that others could benefit from, check out the [Publish Custom Tools](/en/guides/tools/publish-custom-tools) guide to learn how to package and distribute your tool on PyPI.
|
||||
**Want to publish your tool to the CrewAI platform?** Use the CLI to scaffold, publish, and share tools directly on the platform — see the [Platform Tools CLI](/en/guides/tools/platform-tools-cli) guide.
|
||||
|
||||
**Prefer publishing to PyPI?** Check out the [Publish Custom Tools](/en/guides/tools/publish-custom-tools) guide to package and distribute your tool as a standalone Python library.
|
||||
</Tip>
|
||||
|
||||
### Subclassing `BaseTool`
|
||||
|
||||
@@ -29,6 +29,7 @@ from crewai.flow.flow import Flow, listen, start
|
||||
from dotenv import load_dotenv
|
||||
from litellm import completion
|
||||
|
||||
load_dotenv()
|
||||
|
||||
class ExampleFlow(Flow):
|
||||
model = "gpt-4o-mini"
|
||||
|
||||
@@ -24,7 +24,63 @@ Os flows permitem que você crie fluxos de trabalho estruturados e orientados po
|
||||
Vamos criar um Flow simples no qual você usará a OpenAI para gerar uma cidade aleatória em uma tarefa e, em seguida, usará essa cidade para gerar uma curiosidade em outra tarefa.
|
||||
|
||||
```python Code
|
||||
# (O código não é traduzido)
|
||||
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from dotenv import load_dotenv
|
||||
from litellm import completion
|
||||
|
||||
load_dotenv()
|
||||
|
||||
class ExampleFlow(Flow):
|
||||
model = "gpt-4o-mini"
|
||||
|
||||
@start()
|
||||
def generate_city(self):
|
||||
print("Starting flow")
|
||||
# Cada estado do flow recebe automaticamente um ID único
|
||||
print(f"Flow State ID: {self.state['id']}")
|
||||
|
||||
response = completion(
|
||||
model=self.model,
|
||||
messages=[
|
||||
{
|
||||
"role": "user",
|
||||
"content": "Return the name of a random city in the world.",
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
random_city = response["choices"][0]["message"]["content"]
|
||||
# Armazena a cidade no nosso estado
|
||||
self.state["city"] = random_city
|
||||
print(f"Random City: {random_city}")
|
||||
|
||||
return random_city
|
||||
|
||||
@listen(generate_city)
|
||||
def generate_fun_fact(self, random_city):
|
||||
response = completion(
|
||||
model=self.model,
|
||||
messages=[
|
||||
{
|
||||
"role": "user",
|
||||
"content": f"Tell me a fun fact about {random_city}",
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
fun_fact = response["choices"][0]["message"]["content"]
|
||||
# Armazena a curiosidade no nosso estado
|
||||
self.state["fun_fact"] = fun_fact
|
||||
return fun_fact
|
||||
|
||||
|
||||
|
||||
flow = ExampleFlow()
|
||||
flow.plot()
|
||||
result = flow.kickoff()
|
||||
|
||||
print(f"Generated fun fact: {result}")
|
||||
```
|
||||
|
||||
Na ilustração acima, criamos um Flow simples que gera uma cidade aleatória usando a OpenAI e depois cria uma curiosidade sobre essa cidade. O Flow consiste em duas tarefas: `generate_city` e `generate_fun_fact`. A tarefa `generate_city` é o ponto de início do Flow, enquanto a tarefa `generate_fun_fact` fica escutando o resultado da tarefa `generate_city`.
|
||||
@@ -56,12 +112,16 @@ O decorador `@listen()` pode ser usado de várias formas:
|
||||
1. **Escutando um Método pelo Nome**: Você pode passar o nome do método ao qual deseja escutar como string. Quando esse método concluir, o método ouvinte será chamado.
|
||||
|
||||
```python Code
|
||||
# (O código não é traduzido)
|
||||
@listen("generate_city")
|
||||
def generate_fun_fact(self, random_city):
|
||||
# Implementação
|
||||
```
|
||||
|
||||
2. **Escutando um Método Diretamente**: Você pode passar o próprio método. Quando esse método concluir, o método ouvinte será chamado.
|
||||
```python Code
|
||||
# (O código não é traduzido)
|
||||
@listen(generate_city)
|
||||
def generate_fun_fact(self, random_city):
|
||||
# Implementação
|
||||
```
|
||||
|
||||
### Saída de um Flow
|
||||
@@ -76,7 +136,24 @@ Veja como acessar a saída final:
|
||||
|
||||
<CodeGroup>
|
||||
```python Code
|
||||
# (O código não é traduzido)
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
|
||||
class OutputExampleFlow(Flow):
|
||||
@start()
|
||||
def first_method(self):
|
||||
return "Output from first_method"
|
||||
|
||||
@listen(first_method)
|
||||
def second_method(self, first_output):
|
||||
return f"Second method received: {first_output}"
|
||||
|
||||
|
||||
flow = OutputExampleFlow()
|
||||
flow.plot("my_flow_plot")
|
||||
final_output = flow.kickoff()
|
||||
|
||||
print("---- Final Output ----")
|
||||
print(final_output)
|
||||
```
|
||||
|
||||
```text Output
|
||||
@@ -97,8 +174,34 @@ Além de recuperar a saída final, você pode acessar e atualizar o estado dentr
|
||||
Veja um exemplo de como atualizar e acessar o estado:
|
||||
|
||||
<CodeGroup>
|
||||
|
||||
```python Code
|
||||
# (O código não é traduzido)
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from pydantic import BaseModel
|
||||
|
||||
class ExampleState(BaseModel):
|
||||
counter: int = 0
|
||||
message: str = ""
|
||||
|
||||
class StateExampleFlow(Flow[ExampleState]):
|
||||
|
||||
@start()
|
||||
def first_method(self):
|
||||
self.state.message = "Hello from first_method"
|
||||
self.state.counter += 1
|
||||
|
||||
@listen(first_method)
|
||||
def second_method(self):
|
||||
self.state.message += " - updated by second_method"
|
||||
self.state.counter += 1
|
||||
return self.state.message
|
||||
|
||||
flow = StateExampleFlow()
|
||||
flow.plot("my_flow_plot")
|
||||
final_output = flow.kickoff()
|
||||
print(f"Final Output: {final_output}")
|
||||
print("Final State:")
|
||||
print(flow.state)
|
||||
```
|
||||
|
||||
```text Output
|
||||
@@ -128,7 +231,33 @@ Essa abordagem oferece flexibilidade, permitindo que o desenvolvedor adicione ou
|
||||
Mesmo com estados não estruturados, os flows do CrewAI geram e mantêm automaticamente um identificador único (UUID) para cada instância de estado.
|
||||
|
||||
```python Code
|
||||
# (O código não é traduzido)
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
|
||||
class UnstructuredExampleFlow(Flow):
|
||||
|
||||
@start()
|
||||
def first_method(self):
|
||||
# O estado inclui automaticamente um campo 'id'
|
||||
print(f"State ID: {self.state['id']}")
|
||||
self.state['counter'] = 0
|
||||
self.state['message'] = "Hello from structured flow"
|
||||
|
||||
@listen(first_method)
|
||||
def second_method(self):
|
||||
self.state['counter'] += 1
|
||||
self.state['message'] += " - updated"
|
||||
|
||||
@listen(second_method)
|
||||
def third_method(self):
|
||||
self.state['counter'] += 1
|
||||
self.state['message'] += " - updated again"
|
||||
|
||||
print(f"State after third_method: {self.state}")
|
||||
|
||||
|
||||
flow = UnstructuredExampleFlow()
|
||||
flow.plot("my_flow_plot")
|
||||
flow.kickoff()
|
||||
```
|
||||
|
||||

|
||||
@@ -148,7 +277,39 @@ Ao usar modelos como o `BaseModel` da Pydantic, os desenvolvedores podem definir
|
||||
Cada estado nos flows do CrewAI recebe automaticamente um identificador único (UUID) para ajudar no rastreamento e gerenciamento. Esse ID é gerado e mantido automaticamente pelo sistema de flows.
|
||||
|
||||
```python Code
|
||||
# (O código não é traduzido)
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class ExampleState(BaseModel):
|
||||
# Nota: o campo 'id' é adicionado automaticamente a todos os estados
|
||||
counter: int = 0
|
||||
message: str = ""
|
||||
|
||||
|
||||
class StructuredExampleFlow(Flow[ExampleState]):
|
||||
|
||||
@start()
|
||||
def first_method(self):
|
||||
# Acesse o ID gerado automaticamente, se necessário
|
||||
print(f"State ID: {self.state.id}")
|
||||
self.state.message = "Hello from structured flow"
|
||||
|
||||
@listen(first_method)
|
||||
def second_method(self):
|
||||
self.state.counter += 1
|
||||
self.state.message += " - updated"
|
||||
|
||||
@listen(second_method)
|
||||
def third_method(self):
|
||||
self.state.counter += 1
|
||||
self.state.message += " - updated again"
|
||||
|
||||
print(f"State after third_method: {self.state}")
|
||||
|
||||
|
||||
flow = StructuredExampleFlow()
|
||||
flow.kickoff()
|
||||
```
|
||||
|
||||

|
||||
@@ -182,7 +343,19 @@ O decorador @persist permite a persistência automática do estado nos flows do
|
||||
Quando aplicado no nível da classe, o decorador @persist garante a persistência automática de todos os estados dos métodos do flow:
|
||||
|
||||
```python
|
||||
# (O código não é traduzido)
|
||||
@persist # Usa SQLiteFlowPersistence por padrão
|
||||
class MyFlow(Flow[MyState]):
|
||||
@start()
|
||||
def initialize_flow(self):
|
||||
# Este método terá seu estado persistido automaticamente
|
||||
self.state.counter = 1
|
||||
print("Initialized flow. State ID:", self.state.id)
|
||||
|
||||
@listen(initialize_flow)
|
||||
def next_step(self):
|
||||
# O estado (incluindo self.state.id) é recarregado automaticamente
|
||||
self.state.counter += 1
|
||||
print("Flow state is persisted. Counter:", self.state.counter)
|
||||
```
|
||||
|
||||
### Persistência no Nível de Método
|
||||
@@ -190,7 +363,14 @@ Quando aplicado no nível da classe, o decorador @persist garante a persistênci
|
||||
Para um controle mais granular, você pode aplicar @persist em métodos específicos:
|
||||
|
||||
```python
|
||||
# (O código não é traduzido)
|
||||
class AnotherFlow(Flow[dict]):
|
||||
@persist # Persiste apenas o estado deste método
|
||||
@start()
|
||||
def begin(self):
|
||||
if "runs" not in self.state:
|
||||
self.state["runs"] = 0
|
||||
self.state["runs"] += 1
|
||||
print("Method-level persisted runs:", self.state["runs"])
|
||||
```
|
||||
|
||||
### Forking de Estado Persistido
|
||||
@@ -282,8 +462,29 @@ A arquitetura de persistência enfatiza precisão técnica e opções de persona
|
||||
A função `or_` nos flows permite escutar múltiplos métodos e acionar o método ouvinte quando qualquer um dos métodos especificados gerar uma saída.
|
||||
|
||||
<CodeGroup>
|
||||
|
||||
```python Code
|
||||
# (O código não é traduzido)
|
||||
from crewai.flow.flow import Flow, listen, or_, start
|
||||
|
||||
class OrExampleFlow(Flow):
|
||||
|
||||
@start()
|
||||
def start_method(self):
|
||||
return "Hello from the start method"
|
||||
|
||||
@listen(start_method)
|
||||
def second_method(self):
|
||||
return "Hello from the second method"
|
||||
|
||||
@listen(or_(start_method, second_method))
|
||||
def logger(self, result):
|
||||
print(f"Logger: {result}")
|
||||
|
||||
|
||||
|
||||
flow = OrExampleFlow()
|
||||
flow.plot("my_flow_plot")
|
||||
flow.kickoff()
|
||||
```
|
||||
|
||||
```text Output
|
||||
@@ -302,8 +503,28 @@ A função `or_` serve para escutar vários métodos e disparar o método ouvint
|
||||
A função `and_` nos flows permite escutar múltiplos métodos e acionar o método ouvinte apenas quando todos os métodos especificados emitirem uma saída.
|
||||
|
||||
<CodeGroup>
|
||||
|
||||
```python Code
|
||||
# (O código não é traduzido)
|
||||
from crewai.flow.flow import Flow, and_, listen, start
|
||||
|
||||
class AndExampleFlow(Flow):
|
||||
|
||||
@start()
|
||||
def start_method(self):
|
||||
self.state["greeting"] = "Hello from the start method"
|
||||
|
||||
@listen(start_method)
|
||||
def second_method(self):
|
||||
self.state["joke"] = "What do computers eat? Microchips."
|
||||
|
||||
@listen(and_(start_method, second_method))
|
||||
def logger(self):
|
||||
print("---- Logger ----")
|
||||
print(self.state)
|
||||
|
||||
flow = AndExampleFlow()
|
||||
flow.plot()
|
||||
flow.kickoff()
|
||||
```
|
||||
|
||||
```text Output
|
||||
@@ -323,8 +544,42 @@ O decorador `@router()` nos flows permite definir lógica de roteamento condicio
|
||||
Você pode especificar diferentes rotas conforme a saída do método, permitindo controlar o fluxo de execução de forma dinâmica.
|
||||
|
||||
<CodeGroup>
|
||||
|
||||
```python Code
|
||||
# (O código não é traduzido)
|
||||
import random
|
||||
from crewai.flow.flow import Flow, listen, router, start
|
||||
from pydantic import BaseModel
|
||||
|
||||
class ExampleState(BaseModel):
|
||||
success_flag: bool = False
|
||||
|
||||
class RouterFlow(Flow[ExampleState]):
|
||||
|
||||
@start()
|
||||
def start_method(self):
|
||||
print("Starting the structured flow")
|
||||
random_boolean = random.choice([True, False])
|
||||
self.state.success_flag = random_boolean
|
||||
|
||||
@router(start_method)
|
||||
def second_method(self):
|
||||
if self.state.success_flag:
|
||||
return "success"
|
||||
else:
|
||||
return "failed"
|
||||
|
||||
@listen("success")
|
||||
def third_method(self):
|
||||
print("Third method running")
|
||||
|
||||
@listen("failed")
|
||||
def fourth_method(self):
|
||||
print("Fourth method running")
|
||||
|
||||
|
||||
flow = RouterFlow()
|
||||
flow.plot("my_flow_plot")
|
||||
flow.kickoff()
|
||||
```
|
||||
|
||||
```text Output
|
||||
@@ -401,7 +656,105 @@ Para um guia completo sobre feedback humano em flows, incluindo feedback assínc
|
||||
Os agentes podem ser integrados facilmente aos seus flows, oferecendo uma alternativa leve às crews completas quando você precisar executar tarefas simples e focadas. Veja um exemplo de como utilizar um agente em um flow para realizar uma pesquisa de mercado:
|
||||
|
||||
```python
|
||||
# (O código não é traduzido)
|
||||
import asyncio
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from crewai_tools import SerperDevTool
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai.agent import Agent
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
|
||||
|
||||
# Define um formato de saída estruturado
|
||||
class MarketAnalysis(BaseModel):
|
||||
key_trends: List[str] = Field(description="List of identified market trends")
|
||||
market_size: str = Field(description="Estimated market size")
|
||||
competitors: List[str] = Field(description="Major competitors in the space")
|
||||
|
||||
|
||||
# Define o estado do flow
|
||||
class MarketResearchState(BaseModel):
|
||||
product: str = ""
|
||||
analysis: MarketAnalysis | None = None
|
||||
|
||||
|
||||
# Cria uma classe de flow
|
||||
class MarketResearchFlow(Flow[MarketResearchState]):
|
||||
@start()
|
||||
def initialize_research(self) -> Dict[str, Any]:
|
||||
print(f"Starting market research for {self.state.product}")
|
||||
return {"product": self.state.product}
|
||||
|
||||
@listen(initialize_research)
|
||||
async def analyze_market(self) -> Dict[str, Any]:
|
||||
# Cria um agente para pesquisa de mercado
|
||||
analyst = Agent(
|
||||
role="Market Research Analyst",
|
||||
goal=f"Analyze the market for {self.state.product}",
|
||||
backstory="You are an experienced market analyst with expertise in "
|
||||
"identifying market trends and opportunities.",
|
||||
tools=[SerperDevTool()],
|
||||
verbose=True,
|
||||
)
|
||||
|
||||
# Define a consulta de pesquisa
|
||||
query = f"""
|
||||
Research the market for {self.state.product}. Include:
|
||||
1. Key market trends
|
||||
2. Market size
|
||||
3. Major competitors
|
||||
|
||||
Format your response according to the specified structure.
|
||||
"""
|
||||
|
||||
# Executa a análise com formato de saída estruturado
|
||||
result = await analyst.kickoff_async(query, response_format=MarketAnalysis)
|
||||
if result.pydantic:
|
||||
print("result", result.pydantic)
|
||||
else:
|
||||
print("result", result)
|
||||
|
||||
# Retorna a análise para atualizar o estado
|
||||
return {"analysis": result.pydantic}
|
||||
|
||||
@listen(analyze_market)
|
||||
def present_results(self, analysis) -> None:
|
||||
print("\nMarket Analysis Results")
|
||||
print("=====================")
|
||||
|
||||
if isinstance(analysis, dict):
|
||||
# Se recebemos um dict com a chave 'analysis', extrai o objeto de análise real
|
||||
market_analysis = analysis.get("analysis")
|
||||
else:
|
||||
market_analysis = analysis
|
||||
|
||||
if market_analysis and isinstance(market_analysis, MarketAnalysis):
|
||||
print("\nKey Market Trends:")
|
||||
for trend in market_analysis.key_trends:
|
||||
print(f"- {trend}")
|
||||
|
||||
print(f"\nMarket Size: {market_analysis.market_size}")
|
||||
|
||||
print("\nMajor Competitors:")
|
||||
for competitor in market_analysis.competitors:
|
||||
print(f"- {competitor}")
|
||||
else:
|
||||
print("No structured analysis data available.")
|
||||
print("Raw analysis:", analysis)
|
||||
|
||||
|
||||
# Exemplo de uso
|
||||
async def run_flow():
|
||||
flow = MarketResearchFlow()
|
||||
flow.plot("MarketResearchFlowPlot")
|
||||
result = await flow.kickoff_async(inputs={"product": "AI-powered chatbots"})
|
||||
return result
|
||||
|
||||
|
||||
# Executa o flow
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(run_flow())
|
||||
```
|
||||
|
||||

|
||||
@@ -463,7 +816,50 @@ No arquivo `main.py`, você cria seu flow e conecta as crews. É possível defin
|
||||
Veja um exemplo de como conectar a `poem_crew` no arquivo `main.py`:
|
||||
|
||||
```python Code
|
||||
# (O código não é traduzido)
|
||||
#!/usr/bin/env python
|
||||
from random import randint
|
||||
|
||||
from pydantic import BaseModel
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from .crews.poem_crew.poem_crew import PoemCrew
|
||||
|
||||
class PoemState(BaseModel):
|
||||
sentence_count: int = 1
|
||||
poem: str = ""
|
||||
|
||||
class PoemFlow(Flow[PoemState]):
|
||||
|
||||
@start()
|
||||
def generate_sentence_count(self):
|
||||
print("Generating sentence count")
|
||||
self.state.sentence_count = randint(1, 5)
|
||||
|
||||
@listen(generate_sentence_count)
|
||||
def generate_poem(self):
|
||||
print("Generating poem")
|
||||
result = PoemCrew().crew().kickoff(inputs={"sentence_count": self.state.sentence_count})
|
||||
|
||||
print("Poem generated", result.raw)
|
||||
self.state.poem = result.raw
|
||||
|
||||
@listen(generate_poem)
|
||||
def save_poem(self):
|
||||
print("Saving poem")
|
||||
with open("poem.txt", "w") as f:
|
||||
f.write(self.state.poem)
|
||||
|
||||
def kickoff():
|
||||
poem_flow = PoemFlow()
|
||||
poem_flow.kickoff()
|
||||
|
||||
|
||||
def plot():
|
||||
poem_flow = PoemFlow()
|
||||
poem_flow.plot("PoemFlowPlot")
|
||||
|
||||
if __name__ == "__main__":
|
||||
kickoff()
|
||||
plot()
|
||||
```
|
||||
|
||||
Neste exemplo, a classe `PoemFlow` define um fluxo que gera a quantidade de frases, usa a `PoemCrew` para gerar um poema e, depois, salva o poema em um arquivo. O flow inicia com o método `kickoff()`, e o gráfico é gerado pelo método `plot()`.
|
||||
@@ -515,7 +911,8 @@ O CrewAI oferece duas formas práticas de gerar plots dos seus flows:
|
||||
Se estiver trabalhando diretamente com uma instância do flow, basta chamar o método `plot()` do objeto. Isso criará um arquivo HTML com o plot interativo do seu flow.
|
||||
|
||||
```python Code
|
||||
# (O código não é traduzido)
|
||||
# Considerando que você já tem uma instância do flow
|
||||
flow.plot("my_flow_plot")
|
||||
```
|
||||
|
||||
Esse comando gera um arquivo chamado `my_flow_plot.html` no diretório atual. Abra esse arquivo em um navegador para visualizar o plot interativo.
|
||||
|
||||
@@ -63,7 +63,60 @@ Com estado não estruturado:
|
||||
Veja um exemplo simples de gerenciamento de estado não estruturado:
|
||||
|
||||
```python
|
||||
# código não traduzido
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
|
||||
class UnstructuredStateFlow(Flow):
|
||||
@start()
|
||||
def initialize_data(self):
|
||||
print("Initializing flow data")
|
||||
# Adiciona pares chave-valor ao estado
|
||||
self.state["user_name"] = "Alex"
|
||||
self.state["preferences"] = {
|
||||
"theme": "dark",
|
||||
"language": "English"
|
||||
}
|
||||
self.state["items"] = []
|
||||
|
||||
# O estado do flow recebe automaticamente um ID único
|
||||
print(f"Flow ID: {self.state['id']}")
|
||||
|
||||
return "Initialized"
|
||||
|
||||
@listen(initialize_data)
|
||||
def process_data(self, previous_result):
|
||||
print(f"Previous step returned: {previous_result}")
|
||||
|
||||
# Acessa e modifica o estado
|
||||
user = self.state["user_name"]
|
||||
print(f"Processing data for {user}")
|
||||
|
||||
# Adiciona itens a uma lista no estado
|
||||
self.state["items"].append("item1")
|
||||
self.state["items"].append("item2")
|
||||
|
||||
# Adiciona um novo par chave-valor
|
||||
self.state["processed"] = True
|
||||
|
||||
return "Processed"
|
||||
|
||||
@listen(process_data)
|
||||
def generate_summary(self, previous_result):
|
||||
# Acessa múltiplos valores do estado
|
||||
user = self.state["user_name"]
|
||||
theme = self.state["preferences"]["theme"]
|
||||
items = self.state["items"]
|
||||
processed = self.state.get("processed", False)
|
||||
|
||||
summary = f"User {user} has {len(items)} items with {theme} theme. "
|
||||
summary += "Data is processed." if processed else "Data is not processed."
|
||||
|
||||
return summary
|
||||
|
||||
# Executa o flow
|
||||
flow = UnstructuredStateFlow()
|
||||
result = flow.kickoff()
|
||||
print(f"Final result: {result}")
|
||||
print(f"Final state: {flow.state}")
|
||||
```
|
||||
|
||||
### Quando Usar Estado Não Estruturado
|
||||
@@ -94,7 +147,63 @@ Ao utilizar estado estruturado:
|
||||
Veja como implementar o gerenciamento de estado estruturado:
|
||||
|
||||
```python
|
||||
# código não traduzido
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from pydantic import BaseModel, Field
|
||||
from typing import List, Dict, Optional
|
||||
|
||||
# Define o modelo de estado
|
||||
class UserPreferences(BaseModel):
|
||||
theme: str = "light"
|
||||
language: str = "English"
|
||||
|
||||
class AppState(BaseModel):
|
||||
user_name: str = ""
|
||||
preferences: UserPreferences = UserPreferences()
|
||||
items: List[str] = []
|
||||
processed: bool = False
|
||||
completion_percentage: float = 0.0
|
||||
|
||||
# Cria um flow com estado tipado
|
||||
class StructuredStateFlow(Flow[AppState]):
|
||||
@start()
|
||||
def initialize_data(self):
|
||||
print("Initializing flow data")
|
||||
# Define valores do estado (com checagem de tipo)
|
||||
self.state.user_name = "Taylor"
|
||||
self.state.preferences.theme = "dark"
|
||||
|
||||
# O campo ID está disponível automaticamente
|
||||
print(f"Flow ID: {self.state.id}")
|
||||
|
||||
return "Initialized"
|
||||
|
||||
@listen(initialize_data)
|
||||
def process_data(self, previous_result):
|
||||
print(f"Processing data for {self.state.user_name}")
|
||||
|
||||
# Modifica o estado (com checagem de tipo)
|
||||
self.state.items.append("item1")
|
||||
self.state.items.append("item2")
|
||||
self.state.processed = True
|
||||
self.state.completion_percentage = 50.0
|
||||
|
||||
return "Processed"
|
||||
|
||||
@listen(process_data)
|
||||
def generate_summary(self, previous_result):
|
||||
# Acessa o estado (com autocompletar)
|
||||
summary = f"User {self.state.user_name} has {len(self.state.items)} items "
|
||||
summary += f"with {self.state.preferences.theme} theme. "
|
||||
summary += "Data is processed." if self.state.processed else "Data is not processed."
|
||||
summary += f" Completion: {self.state.completion_percentage}%"
|
||||
|
||||
return summary
|
||||
|
||||
# Executa o flow
|
||||
flow = StructuredStateFlow()
|
||||
result = flow.kickoff()
|
||||
print(f"Final result: {result}")
|
||||
print(f"Final state: {flow.state}")
|
||||
```
|
||||
|
||||
### Benefícios do Estado Estruturado
|
||||
@@ -138,7 +247,29 @@ Independente de você usar estado estruturado ou não estruturado, é possível
|
||||
Métodos do flow podem retornar valores que serão passados como argumento para métodos listeners:
|
||||
|
||||
```python
|
||||
# código não traduzido
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
|
||||
class DataPassingFlow(Flow):
|
||||
@start()
|
||||
def generate_data(self):
|
||||
# Este valor de retorno será passado para os métodos listeners
|
||||
return "Generated data"
|
||||
|
||||
@listen(generate_data)
|
||||
def process_data(self, data_from_previous_step):
|
||||
print(f"Received: {data_from_previous_step}")
|
||||
# Você pode modificar os dados e repassá-los adiante
|
||||
processed_data = f"{data_from_previous_step} - processed"
|
||||
# Também atualiza o estado
|
||||
self.state["last_processed"] = processed_data
|
||||
return processed_data
|
||||
|
||||
@listen(process_data)
|
||||
def finalize_data(self, processed_data):
|
||||
print(f"Received processed data: {processed_data}")
|
||||
# Acessa tanto os dados passados quanto o estado
|
||||
last_processed = self.state.get("last_processed", "")
|
||||
return f"Final: {processed_data} (from state: {last_processed})"
|
||||
```
|
||||
|
||||
Esse padrão permite combinar passagem de dados direta com atualizações de estado para obter máxima flexibilidade.
|
||||
@@ -156,7 +287,36 @@ O decorador `@persist()` automatiza a persistência de estado, salvando o estado
|
||||
Ao aplicar em nível de classe, `@persist()` salva o estado após cada execução de método:
|
||||
|
||||
```python
|
||||
# código não traduzido
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from crewai.flow.persistence import persist
|
||||
from pydantic import BaseModel
|
||||
|
||||
class CounterState(BaseModel):
|
||||
value: int = 0
|
||||
|
||||
@persist() # Aplica à classe inteira do flow
|
||||
class PersistentCounterFlow(Flow[CounterState]):
|
||||
@start()
|
||||
def increment(self):
|
||||
self.state.value += 1
|
||||
print(f"Incremented to {self.state.value}")
|
||||
return self.state.value
|
||||
|
||||
@listen(increment)
|
||||
def double(self, value):
|
||||
self.state.value = value * 2
|
||||
print(f"Doubled to {self.state.value}")
|
||||
return self.state.value
|
||||
|
||||
# Primeira execução
|
||||
flow1 = PersistentCounterFlow()
|
||||
result1 = flow1.kickoff()
|
||||
print(f"First run result: {result1}")
|
||||
|
||||
# Segunda execução - passa o ID para carregar o estado persistido
|
||||
flow2 = PersistentCounterFlow()
|
||||
result2 = flow2.kickoff(inputs={"id": flow1.state.id})
|
||||
print(f"Second run result: {result2}") # Será maior devido ao estado persistido
|
||||
```
|
||||
|
||||
#### Persistência em Nível de Método
|
||||
@@ -164,7 +324,26 @@ Ao aplicar em nível de classe, `@persist()` salva o estado após cada execuçã
|
||||
Para mais controle, você pode aplicar `@persist()` em métodos específicos:
|
||||
|
||||
```python
|
||||
# código não traduzido
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from crewai.flow.persistence import persist
|
||||
|
||||
class SelectivePersistFlow(Flow):
|
||||
@start()
|
||||
def first_step(self):
|
||||
self.state["count"] = 1
|
||||
return "First step"
|
||||
|
||||
@persist() # Persiste apenas após este método
|
||||
@listen(first_step)
|
||||
def important_step(self, prev_result):
|
||||
self.state["count"] += 1
|
||||
self.state["important_data"] = "This will be persisted"
|
||||
return "Important step completed"
|
||||
|
||||
@listen(important_step)
|
||||
def final_step(self, prev_result):
|
||||
self.state["count"] += 1
|
||||
return f"Complete with count {self.state['count']}"
|
||||
```
|
||||
|
||||
#### Forking de Estado Persistido
|
||||
@@ -216,7 +395,45 @@ Notas sobre o comportamento:
|
||||
Você pode usar o estado para implementar lógicas condicionais complexas em seus flows:
|
||||
|
||||
```python
|
||||
# código não traduzido
|
||||
from crewai.flow.flow import Flow, listen, router, start
|
||||
from pydantic import BaseModel
|
||||
|
||||
class PaymentState(BaseModel):
|
||||
amount: float = 0.0
|
||||
is_approved: bool = False
|
||||
retry_count: int = 0
|
||||
|
||||
class PaymentFlow(Flow[PaymentState]):
|
||||
@start()
|
||||
def process_payment(self):
|
||||
# Simula o processamento do pagamento
|
||||
self.state.amount = 100.0
|
||||
self.state.is_approved = self.state.amount < 1000
|
||||
return "Payment processed"
|
||||
|
||||
@router(process_payment)
|
||||
def check_approval(self, previous_result):
|
||||
if self.state.is_approved:
|
||||
return "approved"
|
||||
elif self.state.retry_count < 3:
|
||||
return "retry"
|
||||
else:
|
||||
return "rejected"
|
||||
|
||||
@listen("approved")
|
||||
def handle_approval(self):
|
||||
return f"Payment of ${self.state.amount} approved!"
|
||||
|
||||
@listen("retry")
|
||||
def handle_retry(self):
|
||||
self.state.retry_count += 1
|
||||
print(f"Retrying payment (attempt {self.state.retry_count})...")
|
||||
# Aqui poderia ser implementada a lógica de retry
|
||||
return "Retry initiated"
|
||||
|
||||
@listen("rejected")
|
||||
def handle_rejection(self):
|
||||
return f"Payment of ${self.state.amount} rejected after {self.state.retry_count} retries."
|
||||
```
|
||||
|
||||
### Manipulações Complexas de Estado
|
||||
@@ -224,7 +441,60 @@ Você pode usar o estado para implementar lógicas condicionais complexas em seu
|
||||
Para transformar estados complexos, você pode criar métodos dedicados:
|
||||
|
||||
```python
|
||||
# código não traduzido
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from pydantic import BaseModel
|
||||
from typing import List, Dict
|
||||
|
||||
class UserData(BaseModel):
|
||||
name: str
|
||||
active: bool = True
|
||||
login_count: int = 0
|
||||
|
||||
class ComplexState(BaseModel):
|
||||
users: Dict[str, UserData] = {}
|
||||
active_user_count: int = 0
|
||||
|
||||
class TransformationFlow(Flow[ComplexState]):
|
||||
@start()
|
||||
def initialize(self):
|
||||
# Adiciona alguns usuários
|
||||
self.add_user("alice", "Alice")
|
||||
self.add_user("bob", "Bob")
|
||||
self.add_user("charlie", "Charlie")
|
||||
return "Initialized"
|
||||
|
||||
@listen(initialize)
|
||||
def process_users(self, _):
|
||||
# Incrementa contagens de login
|
||||
for user_id in self.state.users:
|
||||
self.increment_login(user_id)
|
||||
|
||||
# Desativa um usuário
|
||||
self.deactivate_user("bob")
|
||||
|
||||
# Atualiza a contagem de ativos
|
||||
self.update_active_count()
|
||||
|
||||
return f"Processed {len(self.state.users)} users"
|
||||
|
||||
# Métodos auxiliares para transformações de estado
|
||||
def add_user(self, user_id: str, name: str):
|
||||
self.state.users[user_id] = UserData(name=name)
|
||||
self.update_active_count()
|
||||
|
||||
def increment_login(self, user_id: str):
|
||||
if user_id in self.state.users:
|
||||
self.state.users[user_id].login_count += 1
|
||||
|
||||
def deactivate_user(self, user_id: str):
|
||||
if user_id in self.state.users:
|
||||
self.state.users[user_id].active = False
|
||||
self.update_active_count()
|
||||
|
||||
def update_active_count(self):
|
||||
self.state.active_user_count = sum(
|
||||
1 for user in self.state.users.values() if user.active
|
||||
)
|
||||
```
|
||||
|
||||
Esse padrão de criar métodos auxiliares mantém seus métodos de flow limpos, enquanto permite manipulações complexas de estado.
|
||||
@@ -238,7 +508,71 @@ Um dos padrões mais poderosos na CrewAI é combinar o gerenciamento de estado d
|
||||
Você pode usar o estado do flow para parametrizar crews:
|
||||
|
||||
```python
|
||||
# código não traduzido
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from crewai import Agent, Crew, Process, Task
|
||||
from pydantic import BaseModel
|
||||
|
||||
class ResearchState(BaseModel):
|
||||
topic: str = ""
|
||||
depth: str = "medium"
|
||||
results: str = ""
|
||||
|
||||
class ResearchFlow(Flow[ResearchState]):
|
||||
@start()
|
||||
def get_parameters(self):
|
||||
# Em uma aplicação real, isso pode vir da entrada do usuário
|
||||
self.state.topic = "Artificial Intelligence Ethics"
|
||||
self.state.depth = "deep"
|
||||
return "Parameters set"
|
||||
|
||||
@listen(get_parameters)
|
||||
def execute_research(self, _):
|
||||
# Cria os agentes
|
||||
researcher = Agent(
|
||||
role="Research Specialist",
|
||||
goal=f"Research {self.state.topic} in {self.state.depth} detail",
|
||||
backstory="You are an expert researcher with a talent for finding accurate information."
|
||||
)
|
||||
|
||||
writer = Agent(
|
||||
role="Content Writer",
|
||||
goal="Transform research into clear, engaging content",
|
||||
backstory="You excel at communicating complex ideas clearly and concisely."
|
||||
)
|
||||
|
||||
# Cria as tarefas
|
||||
research_task = Task(
|
||||
description=f"Research {self.state.topic} with {self.state.depth} analysis",
|
||||
expected_output="Comprehensive research notes in markdown format",
|
||||
agent=researcher
|
||||
)
|
||||
|
||||
writing_task = Task(
|
||||
description=f"Create a summary on {self.state.topic} based on the research",
|
||||
expected_output="Well-written article in markdown format",
|
||||
agent=writer,
|
||||
context=[research_task]
|
||||
)
|
||||
|
||||
# Cria e executa a crew
|
||||
research_crew = Crew(
|
||||
agents=[researcher, writer],
|
||||
tasks=[research_task, writing_task],
|
||||
process=Process.sequential,
|
||||
verbose=True
|
||||
)
|
||||
|
||||
# Executa a crew e armazena o resultado no estado
|
||||
result = research_crew.kickoff()
|
||||
self.state.results = result.raw
|
||||
|
||||
return "Research completed"
|
||||
|
||||
@listen(execute_research)
|
||||
def summarize_results(self, _):
|
||||
# Acessa os resultados armazenados
|
||||
result_length = len(self.state.results)
|
||||
return f"Research on {self.state.topic} completed with {result_length} characters of results."
|
||||
```
|
||||
|
||||
### Manipulando Saídas de Crews no Estado
|
||||
@@ -246,7 +580,21 @@ Você pode usar o estado do flow para parametrizar crews:
|
||||
Quando um crew finaliza, é possível processar sua saída e armazená-la no estado do flow:
|
||||
|
||||
```python
|
||||
# código não traduzido
|
||||
@listen(execute_crew)
|
||||
def process_crew_results(self, _):
|
||||
# Faz parsing dos resultados brutos (assumindo saída em JSON)
|
||||
import json
|
||||
try:
|
||||
results_dict = json.loads(self.state.raw_results)
|
||||
self.state.processed_results = {
|
||||
"title": results_dict.get("title", ""),
|
||||
"main_points": results_dict.get("main_points", []),
|
||||
"conclusion": results_dict.get("conclusion", "")
|
||||
}
|
||||
return "Results processed successfully"
|
||||
except json.JSONDecodeError:
|
||||
self.state.error = "Failed to parse crew results as JSON"
|
||||
return "Error processing results"
|
||||
```
|
||||
|
||||
## Boas Práticas para Gerenciamento de Estado
|
||||
@@ -256,7 +604,19 @@ Quando um crew finaliza, é possível processar sua saída e armazená-la no est
|
||||
Projete seu estado para conter somente o necessário:
|
||||
|
||||
```python
|
||||
# Exemplo não traduzido
|
||||
# Abrangente demais
|
||||
class BloatedState(BaseModel):
|
||||
user_data: Dict = {}
|
||||
system_settings: Dict = {}
|
||||
temporary_calculations: List = []
|
||||
debug_info: Dict = {}
|
||||
# ...muitos outros campos
|
||||
|
||||
# Melhor: estado focado
|
||||
class FocusedState(BaseModel):
|
||||
user_id: str
|
||||
preferences: Dict[str, str]
|
||||
completion_status: Dict[str, bool]
|
||||
```
|
||||
|
||||
### 2. Use Estado Estruturado em Flows Complexos
|
||||
@@ -264,7 +624,23 @@ Projete seu estado para conter somente o necessário:
|
||||
À medida que seus flows evoluem em complexidade, o estado estruturado se torna cada vez mais valioso:
|
||||
|
||||
```python
|
||||
# Exemplo não traduzido
|
||||
# Flow simples pode usar estado não estruturado
|
||||
class SimpleGreetingFlow(Flow):
|
||||
@start()
|
||||
def greet(self):
|
||||
self.state["name"] = "World"
|
||||
return f"Hello, {self.state['name']}!"
|
||||
|
||||
# Flow complexo se beneficia de estado estruturado
|
||||
class UserRegistrationState(BaseModel):
|
||||
username: str
|
||||
email: str
|
||||
verification_status: bool = False
|
||||
registration_date: datetime = Field(default_factory=datetime.now)
|
||||
last_login: Optional[datetime] = None
|
||||
|
||||
class RegistrationFlow(Flow[UserRegistrationState]):
|
||||
# Métodos com acesso ao estado fortemente tipado
|
||||
```
|
||||
|
||||
### 3. Documente Transições de Estado
|
||||
@@ -272,7 +648,18 @@ Projete seu estado para conter somente o necessário:
|
||||
Para flows complexos, documente como o estado muda ao longo da execução:
|
||||
|
||||
```python
|
||||
# Exemplo não traduzido
|
||||
@start()
|
||||
def initialize_order(self):
|
||||
"""
|
||||
Initialize order state with empty values.
|
||||
|
||||
State before: {}
|
||||
State after: {order_id: str, items: [], status: 'new'}
|
||||
"""
|
||||
self.state.order_id = str(uuid.uuid4())
|
||||
self.state.items = []
|
||||
self.state.status = "new"
|
||||
return "Order initialized"
|
||||
```
|
||||
|
||||
### 4. Trate Erros de Estado de Forma Elegante
|
||||
@@ -280,7 +667,18 @@ Para flows complexos, documente como o estado muda ao longo da execução:
|
||||
Implemente tratamento de erros ao acessar o estado:
|
||||
|
||||
```python
|
||||
# Exemplo não traduzido
|
||||
@listen(previous_step)
|
||||
def process_data(self, _):
|
||||
try:
|
||||
# Tenta acessar um valor que pode não existir
|
||||
user_preference = self.state.preferences.get("theme", "default")
|
||||
except (AttributeError, KeyError):
|
||||
# Trata o erro de forma elegante
|
||||
self.state.errors = self.state.get("errors", [])
|
||||
self.state.errors.append("Failed to access preferences")
|
||||
user_preference = "default"
|
||||
|
||||
return f"Used preference: {user_preference}"
|
||||
```
|
||||
|
||||
### 5. Use o Estado Para Acompanhar o Progresso
|
||||
@@ -288,7 +686,30 @@ Implemente tratamento de erros ao acessar o estado:
|
||||
Aproveite o estado para monitorar o progresso em flows de longa duração:
|
||||
|
||||
```python
|
||||
# Exemplo não traduzido
|
||||
class ProgressTrackingFlow(Flow):
|
||||
@start()
|
||||
def initialize(self):
|
||||
self.state["total_steps"] = 3
|
||||
self.state["current_step"] = 0
|
||||
self.state["progress"] = 0.0
|
||||
self.update_progress()
|
||||
return "Initialized"
|
||||
|
||||
def update_progress(self):
|
||||
"""Helper method to calculate and update progress"""
|
||||
if self.state.get("total_steps", 0) > 0:
|
||||
self.state["progress"] = (self.state.get("current_step", 0) /
|
||||
self.state["total_steps"]) * 100
|
||||
print(f"Progress: {self.state['progress']:.1f}%")
|
||||
|
||||
@listen(initialize)
|
||||
def step_one(self, _):
|
||||
# Realiza o trabalho...
|
||||
self.state["current_step"] = 1
|
||||
self.update_progress()
|
||||
return "Step 1 complete"
|
||||
|
||||
# Etapas adicionais...
|
||||
```
|
||||
|
||||
### 6. Prefira Operações Imutáveis Quando Possível
|
||||
@@ -296,7 +717,22 @@ Aproveite o estado para monitorar o progresso em flows de longa duração:
|
||||
Especialmente com estado estruturado, prefira operações imutáveis para maior clareza:
|
||||
|
||||
```python
|
||||
# Exemplo não traduzido
|
||||
# Em vez de modificar listas no local:
|
||||
self.state.items.append(new_item) # Operação mutável
|
||||
|
||||
# Considere criar um novo estado:
|
||||
from pydantic import BaseModel
|
||||
from typing import List
|
||||
|
||||
class ItemState(BaseModel):
|
||||
items: List[str] = []
|
||||
|
||||
class ImmutableFlow(Flow[ItemState]):
|
||||
@start()
|
||||
def add_item(self):
|
||||
# Cria uma nova lista com o item adicionado
|
||||
self.state.items = [*self.state.items, "new item"]
|
||||
return "Item added"
|
||||
```
|
||||
|
||||
## Depurando o Estado do Flow
|
||||
@@ -306,7 +742,24 @@ Especialmente com estado estruturado, prefira operações imutáveis para maior
|
||||
Ao desenvolver, adicione logs para acompanhar mudanças no estado:
|
||||
|
||||
```python
|
||||
# Exemplo não traduzido
|
||||
import logging
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
class LoggingFlow(Flow):
|
||||
def log_state(self, step_name):
|
||||
logging.info(f"State after {step_name}: {self.state}")
|
||||
|
||||
@start()
|
||||
def initialize(self):
|
||||
self.state["counter"] = 0
|
||||
self.log_state("initialize")
|
||||
return "Initialized"
|
||||
|
||||
@listen(initialize)
|
||||
def increment(self, _):
|
||||
self.state["counter"] += 1
|
||||
self.log_state("increment")
|
||||
return f"Incremented to {self.state['counter']}"
|
||||
```
|
||||
|
||||
### Visualizando o Estado
|
||||
@@ -314,7 +767,30 @@ Ao desenvolver, adicione logs para acompanhar mudanças no estado:
|
||||
Você pode adicionar métodos para visualizar seu estado durante o debug:
|
||||
|
||||
```python
|
||||
# Exemplo não traduzido
|
||||
def visualize_state(self):
|
||||
"""Create a simple visualization of the current state"""
|
||||
import json
|
||||
from rich.console import Console
|
||||
from rich.panel import Panel
|
||||
|
||||
console = Console()
|
||||
|
||||
if hasattr(self.state, "model_dump"):
|
||||
# Pydantic v2
|
||||
state_dict = self.state.model_dump()
|
||||
elif hasattr(self.state, "dict"):
|
||||
# Pydantic v1
|
||||
state_dict = self.state.dict()
|
||||
else:
|
||||
# Estado não estruturado
|
||||
state_dict = dict(self.state)
|
||||
|
||||
# Remove o id para uma saída mais limpa
|
||||
if "id" in state_dict:
|
||||
state_dict.pop("id")
|
||||
|
||||
state_json = json.dumps(state_dict, indent=2, default=str)
|
||||
console.print(Panel(state_json, title="Current Flow State"))
|
||||
```
|
||||
|
||||
## Conclusão
|
||||
|
||||
@@ -1,20 +1,23 @@
|
||||
"""String templates for A2A (Agent-to-Agent) delegation prompts."""
|
||||
"""String templates for A2A (Agent-to-Agent) protocol messaging and status."""
|
||||
|
||||
from string import Template
|
||||
from typing import Final
|
||||
|
||||
|
||||
AVAILABLE_AGENTS_TEMPLATE: Final[Template] = Template(
|
||||
"\n<AVAILABLE_A2A_AGENTS>\n"
|
||||
"You can delegate to remote agents using the delegate_to_* tools below. "
|
||||
"Each tool's description lists the remote agent's capabilities — call the "
|
||||
"tool whose capabilities best match the task. Pass the question or sub-task "
|
||||
"to the remote agent via the tool's `message` argument; the tool returns "
|
||||
"the remote agent's response, which you should incorporate into your final "
|
||||
"answer. If the available agents are not a good fit, answer directly "
|
||||
"without calling a delegation tool.\n\n"
|
||||
" $available_a2a_agents"
|
||||
"\n</AVAILABLE_A2A_AGENTS>\n"
|
||||
"\n<AVAILABLE_A2A_AGENTS>\n $available_a2a_agents\n</AVAILABLE_A2A_AGENTS>\n"
|
||||
)
|
||||
PREVIOUS_A2A_CONVERSATION_TEMPLATE: Final[Template] = Template(
|
||||
"\n<PREVIOUS_A2A_CONVERSATION>\n"
|
||||
" $previous_a2a_conversation"
|
||||
"\n</PREVIOUS_A2A_CONVERSATION>\n"
|
||||
)
|
||||
CONVERSATION_TURN_INFO_TEMPLATE: Final[Template] = Template(
|
||||
"\n<CONVERSATION_PROGRESS>\n"
|
||||
' turn="$turn_count"\n'
|
||||
' max_turns="$max_turns"\n'
|
||||
" $warning"
|
||||
"\n</CONVERSATION_PROGRESS>\n"
|
||||
)
|
||||
UNAVAILABLE_AGENTS_NOTICE_TEMPLATE: Final[Template] = Template(
|
||||
"\n<A2A_AGENTS_STATUS>\n"
|
||||
@@ -24,3 +27,29 @@ UNAVAILABLE_AGENTS_NOTICE_TEMPLATE: Final[Template] = Template(
|
||||
" $unavailable_agents"
|
||||
"\n</A2A_AGENTS_STATUS>\n"
|
||||
)
|
||||
REMOTE_AGENT_COMPLETED_NOTICE: Final[str] = """
|
||||
<REMOTE_AGENT_STATUS>
|
||||
STATUS: COMPLETED
|
||||
The remote agent has finished processing your request. Their response is in the conversation history above.
|
||||
You MUST now:
|
||||
1. Extract the answer from the conversation history
|
||||
2. Set is_a2a=false
|
||||
3. Return the answer as your final message
|
||||
DO NOT send another request - the task is already done.
|
||||
</REMOTE_AGENT_STATUS>
|
||||
"""
|
||||
|
||||
REMOTE_AGENT_RESPONSE_NOTICE: Final[str] = """
|
||||
<REMOTE_AGENT_STATUS>
|
||||
STATUS: RESPONSE_RECEIVED
|
||||
The remote agent has responded. Their response is in the conversation history above.
|
||||
|
||||
You MUST now:
|
||||
1. Set is_a2a=false (the remote task is complete and cannot receive more messages)
|
||||
2. Provide YOUR OWN response to the original task based on the information received
|
||||
|
||||
IMPORTANT: Your response should be addressed to the USER who gave you the original task.
|
||||
Report what the remote agent told you in THIRD PERSON (e.g., "The remote agent said..." or "I learned that...").
|
||||
Do NOT address the remote agent directly or use "you" to refer to them.
|
||||
</REMOTE_AGENT_STATUS>
|
||||
"""
|
||||
|
||||
@@ -1,394 +0,0 @@
|
||||
"""Tool-based A2A delegation.
|
||||
|
||||
Each remote A2A agent is exposed to the local LLM as a BaseTool. The local
|
||||
agent's normal tool-call loop drives multi-turn delegation: each tool call is
|
||||
one turn against the remote agent. Per-endpoint conversation state lives in
|
||||
``A2ADelegationState`` and is shared across the tools built for a single task.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from a2a.types import Role, TaskState
|
||||
from pydantic import BaseModel, Field, PrivateAttr
|
||||
|
||||
from crewai.a2a.config import A2AClientConfig, A2AConfig
|
||||
from crewai.a2a.extensions.base import (
|
||||
A2AExtension,
|
||||
ConversationState,
|
||||
ExtensionRegistry,
|
||||
)
|
||||
from crewai.a2a.task_helpers import TaskStateResult
|
||||
from crewai.a2a.utils.delegation import aexecute_a2a_delegation, execute_a2a_delegation
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.a2a_events import A2AConversationCompletedEvent
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
from crewai.utilities.string_utils import sanitize_tool_name
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from a2a.types import AgentCard, Message
|
||||
|
||||
from crewai.task import Task
|
||||
|
||||
|
||||
_DELEGATE_PREFIX = "delegate_to_"
|
||||
|
||||
|
||||
@dataclass
|
||||
class _EndpointState:
|
||||
"""Mutable per-endpoint conversation state across tool calls."""
|
||||
|
||||
conversation_history: list[Message] = field(default_factory=list)
|
||||
context_id: str | None = None
|
||||
task_id: str | None = None
|
||||
reference_task_ids: list[str] = field(default_factory=list)
|
||||
turn_count: int = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class A2ADelegationState:
|
||||
"""State shared across all A2A delegation tools for a single task execution."""
|
||||
|
||||
agent: Any
|
||||
task: Task
|
||||
extension_registry: ExtensionRegistry | None = None
|
||||
_per_endpoint: dict[str, _EndpointState] = field(default_factory=dict)
|
||||
|
||||
def _state_for(self, endpoint: str) -> _EndpointState:
|
||||
return self._per_endpoint.setdefault(endpoint, _EndpointState())
|
||||
|
||||
def _initial_ids_from_task(self, state: _EndpointState) -> None:
|
||||
if state.turn_count > 0:
|
||||
return
|
||||
task_config = self.task.config or {}
|
||||
if state.context_id is None:
|
||||
state.context_id = task_config.get("context_id")
|
||||
if state.task_id is None:
|
||||
state.task_id = task_config.get("task_id")
|
||||
if not state.reference_task_ids:
|
||||
state.reference_task_ids = list(task_config.get("reference_task_ids", []))
|
||||
|
||||
def delegate(
|
||||
self,
|
||||
config: A2AConfig | A2AClientConfig,
|
||||
agent_card: AgentCard | None,
|
||||
message: str,
|
||||
) -> str:
|
||||
"""Run one delegation turn against ``config.endpoint``.
|
||||
|
||||
Returns the remote agent's response text, suitable for handing back to
|
||||
the local LLM as a tool result.
|
||||
"""
|
||||
return _run_delegation(self, config, agent_card, message, sync=True)
|
||||
|
||||
async def adelegate(
|
||||
self,
|
||||
config: A2AConfig | A2AClientConfig,
|
||||
agent_card: AgentCard | None,
|
||||
message: str,
|
||||
) -> str:
|
||||
"""Async variant of :meth:`delegate`."""
|
||||
return await _run_delegation_async(self, config, agent_card, message)
|
||||
|
||||
|
||||
class _A2ADelegationArgs(BaseModel):
|
||||
"""Argument schema for A2A delegation tools."""
|
||||
|
||||
message: str = Field(
|
||||
...,
|
||||
description=(
|
||||
"The question or task to send to the remote agent. Be specific and "
|
||||
"self-contained: the remote agent does not see your other tools or "
|
||||
"your prior reasoning."
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class A2ADelegationTool(BaseTool):
|
||||
"""BaseTool that delegates one turn of conversation to a remote A2A agent.
|
||||
|
||||
Each instance is bound to a specific A2A endpoint via ``_config``. Calling
|
||||
``_run`` or ``_arun`` advances that endpoint's conversation by one turn and
|
||||
returns the remote agent's response text.
|
||||
"""
|
||||
|
||||
args_schema: type[BaseModel] = _A2ADelegationArgs
|
||||
|
||||
_config: A2AConfig | A2AClientConfig = PrivateAttr()
|
||||
_agent_card: AgentCard | None = PrivateAttr(default=None)
|
||||
_state: A2ADelegationState = PrivateAttr()
|
||||
|
||||
def _run(self, message: str) -> str:
|
||||
return self._state.delegate(self._config, self._agent_card, message)
|
||||
|
||||
async def _arun(self, message: str) -> str:
|
||||
return await self._state.adelegate(self._config, self._agent_card, message)
|
||||
|
||||
|
||||
def build_a2a_tools(
|
||||
a2a_agents: list[A2AConfig | A2AClientConfig],
|
||||
agent_cards: dict[str, AgentCard],
|
||||
state: A2ADelegationState,
|
||||
) -> list[BaseTool]:
|
||||
"""Build one ``A2ADelegationTool`` per available A2A agent.
|
||||
|
||||
Tool names collide-disambiguate with a numeric suffix; agents whose cards
|
||||
failed to fetch are skipped.
|
||||
"""
|
||||
tools: list[BaseTool] = []
|
||||
used_names: set[str] = set()
|
||||
for config in a2a_agents:
|
||||
card = agent_cards.get(config.endpoint)
|
||||
if card is None:
|
||||
continue
|
||||
name = _build_tool_name(card.name or "remote_agent", used_names)
|
||||
used_names.add(name)
|
||||
tool = A2ADelegationTool(
|
||||
name=name,
|
||||
description=_build_tool_description(card),
|
||||
max_usage_count=config.max_turns,
|
||||
)
|
||||
tool._config = config
|
||||
tool._agent_card = card
|
||||
tool._state = state
|
||||
tools.append(tool)
|
||||
return tools
|
||||
|
||||
|
||||
def _build_tool_name(card_name: str, used: set[str]) -> str:
|
||||
base = sanitize_tool_name(f"{_DELEGATE_PREFIX}{card_name}")
|
||||
if base not in used:
|
||||
return base
|
||||
for i in range(2, 1000):
|
||||
candidate = sanitize_tool_name(f"{base}_{i}")
|
||||
if candidate not in used:
|
||||
return candidate
|
||||
raise ValueError(f"Could not generate unique tool name for {card_name!r}")
|
||||
|
||||
|
||||
def _build_tool_description(card: AgentCard) -> str:
|
||||
lines: list[str] = [f"Delegate a task to the remote A2A agent {card.name!r}."]
|
||||
if card.description:
|
||||
lines.append(card.description.strip())
|
||||
if card.skills:
|
||||
skill_names = ", ".join(s.name for s in card.skills if s.name)
|
||||
if skill_names:
|
||||
lines.append(f"Capabilities: {skill_names}.")
|
||||
lines.append(
|
||||
"Use this tool only when the question matches the agent's capabilities. "
|
||||
"After receiving a response, prefer answering directly unless you need "
|
||||
"another round-trip."
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _run_delegation(
|
||||
state: A2ADelegationState,
|
||||
config: A2AConfig | A2AClientConfig,
|
||||
agent_card: AgentCard | None,
|
||||
message: str,
|
||||
*,
|
||||
sync: bool,
|
||||
) -> str:
|
||||
endpoint_state = state._state_for(config.endpoint)
|
||||
state._initial_ids_from_task(endpoint_state)
|
||||
|
||||
extension_states = _extract_extension_states(state, endpoint_state)
|
||||
metadata = _merged_metadata(state, endpoint_state, extension_states)
|
||||
agent_branch, accepted_output_modes = _turn_context(config)
|
||||
|
||||
a2a_result = execute_a2a_delegation(
|
||||
endpoint=config.endpoint,
|
||||
auth=config.auth,
|
||||
timeout=config.timeout,
|
||||
task_description=message,
|
||||
context_id=endpoint_state.context_id,
|
||||
task_id=endpoint_state.task_id,
|
||||
reference_task_ids=endpoint_state.reference_task_ids,
|
||||
metadata=metadata or None,
|
||||
extensions=(state.task.config or {}).get("extensions"),
|
||||
conversation_history=endpoint_state.conversation_history,
|
||||
agent_id=config.endpoint,
|
||||
agent_role=Role.user,
|
||||
agent_branch=agent_branch,
|
||||
response_model=config.response_model,
|
||||
turn_number=endpoint_state.turn_count + 1,
|
||||
updates=config.updates,
|
||||
transport=config.transport,
|
||||
from_task=state.task,
|
||||
from_agent=state.agent,
|
||||
client_extensions=getattr(config, "extensions", None),
|
||||
accepted_output_modes=accepted_output_modes,
|
||||
input_files=state.task.input_files,
|
||||
)
|
||||
return _finalize_turn(
|
||||
state, endpoint_state, config, agent_card, a2a_result, extension_states
|
||||
)
|
||||
|
||||
|
||||
async def _run_delegation_async(
|
||||
state: A2ADelegationState,
|
||||
config: A2AConfig | A2AClientConfig,
|
||||
agent_card: AgentCard | None,
|
||||
message: str,
|
||||
) -> str:
|
||||
endpoint_state = state._state_for(config.endpoint)
|
||||
state._initial_ids_from_task(endpoint_state)
|
||||
|
||||
extension_states = _extract_extension_states(state, endpoint_state)
|
||||
metadata = _merged_metadata(state, endpoint_state, extension_states)
|
||||
agent_branch, accepted_output_modes = _turn_context(config)
|
||||
|
||||
a2a_result = await aexecute_a2a_delegation(
|
||||
endpoint=config.endpoint,
|
||||
auth=config.auth,
|
||||
timeout=config.timeout,
|
||||
task_description=message,
|
||||
context_id=endpoint_state.context_id,
|
||||
task_id=endpoint_state.task_id,
|
||||
reference_task_ids=endpoint_state.reference_task_ids,
|
||||
metadata=metadata or None,
|
||||
extensions=(state.task.config or {}).get("extensions"),
|
||||
conversation_history=endpoint_state.conversation_history,
|
||||
agent_id=config.endpoint,
|
||||
agent_role=Role.user,
|
||||
agent_branch=agent_branch,
|
||||
response_model=config.response_model,
|
||||
turn_number=endpoint_state.turn_count + 1,
|
||||
updates=config.updates,
|
||||
transport=config.transport,
|
||||
from_task=state.task,
|
||||
from_agent=state.agent,
|
||||
client_extensions=getattr(config, "extensions", None),
|
||||
accepted_output_modes=accepted_output_modes,
|
||||
input_files=state.task.input_files,
|
||||
)
|
||||
return _finalize_turn(
|
||||
state, endpoint_state, config, agent_card, a2a_result, extension_states
|
||||
)
|
||||
|
||||
|
||||
def _extract_extension_states(
|
||||
state: A2ADelegationState,
|
||||
endpoint_state: _EndpointState,
|
||||
) -> dict[type[A2AExtension], ConversationState]:
|
||||
if state.extension_registry and endpoint_state.conversation_history:
|
||||
return state.extension_registry.extract_all_states(
|
||||
endpoint_state.conversation_history
|
||||
)
|
||||
return {}
|
||||
|
||||
|
||||
def _merged_metadata(
|
||||
state: A2ADelegationState,
|
||||
endpoint_state: _EndpointState,
|
||||
extension_states: dict[type[A2AExtension], ConversationState],
|
||||
) -> dict[str, Any]:
|
||||
task_config = state.task.config or {}
|
||||
metadata: dict[str, Any] = dict(task_config.get("metadata") or {})
|
||||
if state.extension_registry and extension_states:
|
||||
metadata.update(state.extension_registry.prepare_all_metadata(extension_states))
|
||||
return metadata
|
||||
|
||||
|
||||
def _turn_context(
|
||||
config: A2AConfig | A2AClientConfig,
|
||||
) -> tuple[Any | None, list[str] | None]:
|
||||
console_formatter = getattr(crewai_event_bus, "_console", None)
|
||||
agent_branch = None
|
||||
if console_formatter:
|
||||
agent_branch = getattr(
|
||||
console_formatter, "current_agent_branch", None
|
||||
) or getattr(console_formatter, "current_task_branch", None)
|
||||
|
||||
accepted_output_modes = None
|
||||
if isinstance(config, A2AClientConfig):
|
||||
accepted_output_modes = config.accepted_output_modes
|
||||
return agent_branch, accepted_output_modes
|
||||
|
||||
|
||||
def _finalize_turn(
|
||||
state: A2ADelegationState,
|
||||
endpoint_state: _EndpointState,
|
||||
config: A2AConfig | A2AClientConfig,
|
||||
agent_card: AgentCard | None,
|
||||
a2a_result: TaskStateResult,
|
||||
extension_states: dict[type[A2AExtension], ConversationState],
|
||||
) -> str:
|
||||
endpoint_state.conversation_history = list(a2a_result.get("history", []))
|
||||
if endpoint_state.conversation_history:
|
||||
latest = endpoint_state.conversation_history[-1]
|
||||
if latest.task_id is not None:
|
||||
endpoint_state.task_id = latest.task_id
|
||||
if latest.context_id is not None:
|
||||
endpoint_state.context_id = latest.context_id
|
||||
|
||||
endpoint_state.turn_count += 1
|
||||
status = a2a_result.get("status")
|
||||
|
||||
if status == TaskState.completed:
|
||||
if (
|
||||
endpoint_state.task_id is not None
|
||||
and endpoint_state.task_id not in endpoint_state.reference_task_ids
|
||||
):
|
||||
endpoint_state.reference_task_ids.append(endpoint_state.task_id)
|
||||
if state.task.config is None:
|
||||
state.task.config = {}
|
||||
state.task.config["reference_task_ids"] = list(
|
||||
endpoint_state.reference_task_ids
|
||||
)
|
||||
endpoint_state.task_id = None
|
||||
|
||||
result_text = str(a2a_result.get("result", ""))
|
||||
crewai_event_bus.emit(
|
||||
None,
|
||||
A2AConversationCompletedEvent(
|
||||
status="completed",
|
||||
final_result=result_text,
|
||||
error=None,
|
||||
total_turns=endpoint_state.turn_count,
|
||||
from_task=state.task,
|
||||
from_agent=state.agent,
|
||||
endpoint=config.endpoint,
|
||||
a2a_agent_name=agent_card.name if agent_card else None,
|
||||
agent_card=agent_card.model_dump() if agent_card else None,
|
||||
),
|
||||
)
|
||||
return _apply_response_extensions(state, result_text, extension_states)
|
||||
|
||||
if status == TaskState.input_required:
|
||||
result_text = str(a2a_result.get("result", ""))
|
||||
return _apply_response_extensions(state, result_text, extension_states)
|
||||
|
||||
error_msg = a2a_result.get("error", "Unknown error")
|
||||
crewai_event_bus.emit(
|
||||
None,
|
||||
A2AConversationCompletedEvent(
|
||||
status="failed",
|
||||
final_result=None,
|
||||
error=error_msg,
|
||||
total_turns=endpoint_state.turn_count,
|
||||
from_task=state.task,
|
||||
from_agent=state.agent,
|
||||
endpoint=config.endpoint,
|
||||
a2a_agent_name=agent_card.name if agent_card else None,
|
||||
agent_card=agent_card.model_dump() if agent_card else None,
|
||||
),
|
||||
)
|
||||
return f"Remote agent error: {error_msg}"
|
||||
|
||||
|
||||
def _apply_response_extensions(
|
||||
state: A2ADelegationState,
|
||||
response_text: str,
|
||||
extension_states: dict[type[A2AExtension], ConversationState],
|
||||
) -> str:
|
||||
if not state.extension_registry:
|
||||
return response_text
|
||||
processed = state.extension_registry.process_response_with_all(
|
||||
response_text, extension_states
|
||||
)
|
||||
return processed if isinstance(processed, str) else str(processed)
|
||||
@@ -6,6 +6,8 @@ from typing import (
|
||||
Annotated,
|
||||
Any,
|
||||
Literal,
|
||||
Protocol,
|
||||
runtime_checkable,
|
||||
)
|
||||
|
||||
from pydantic import BeforeValidator, HttpUrl, TypeAdapter
|
||||
@@ -55,6 +57,15 @@ Url = Annotated[
|
||||
]
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class AgentResponseProtocol(Protocol):
|
||||
"""Protocol for the dynamically created AgentResponse model."""
|
||||
|
||||
a2a_ids: tuple[str, ...]
|
||||
message: str
|
||||
is_a2a: bool
|
||||
|
||||
|
||||
class PartsMetadataDict(TypedDict, total=False):
|
||||
"""Metadata for A2A message parts.
|
||||
|
||||
|
||||
@@ -1,25 +1,75 @@
|
||||
"""Helpers for extracting A2A client configurations."""
|
||||
"""Response model utilities for A2A agent interactions."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TypeAlias
|
||||
|
||||
from pydantic import BaseModel, Field, create_model
|
||||
|
||||
from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig
|
||||
from crewai.types.utils import create_literals_from_strings
|
||||
|
||||
|
||||
A2AConfigTypes: TypeAlias = A2AConfig | A2AServerConfig | A2AClientConfig
|
||||
A2AClientConfigTypes: TypeAlias = A2AConfig | A2AClientConfig
|
||||
|
||||
|
||||
def extract_a2a_client_configs(
|
||||
a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
|
||||
) -> list[A2AClientConfigTypes]:
|
||||
"""Return the client-side A2A configs from a possibly-mixed config list.
|
||||
def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel] | None:
|
||||
"""Create a dynamic AgentResponse model with Literal types for agent IDs.
|
||||
|
||||
Filters out :class:`A2AServerConfig`, which has no endpoint to delegate to.
|
||||
Args:
|
||||
agent_ids: List of available A2A agent IDs.
|
||||
|
||||
Returns:
|
||||
Dynamically created Pydantic model with Literal-constrained a2a_ids field,
|
||||
or None if agent_ids is empty.
|
||||
"""
|
||||
if not agent_ids:
|
||||
return None
|
||||
|
||||
DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806
|
||||
|
||||
return create_model(
|
||||
"AgentResponse",
|
||||
a2a_ids=(
|
||||
tuple[DynamicLiteral, ...], # type: ignore[valid-type]
|
||||
Field(
|
||||
default_factory=tuple,
|
||||
max_length=len(agent_ids),
|
||||
description="A2A agent IDs to delegate to.",
|
||||
),
|
||||
),
|
||||
message=(
|
||||
str,
|
||||
Field(
|
||||
description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation."
|
||||
),
|
||||
),
|
||||
is_a2a=(
|
||||
bool,
|
||||
Field(
|
||||
description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately."
|
||||
),
|
||||
),
|
||||
__base__=BaseModel,
|
||||
)
|
||||
|
||||
|
||||
def extract_a2a_agent_ids_from_config(
|
||||
a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
|
||||
) -> tuple[list[A2AClientConfigTypes], tuple[str, ...]]:
|
||||
"""Extract A2A agent IDs from A2A configuration.
|
||||
|
||||
Filters out A2AServerConfig since it doesn't have an endpoint for delegation.
|
||||
|
||||
Args:
|
||||
a2a_config: A2A configuration (any type).
|
||||
|
||||
Returns:
|
||||
Tuple of client A2A configs list and agent endpoint IDs.
|
||||
"""
|
||||
if a2a_config is None:
|
||||
return []
|
||||
return [], ()
|
||||
|
||||
configs: list[A2AConfigTypes]
|
||||
if isinstance(a2a_config, (A2AConfig, A2AClientConfig, A2AServerConfig)):
|
||||
@@ -27,6 +77,24 @@ def extract_a2a_client_configs(
|
||||
else:
|
||||
configs = a2a_config
|
||||
|
||||
return [
|
||||
client_configs: list[A2AClientConfigTypes] = [
|
||||
config for config in configs if isinstance(config, (A2AConfig, A2AClientConfig))
|
||||
]
|
||||
|
||||
return client_configs, tuple(config.endpoint for config in client_configs)
|
||||
|
||||
|
||||
def get_a2a_agents_and_response_model(
|
||||
a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
|
||||
) -> tuple[list[A2AClientConfigTypes], type[BaseModel] | None]:
|
||||
"""Get A2A agent configs and response model.
|
||||
|
||||
Args:
|
||||
a2a_config: A2A configuration (any type).
|
||||
|
||||
Returns:
|
||||
Tuple of client A2A configs and response model.
|
||||
"""
|
||||
a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config)
|
||||
|
||||
return a2a_agents, create_agent_response_model(agent_ids)
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -111,6 +111,12 @@ from crewai.utilities.token_counter_callback import TokenCalcHandler
|
||||
from crewai.utilities.training_handler import CrewTrainingHandler
|
||||
|
||||
|
||||
try:
|
||||
from crewai.a2a.types import AgentResponseProtocol
|
||||
except ImportError:
|
||||
AgentResponseProtocol = None # type: ignore[assignment, misc]
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai_files import FileInput
|
||||
|
||||
@@ -626,7 +632,15 @@ class Agent(BaseAgent):
|
||||
|
||||
result = process_tool_results(self, result)
|
||||
|
||||
output_for_event = result if isinstance(result, str) else str(result)
|
||||
output_for_event = result
|
||||
if (
|
||||
AgentResponseProtocol is not None
|
||||
and isinstance(result, BaseModel)
|
||||
and isinstance(result, AgentResponseProtocol)
|
||||
):
|
||||
output_for_event = str(result.message)
|
||||
elif not isinstance(result, str):
|
||||
output_for_event = str(result)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
|
||||
@@ -121,11 +121,11 @@ def _kickoff_with_a2a_support(
|
||||
Returns:
|
||||
LiteAgentOutput from either local execution or A2A delegation.
|
||||
"""
|
||||
from crewai.a2a.utils.response_model import extract_a2a_client_configs
|
||||
from crewai.a2a.utils.response_model import get_a2a_agents_and_response_model
|
||||
from crewai.a2a.wrapper import _execute_task_with_a2a
|
||||
from crewai.task import Task
|
||||
|
||||
a2a_agents = extract_a2a_client_configs(agent.a2a)
|
||||
a2a_agents, agent_response_model = get_a2a_agents_and_response_model(agent.a2a)
|
||||
|
||||
if not a2a_agents:
|
||||
return original_kickoff(messages, response_format, input_files)
|
||||
@@ -160,6 +160,7 @@ def _kickoff_with_a2a_support(
|
||||
a2a_agents=a2a_agents,
|
||||
original_fn=task_to_kickoff_adapter,
|
||||
task=fake_task,
|
||||
agent_response_model=agent_response_model,
|
||||
context=None,
|
||||
tools=None,
|
||||
extension_registry=extension_registry,
|
||||
|
||||
@@ -1,14 +1,13 @@
|
||||
"""Tests for A2A delegation tool behavior, including trust_remote_completion_status."""
|
||||
"""Test trust_remote_completion_status flag in A2A wrapper."""
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.a2a.config import A2AClientConfig, A2AConfig
|
||||
|
||||
from crewai.a2a.config import A2AConfig
|
||||
|
||||
try:
|
||||
from a2a.types import TaskState # noqa: F401
|
||||
from a2a.types import Message, Role
|
||||
|
||||
A2A_SDK_INSTALLED = True
|
||||
except ImportError:
|
||||
@@ -16,126 +15,141 @@ except ImportError:
|
||||
|
||||
|
||||
def _create_mock_agent_card(name: str = "Test", url: str = "http://test-endpoint.com/"):
|
||||
"""Create a mock agent card with the attributes A2ADelegationTool reads."""
|
||||
"""Create a mock agent card with proper model_dump behavior."""
|
||||
mock_card = MagicMock()
|
||||
mock_card.name = name
|
||||
mock_card.url = url
|
||||
mock_card.description = "A test agent"
|
||||
mock_card.skills = []
|
||||
mock_card.model_dump.return_value = {"name": name, "url": url}
|
||||
mock_card.model_dump_json.return_value = f'{{"name": "{name}", "url": "{url}"}}'
|
||||
return mock_card
|
||||
|
||||
|
||||
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
|
||||
def test_delegation_tool_returns_remote_result_on_completion():
|
||||
"""A successful remote completion is returned to the local LLM as the tool result."""
|
||||
from a2a.types import TaskState
|
||||
|
||||
def test_trust_remote_completion_status_true_returns_directly():
|
||||
"""When trust_remote_completion_status=True and A2A returns completed, return result directly."""
|
||||
from crewai.a2a.wrapper import _delegate_to_a2a
|
||||
from crewai.a2a.types import AgentResponseProtocol
|
||||
from crewai import Agent, Task
|
||||
from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
|
||||
|
||||
config = A2AClientConfig(endpoint="http://test-endpoint.com")
|
||||
agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
|
||||
a2a_config = A2AConfig(
|
||||
endpoint="http://test-endpoint.com",
|
||||
trust_remote_completion_status=True,
|
||||
)
|
||||
|
||||
agent = Agent(
|
||||
role="test manager",
|
||||
goal="coordinate",
|
||||
backstory="test",
|
||||
a2a=a2a_config,
|
||||
)
|
||||
|
||||
task = Task(description="test", expected_output="test", agent=agent)
|
||||
|
||||
card = _create_mock_agent_card()
|
||||
state = A2ADelegationState(agent=agent, task=task)
|
||||
tools = build_a2a_tools([config], {config.endpoint: card}, state)
|
||||
assert len(tools) == 1
|
||||
tool = tools[0]
|
||||
class MockResponse:
|
||||
is_a2a = True
|
||||
message = "Please help"
|
||||
a2a_ids = ["http://test-endpoint.com/"]
|
||||
|
||||
with patch("crewai.a2a.tools.execute_a2a_delegation") as mock_execute:
|
||||
with (
|
||||
patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute,
|
||||
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
|
||||
):
|
||||
mock_card = _create_mock_agent_card()
|
||||
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
|
||||
|
||||
# A2A returns completed
|
||||
mock_execute.return_value = {
|
||||
"status": TaskState.completed,
|
||||
"status": "completed",
|
||||
"result": "Done by remote",
|
||||
"history": [],
|
||||
}
|
||||
result = tool._run(message="Please help")
|
||||
|
||||
assert result == "Done by remote"
|
||||
assert mock_execute.call_count == 1
|
||||
# This should return directly without checking LLM response
|
||||
result = _delegate_to_a2a(
|
||||
self=agent,
|
||||
agent_response=MockResponse(),
|
||||
task=task,
|
||||
original_fn=lambda *args, **kwargs: "fallback",
|
||||
context=None,
|
||||
tools=None,
|
||||
agent_cards={"http://test-endpoint.com/": mock_card},
|
||||
original_task_description="test",
|
||||
)
|
||||
|
||||
assert result == "Done by remote"
|
||||
assert mock_execute.call_count == 1
|
||||
|
||||
|
||||
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
|
||||
def test_delegation_tool_records_completed_task_in_references():
|
||||
"""When a remote task completes with a task_id, it goes into reference_task_ids."""
|
||||
from a2a.types import TaskState
|
||||
|
||||
def test_trust_remote_completion_status_false_continues_conversation():
|
||||
"""When trust_remote_completion_status=False and A2A returns completed, ask server agent."""
|
||||
from crewai.a2a.wrapper import _delegate_to_a2a
|
||||
from crewai import Agent, Task
|
||||
from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
|
||||
|
||||
config = A2AClientConfig(endpoint="http://test-endpoint.com")
|
||||
agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
|
||||
a2a_config = A2AConfig(
|
||||
endpoint="http://test-endpoint.com",
|
||||
trust_remote_completion_status=False,
|
||||
)
|
||||
|
||||
agent = Agent(
|
||||
role="test manager",
|
||||
goal="coordinate",
|
||||
backstory="test",
|
||||
a2a=a2a_config,
|
||||
)
|
||||
|
||||
task = Task(description="test", expected_output="test", agent=agent)
|
||||
|
||||
card = _create_mock_agent_card()
|
||||
state = A2ADelegationState(agent=agent, task=task)
|
||||
[tool] = build_a2a_tools([config], {config.endpoint: card}, state)
|
||||
class MockResponse:
|
||||
is_a2a = True
|
||||
message = "Please help"
|
||||
a2a_ids = ["http://test-endpoint.com/"]
|
||||
|
||||
history_msg = MagicMock()
|
||||
history_msg.task_id = "remote-task-1"
|
||||
history_msg.context_id = "ctx-1"
|
||||
call_count = 0
|
||||
|
||||
with patch("crewai.a2a.tools.execute_a2a_delegation") as mock_execute:
|
||||
def mock_original_fn(self, task, context, tools):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
if call_count == 1:
|
||||
# Server decides to finish
|
||||
return '{"is_a2a": false, "message": "Server final answer", "a2a_ids": []}'
|
||||
return "unexpected"
|
||||
|
||||
with (
|
||||
patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute,
|
||||
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
|
||||
):
|
||||
mock_card = _create_mock_agent_card()
|
||||
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
|
||||
|
||||
# A2A returns completed
|
||||
mock_execute.return_value = {
|
||||
"status": TaskState.completed,
|
||||
"result": "Done",
|
||||
"history": [history_msg],
|
||||
}
|
||||
tool._run(message="Please help")
|
||||
|
||||
endpoint_state = state._per_endpoint[config.endpoint]
|
||||
assert "remote-task-1" in endpoint_state.reference_task_ids
|
||||
assert endpoint_state.task_id is None
|
||||
assert task.config is not None
|
||||
assert task.config["reference_task_ids"] == ["remote-task-1"]
|
||||
|
||||
|
||||
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
|
||||
def test_delegation_tool_returns_error_message_on_failure():
|
||||
"""A non-completed/non-input-required status surfaces as a readable error string."""
|
||||
from a2a.types import TaskState
|
||||
|
||||
from crewai import Agent, Task
|
||||
from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
|
||||
|
||||
config = A2AClientConfig(endpoint="http://test-endpoint.com")
|
||||
agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
|
||||
task = Task(description="test", expected_output="test", agent=agent)
|
||||
|
||||
card = _create_mock_agent_card()
|
||||
state = A2ADelegationState(agent=agent, task=task)
|
||||
[tool] = build_a2a_tools([config], {config.endpoint: card}, state)
|
||||
|
||||
with patch("crewai.a2a.tools.execute_a2a_delegation") as mock_execute:
|
||||
mock_execute.return_value = {
|
||||
"status": TaskState.failed,
|
||||
"error": "remote agent unreachable",
|
||||
"status": "completed",
|
||||
"result": "Done by remote",
|
||||
"history": [],
|
||||
}
|
||||
result = tool._run(message="Please help")
|
||||
|
||||
assert "remote agent unreachable" in result
|
||||
result = _delegate_to_a2a(
|
||||
self=agent,
|
||||
agent_response=MockResponse(),
|
||||
task=task,
|
||||
original_fn=mock_original_fn,
|
||||
context=None,
|
||||
tools=None,
|
||||
agent_cards={"http://test-endpoint.com/": mock_card},
|
||||
original_task_description="test",
|
||||
)
|
||||
|
||||
# Should call original_fn to get server response
|
||||
assert call_count >= 1
|
||||
assert result == "Server final answer"
|
||||
|
||||
|
||||
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
|
||||
def test_delegation_tool_respects_max_turns_via_usage_count():
|
||||
"""A2AConfig.max_turns wires through to BaseTool.max_usage_count."""
|
||||
from crewai import Agent, Task
|
||||
from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
|
||||
|
||||
config = A2AClientConfig(endpoint="http://test-endpoint.com", max_turns=2)
|
||||
agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
|
||||
task = Task(description="test", expected_output="test", agent=agent)
|
||||
|
||||
card = _create_mock_agent_card()
|
||||
state = A2ADelegationState(agent=agent, task=task)
|
||||
[tool] = build_a2a_tools([config], {config.endpoint: card}, state)
|
||||
|
||||
assert tool.max_usage_count == 2
|
||||
|
||||
|
||||
def test_default_trust_remote_completion_status_is_false():
|
||||
"""Verify that default value of trust_remote_completion_status is False."""
|
||||
a2a_config = A2AConfig(endpoint="http://test-endpoint.com")
|
||||
a2a_config = A2AConfig(
|
||||
endpoint="http://test-endpoint.com",
|
||||
)
|
||||
|
||||
assert a2a_config.trust_remote_completion_status is False
|
||||
Reference in New Issue
Block a user