Compare commits

..

3 Commits

Author SHA1 Message Date
iris-clawd
c36827b45b fix(docs/pt-BR): replace untranslated code block placeholders (#5781)
Some checks are pending
CodeQL Advanced / Analyze (actions) (push) Waiting to run
CodeQL Advanced / Analyze (python) (push) Waiting to run
Check Documentation Broken Links / Check broken links (push) Waiting to run
Vulnerability Scan / pip-audit (push) Waiting to run
* fix(docs/pt-BR): replace untranslated code block placeholders

Replace all `# (O código não é traduzido)` and `# código não traduzido`
placeholder comments in the PT-BR docs with the actual code from the
English source files.

Files fixed:
- docs/pt-BR/concepts/flows.mdx (~15 placeholders → real code)
- docs/pt-BR/guides/flows/mastering-flow-state.mdx (~17 placeholders → real code)

Code itself is kept in English per i18n conventions. Inline # comments
within code blocks have been translated to Portuguese.

* fix(docs/pt-BR): address CodeRabbit review comments

- flows.mdx: add missing load_dotenv() call after imports
- mastering-flow-state.mdx: fix PersistentCounterFlow second-run example
  to pass inputs={"id": flow1.state.id} to kickoff(), matching the
  documented resume pattern; update comment accordingly
2026-05-13 12:23:18 -03:00
Lorenze Jay
264da8245a Lorenze/imp/prompt layering (#5774)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
* improving prompt structure especially for prompt caching

* addressing comments
2026-05-12 12:39:12 -07:00
Mani
f2960ccaaf Added docs for TavilyGetResearch (#5707)
* Add Tavily Research and get Research

- Added tavily research with docs to CrewAI

- Added tavily get research with docs to CrewAI

* Update `tavily-python` installation instructions and adjust version constraints

- Changed installation command from `pip install` to `uv add` for `tavily-python` in multiple documentation files.
- Updated version constraint for `tavily-python` in `pyproject.toml` from `>=0.7.14` to `~=0.7.14`.
- Modified the `exclude-newer` date in `uv.lock` to `2026-04-23T07:00:00Z`.

* Add Tavily Research Tool documentation in multiple languages

- Introduced `TavilyResearchTool` documentation in English, Arabic, Korean, and Portuguese.
- Updated `docs.json` to include paths for the new documentation files.
- The `TavilyResearchTool` allows CrewAI agents to perform multi-step research tasks and generate cited reports using the Tavily Research API.

* Fix Tavily research CI failures

* added getResearchTool docs

- Added docs for getResearchTool

---------

Co-authored-by: lorenzejay <lorenzejaytech@gmail.com>
Co-authored-by: Evan Rimer <evan.rimer@tavily.com>
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2026-05-12 12:25:45 -07:00
21 changed files with 1466 additions and 93 deletions

View File

@@ -29,6 +29,7 @@ from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion
load_dotenv()
class ExampleFlow(Flow):
model = "gpt-4o-mini"

View File

@@ -1216,6 +1216,8 @@
"en/tools/search-research/youtubevideosearchtool",
"en/tools/search-research/tavilysearchtool",
"en/tools/search-research/tavilyextractortool",
"en/tools/search-research/tavilyresearchtool",
"en/tools/search-research/tavilygetresearchtool",
"en/tools/search-research/arxivpapertool",
"en/tools/search-research/serpapi-googlesearchtool",
"en/tools/search-research/serpapi-googleshoppingtool",
@@ -1698,6 +1700,7 @@
"en/tools/search-research/tavilysearchtool",
"en/tools/search-research/tavilyextractortool",
"en/tools/search-research/tavilyresearchtool",
"en/tools/search-research/tavilygetresearchtool",
"en/tools/search-research/arxivpapertool",
"en/tools/search-research/serpapi-googlesearchtool",
"en/tools/search-research/serpapi-googleshoppingtool",
@@ -2181,6 +2184,7 @@
"en/tools/search-research/tavilysearchtool",
"en/tools/search-research/tavilyextractortool",
"en/tools/search-research/tavilyresearchtool",
"en/tools/search-research/tavilygetresearchtool",
"en/tools/search-research/arxivpapertool",
"en/tools/search-research/serpapi-googlesearchtool",
"en/tools/search-research/serpapi-googleshoppingtool",
@@ -2664,6 +2668,7 @@
"en/tools/search-research/tavilysearchtool",
"en/tools/search-research/tavilyextractortool",
"en/tools/search-research/tavilyresearchtool",
"en/tools/search-research/tavilygetresearchtool",
"en/tools/search-research/arxivpapertool",
"en/tools/search-research/serpapi-googlesearchtool",
"en/tools/search-research/serpapi-googleshoppingtool",
@@ -3147,6 +3152,7 @@
"en/tools/search-research/tavilysearchtool",
"en/tools/search-research/tavilyextractortool",
"en/tools/search-research/tavilyresearchtool",
"en/tools/search-research/tavilygetresearchtool",
"en/tools/search-research/arxivpapertool",
"en/tools/search-research/serpapi-googlesearchtool",
"en/tools/search-research/serpapi-googleshoppingtool",
@@ -3629,6 +3635,7 @@
"en/tools/search-research/tavilysearchtool",
"en/tools/search-research/tavilyextractortool",
"en/tools/search-research/tavilyresearchtool",
"en/tools/search-research/tavilygetresearchtool",
"en/tools/search-research/arxivpapertool",
"en/tools/search-research/serpapi-googlesearchtool",
"en/tools/search-research/serpapi-googleshoppingtool",
@@ -4110,6 +4117,7 @@
"en/tools/search-research/tavilysearchtool",
"en/tools/search-research/tavilyextractortool",
"en/tools/search-research/tavilyresearchtool",
"en/tools/search-research/tavilygetresearchtool",
"en/tools/search-research/arxivpapertool",
"en/tools/search-research/serpapi-googlesearchtool",
"en/tools/search-research/serpapi-googleshoppingtool",
@@ -4591,6 +4599,7 @@
"en/tools/search-research/tavilysearchtool",
"en/tools/search-research/tavilyextractortool",
"en/tools/search-research/tavilyresearchtool",
"en/tools/search-research/tavilygetresearchtool",
"en/tools/search-research/arxivpapertool",
"en/tools/search-research/serpapi-googlesearchtool",
"en/tools/search-research/serpapi-googleshoppingtool",
@@ -5072,6 +5081,7 @@
"en/tools/search-research/tavilysearchtool",
"en/tools/search-research/tavilyextractortool",
"en/tools/search-research/tavilyresearchtool",
"en/tools/search-research/tavilygetresearchtool",
"en/tools/search-research/arxivpapertool",
"en/tools/search-research/serpapi-googlesearchtool",
"en/tools/search-research/serpapi-googleshoppingtool",
@@ -5555,6 +5565,7 @@
"en/tools/search-research/tavilysearchtool",
"en/tools/search-research/tavilyextractortool",
"en/tools/search-research/tavilyresearchtool",
"en/tools/search-research/tavilygetresearchtool",
"en/tools/search-research/arxivpapertool",
"en/tools/search-research/serpapi-googlesearchtool",
"en/tools/search-research/serpapi-googleshoppingtool",
@@ -6037,6 +6048,7 @@
"en/tools/search-research/tavilysearchtool",
"en/tools/search-research/tavilyextractortool",
"en/tools/search-research/tavilyresearchtool",
"en/tools/search-research/tavilygetresearchtool",
"en/tools/search-research/arxivpapertool",
"en/tools/search-research/serpapi-googlesearchtool",
"en/tools/search-research/serpapi-googleshoppingtool",

View File

@@ -29,6 +29,7 @@ from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion
load_dotenv()
class ExampleFlow(Flow):
model = "gpt-4o-mini"

View File

@@ -313,9 +313,9 @@ flow1 = PersistentCounterFlow()
result1 = flow1.kickoff()
print(f"First run result: {result1}")
# Second run - state is automatically loaded
# Second run - pass the ID to load the persisted state
flow2 = PersistentCounterFlow()
result2 = flow2.kickoff()
result2 = flow2.kickoff(inputs={"id": flow1.state.id})
print(f"Second run result: {result2}") # Will be higher due to persisted state
```

View File

@@ -54,6 +54,14 @@ These tools enable your agents to search the web, research topics, and find info
Extract structured content from web pages using the Tavily API.
</Card>
<Card title="Tavily Research Tool" icon="flask" href="/en/tools/search-research/tavilyresearchtool">
Run multi-step research tasks and get cited reports using the Tavily Research API.
</Card>
<Card title="Tavily Get Research Tool" icon="clipboard-list" href="/en/tools/search-research/tavilygetresearchtool">
Retrieve the status and results of an existing Tavily research task.
</Card>
<Card title="Arxiv Paper Tool" icon="box-archive" href="/en/tools/search-research/arxivpapertool">
Search arXiv and optionally download PDFs.
</Card>
@@ -76,7 +84,15 @@ These tools enable your agents to search the web, research topics, and find info
- **Academic Research**: Find scholarly articles and technical papers
```python
from crewai_tools import SerperDevTool, GitHubSearchTool, YoutubeVideoSearchTool, TavilySearchTool, TavilyExtractorTool
from crewai_tools import (
GitHubSearchTool,
SerperDevTool,
TavilyExtractorTool,
TavilyGetResearchTool,
TavilyResearchTool,
TavilySearchTool,
YoutubeVideoSearchTool,
)
# Create research tools
web_search = SerperDevTool()
@@ -84,11 +100,21 @@ code_search = GitHubSearchTool()
video_research = YoutubeVideoSearchTool()
tavily_search = TavilySearchTool()
content_extractor = TavilyExtractorTool()
tavily_research = TavilyResearchTool()
tavily_get_research = TavilyGetResearchTool()
# Add to your agent
agent = Agent(
role="Research Analyst",
tools=[web_search, code_search, video_research, tavily_search, content_extractor],
tools=[
web_search,
code_search,
video_research,
tavily_search,
content_extractor,
tavily_research,
tavily_get_research,
],
goal="Gather comprehensive information on any topic"
)
```

View File

@@ -0,0 +1,85 @@
---
title: "Tavily Get Research Tool"
description: "Retrieve the status and results of an existing Tavily research task"
icon: "clipboard-list"
mode: "wide"
---
The `TavilyGetResearchTool` lets CrewAI agents check an existing Tavily research task by `request_id`. Use it when a research task was started earlier and you need to retrieve its current status or final results.
If you need to start a new research job, use the [Tavily Research Tool](/en/tools/search-research/tavilyresearchtool). This tool is specifically for looking up an existing Tavily research request after you already have its `request_id`.
## Installation
To use the `TavilyGetResearchTool`, install the `tavily-python` library alongside `crewai-tools`:
```shell
uv add 'crewai[tools]' tavily-python
```
## Environment Variables
Set your Tavily API key:
```bash
export TAVILY_API_KEY='your_tavily_api_key'
```
Get an API key at [https://app.tavily.com/](https://app.tavily.com/) (sign up, then create a key).
## Example Usage
```python
from crewai_tools import TavilyGetResearchTool
tavily_get_research_tool = TavilyGetResearchTool()
status_result = tavily_get_research_tool.run(
request_id="your-research-request-id"
)
print(status_result)
```
## Common Workflow
Use `TavilyGetResearchTool` when your application or another service has already created a Tavily research task and saved its `request_id`.
Typical cases include:
- Polling for completion after kicking off research in a background job.
- Looking up the latest status of a long-running research task.
- Fetching final research output from a previously created Tavily request.
## Configuration Options
The `TavilyGetResearchTool` accepts the following argument when calling the `run` method:
- `request_id` (str): **Required.** The existing Tavily research request ID to retrieve.
## Async Usage
Use `_arun` when your application is already running inside an async event loop:
```python
from crewai_tools import TavilyGetResearchTool
tavily_get_research_tool = TavilyGetResearchTool()
status_result = await tavily_get_research_tool._arun(
request_id="your-research-request-id"
)
```
## Features
- **Research status retrieval**: Fetch the current status of an existing Tavily research task.
- **Result retrieval**: Return available research output once Tavily has completed the task.
- **Sync and async**: Use either `_run`/`run` or `_arun` depending on your application's runtime.
- **JSON output**: Returns Tavily responses as formatted JSON strings.
## Response Format
The tool returns a JSON string containing the current research task status and any available results from Tavily. The exact response shape depends on the task state returned by Tavily, so incomplete tasks may return status information before the final research output is available.
Refer to the [Tavily API documentation](https://docs.tavily.com/) for full details on the Research API.

View File

@@ -29,6 +29,7 @@ from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion
load_dotenv()
class ExampleFlow(Flow):
model = "gpt-4o-mini"

View File

@@ -24,7 +24,63 @@ Os flows permitem que você crie fluxos de trabalho estruturados e orientados po
Vamos criar um Flow simples no qual você usará a OpenAI para gerar uma cidade aleatória em uma tarefa e, em seguida, usará essa cidade para gerar uma curiosidade em outra tarefa.
```python Code
# (O código não é traduzido)
from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion
load_dotenv()
class ExampleFlow(Flow):
model = "gpt-4o-mini"
@start()
def generate_city(self):
print("Starting flow")
# Cada estado do flow recebe automaticamente um ID único
print(f"Flow State ID: {self.state['id']}")
response = completion(
model=self.model,
messages=[
{
"role": "user",
"content": "Return the name of a random city in the world.",
},
],
)
random_city = response["choices"][0]["message"]["content"]
# Armazena a cidade no nosso estado
self.state["city"] = random_city
print(f"Random City: {random_city}")
return random_city
@listen(generate_city)
def generate_fun_fact(self, random_city):
response = completion(
model=self.model,
messages=[
{
"role": "user",
"content": f"Tell me a fun fact about {random_city}",
},
],
)
fun_fact = response["choices"][0]["message"]["content"]
# Armazena a curiosidade no nosso estado
self.state["fun_fact"] = fun_fact
return fun_fact
flow = ExampleFlow()
flow.plot()
result = flow.kickoff()
print(f"Generated fun fact: {result}")
```
Na ilustração acima, criamos um Flow simples que gera uma cidade aleatória usando a OpenAI e depois cria uma curiosidade sobre essa cidade. O Flow consiste em duas tarefas: `generate_city` e `generate_fun_fact`. A tarefa `generate_city` é o ponto de início do Flow, enquanto a tarefa `generate_fun_fact` fica escutando o resultado da tarefa `generate_city`.
@@ -56,12 +112,16 @@ O decorador `@listen()` pode ser usado de várias formas:
1. **Escutando um Método pelo Nome**: Você pode passar o nome do método ao qual deseja escutar como string. Quando esse método concluir, o método ouvinte será chamado.
```python Code
# (O código não é traduzido)
@listen("generate_city")
def generate_fun_fact(self, random_city):
# Implementação
```
2. **Escutando um Método Diretamente**: Você pode passar o próprio método. Quando esse método concluir, o método ouvinte será chamado.
```python Code
# (O código não é traduzido)
@listen(generate_city)
def generate_fun_fact(self, random_city):
# Implementação
```
### Saída de um Flow
@@ -76,7 +136,24 @@ Veja como acessar a saída final:
<CodeGroup>
```python Code
# (O código não é traduzido)
from crewai.flow.flow import Flow, listen, start
class OutputExampleFlow(Flow):
@start()
def first_method(self):
return "Output from first_method"
@listen(first_method)
def second_method(self, first_output):
return f"Second method received: {first_output}"
flow = OutputExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()
print("---- Final Output ----")
print(final_output)
```
```text Output
@@ -97,8 +174,34 @@ Além de recuperar a saída final, você pode acessar e atualizar o estado dentr
Veja um exemplo de como atualizar e acessar o estado:
<CodeGroup>
```python Code
# (O código não é traduzido)
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
class ExampleState(BaseModel):
counter: int = 0
message: str = ""
class StateExampleFlow(Flow[ExampleState]):
@start()
def first_method(self):
self.state.message = "Hello from first_method"
self.state.counter += 1
@listen(first_method)
def second_method(self):
self.state.message += " - updated by second_method"
self.state.counter += 1
return self.state.message
flow = StateExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()
print(f"Final Output: {final_output}")
print("Final State:")
print(flow.state)
```
```text Output
@@ -128,7 +231,33 @@ Essa abordagem oferece flexibilidade, permitindo que o desenvolvedor adicione ou
Mesmo com estados não estruturados, os flows do CrewAI geram e mantêm automaticamente um identificador único (UUID) para cada instância de estado.
```python Code
# (O código não é traduzido)
from crewai.flow.flow import Flow, listen, start
class UnstructuredExampleFlow(Flow):
@start()
def first_method(self):
# O estado inclui automaticamente um campo 'id'
print(f"State ID: {self.state['id']}")
self.state['counter'] = 0
self.state['message'] = "Hello from structured flow"
@listen(first_method)
def second_method(self):
self.state['counter'] += 1
self.state['message'] += " - updated"
@listen(second_method)
def third_method(self):
self.state['counter'] += 1
self.state['message'] += " - updated again"
print(f"State after third_method: {self.state}")
flow = UnstructuredExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()
```
![Flow Visual image](/images/crewai-flow-3.png)
@@ -148,7 +277,39 @@ Ao usar modelos como o `BaseModel` da Pydantic, os desenvolvedores podem definir
Cada estado nos flows do CrewAI recebe automaticamente um identificador único (UUID) para ajudar no rastreamento e gerenciamento. Esse ID é gerado e mantido automaticamente pelo sistema de flows.
```python Code
# (O código não é traduzido)
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
class ExampleState(BaseModel):
# Nota: o campo 'id' é adicionado automaticamente a todos os estados
counter: int = 0
message: str = ""
class StructuredExampleFlow(Flow[ExampleState]):
@start()
def first_method(self):
# Acesse o ID gerado automaticamente, se necessário
print(f"State ID: {self.state.id}")
self.state.message = "Hello from structured flow"
@listen(first_method)
def second_method(self):
self.state.counter += 1
self.state.message += " - updated"
@listen(second_method)
def third_method(self):
self.state.counter += 1
self.state.message += " - updated again"
print(f"State after third_method: {self.state}")
flow = StructuredExampleFlow()
flow.kickoff()
```
![Flow Visual image](/images/crewai-flow-3.png)
@@ -182,7 +343,19 @@ O decorador @persist permite a persistência automática do estado nos flows do
Quando aplicado no nível da classe, o decorador @persist garante a persistência automática de todos os estados dos métodos do flow:
```python
# (O código não é traduzido)
@persist # Usa SQLiteFlowPersistence por padrão
class MyFlow(Flow[MyState]):
@start()
def initialize_flow(self):
# Este método terá seu estado persistido automaticamente
self.state.counter = 1
print("Initialized flow. State ID:", self.state.id)
@listen(initialize_flow)
def next_step(self):
# O estado (incluindo self.state.id) é recarregado automaticamente
self.state.counter += 1
print("Flow state is persisted. Counter:", self.state.counter)
```
### Persistência no Nível de Método
@@ -190,7 +363,14 @@ Quando aplicado no nível da classe, o decorador @persist garante a persistênci
Para um controle mais granular, você pode aplicar @persist em métodos específicos:
```python
# (O código não é traduzido)
class AnotherFlow(Flow[dict]):
@persist # Persiste apenas o estado deste método
@start()
def begin(self):
if "runs" not in self.state:
self.state["runs"] = 0
self.state["runs"] += 1
print("Method-level persisted runs:", self.state["runs"])
```
### Forking de Estado Persistido
@@ -282,8 +462,29 @@ A arquitetura de persistência enfatiza precisão técnica e opções de persona
A função `or_` nos flows permite escutar múltiplos métodos e acionar o método ouvinte quando qualquer um dos métodos especificados gerar uma saída.
<CodeGroup>
```python Code
# (O código não é traduzido)
from crewai.flow.flow import Flow, listen, or_, start
class OrExampleFlow(Flow):
@start()
def start_method(self):
return "Hello from the start method"
@listen(start_method)
def second_method(self):
return "Hello from the second method"
@listen(or_(start_method, second_method))
def logger(self, result):
print(f"Logger: {result}")
flow = OrExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()
```
```text Output
@@ -302,8 +503,28 @@ A função `or_` serve para escutar vários métodos e disparar o método ouvint
A função `and_` nos flows permite escutar múltiplos métodos e acionar o método ouvinte apenas quando todos os métodos especificados emitirem uma saída.
<CodeGroup>
```python Code
# (O código não é traduzido)
from crewai.flow.flow import Flow, and_, listen, start
class AndExampleFlow(Flow):
@start()
def start_method(self):
self.state["greeting"] = "Hello from the start method"
@listen(start_method)
def second_method(self):
self.state["joke"] = "What do computers eat? Microchips."
@listen(and_(start_method, second_method))
def logger(self):
print("---- Logger ----")
print(self.state)
flow = AndExampleFlow()
flow.plot()
flow.kickoff()
```
```text Output
@@ -323,8 +544,42 @@ O decorador `@router()` nos flows permite definir lógica de roteamento condicio
Você pode especificar diferentes rotas conforme a saída do método, permitindo controlar o fluxo de execução de forma dinâmica.
<CodeGroup>
```python Code
# (O código não é traduzido)
import random
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel
class ExampleState(BaseModel):
success_flag: bool = False
class RouterFlow(Flow[ExampleState]):
@start()
def start_method(self):
print("Starting the structured flow")
random_boolean = random.choice([True, False])
self.state.success_flag = random_boolean
@router(start_method)
def second_method(self):
if self.state.success_flag:
return "success"
else:
return "failed"
@listen("success")
def third_method(self):
print("Third method running")
@listen("failed")
def fourth_method(self):
print("Fourth method running")
flow = RouterFlow()
flow.plot("my_flow_plot")
flow.kickoff()
```
```text Output
@@ -401,7 +656,105 @@ Para um guia completo sobre feedback humano em flows, incluindo feedback assínc
Os agentes podem ser integrados facilmente aos seus flows, oferecendo uma alternativa leve às crews completas quando você precisar executar tarefas simples e focadas. Veja um exemplo de como utilizar um agente em um flow para realizar uma pesquisa de mercado:
```python
# (O código não é traduzido)
import asyncio
from typing import Any, Dict, List
from crewai_tools import SerperDevTool
from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.flow.flow import Flow, listen, start
# Define um formato de saída estruturado
class MarketAnalysis(BaseModel):
key_trends: List[str] = Field(description="List of identified market trends")
market_size: str = Field(description="Estimated market size")
competitors: List[str] = Field(description="Major competitors in the space")
# Define o estado do flow
class MarketResearchState(BaseModel):
product: str = ""
analysis: MarketAnalysis | None = None
# Cria uma classe de flow
class MarketResearchFlow(Flow[MarketResearchState]):
@start()
def initialize_research(self) -> Dict[str, Any]:
print(f"Starting market research for {self.state.product}")
return {"product": self.state.product}
@listen(initialize_research)
async def analyze_market(self) -> Dict[str, Any]:
# Cria um agente para pesquisa de mercado
analyst = Agent(
role="Market Research Analyst",
goal=f"Analyze the market for {self.state.product}",
backstory="You are an experienced market analyst with expertise in "
"identifying market trends and opportunities.",
tools=[SerperDevTool()],
verbose=True,
)
# Define a consulta de pesquisa
query = f"""
Research the market for {self.state.product}. Include:
1. Key market trends
2. Market size
3. Major competitors
Format your response according to the specified structure.
"""
# Executa a análise com formato de saída estruturado
result = await analyst.kickoff_async(query, response_format=MarketAnalysis)
if result.pydantic:
print("result", result.pydantic)
else:
print("result", result)
# Retorna a análise para atualizar o estado
return {"analysis": result.pydantic}
@listen(analyze_market)
def present_results(self, analysis) -> None:
print("\nMarket Analysis Results")
print("=====================")
if isinstance(analysis, dict):
# Se recebemos um dict com a chave 'analysis', extrai o objeto de análise real
market_analysis = analysis.get("analysis")
else:
market_analysis = analysis
if market_analysis and isinstance(market_analysis, MarketAnalysis):
print("\nKey Market Trends:")
for trend in market_analysis.key_trends:
print(f"- {trend}")
print(f"\nMarket Size: {market_analysis.market_size}")
print("\nMajor Competitors:")
for competitor in market_analysis.competitors:
print(f"- {competitor}")
else:
print("No structured analysis data available.")
print("Raw analysis:", analysis)
# Exemplo de uso
async def run_flow():
flow = MarketResearchFlow()
flow.plot("MarketResearchFlowPlot")
result = await flow.kickoff_async(inputs={"product": "AI-powered chatbots"})
return result
# Executa o flow
if __name__ == "__main__":
asyncio.run(run_flow())
```
![Flow Visual image](/images/crewai-flow-7.png)
@@ -463,7 +816,50 @@ No arquivo `main.py`, você cria seu flow e conecta as crews. É possível defin
Veja um exemplo de como conectar a `poem_crew` no arquivo `main.py`:
```python Code
# (O código não é traduzido)
#!/usr/bin/env python
from random import randint
from pydantic import BaseModel
from crewai.flow.flow import Flow, listen, start
from .crews.poem_crew.poem_crew import PoemCrew
class PoemState(BaseModel):
sentence_count: int = 1
poem: str = ""
class PoemFlow(Flow[PoemState]):
@start()
def generate_sentence_count(self):
print("Generating sentence count")
self.state.sentence_count = randint(1, 5)
@listen(generate_sentence_count)
def generate_poem(self):
print("Generating poem")
result = PoemCrew().crew().kickoff(inputs={"sentence_count": self.state.sentence_count})
print("Poem generated", result.raw)
self.state.poem = result.raw
@listen(generate_poem)
def save_poem(self):
print("Saving poem")
with open("poem.txt", "w") as f:
f.write(self.state.poem)
def kickoff():
poem_flow = PoemFlow()
poem_flow.kickoff()
def plot():
poem_flow = PoemFlow()
poem_flow.plot("PoemFlowPlot")
if __name__ == "__main__":
kickoff()
plot()
```
Neste exemplo, a classe `PoemFlow` define um fluxo que gera a quantidade de frases, usa a `PoemCrew` para gerar um poema e, depois, salva o poema em um arquivo. O flow inicia com o método `kickoff()`, e o gráfico é gerado pelo método `plot()`.
@@ -515,7 +911,8 @@ O CrewAI oferece duas formas práticas de gerar plots dos seus flows:
Se estiver trabalhando diretamente com uma instância do flow, basta chamar o método `plot()` do objeto. Isso criará um arquivo HTML com o plot interativo do seu flow.
```python Code
# (O código não é traduzido)
# Considerando que você já tem uma instância do flow
flow.plot("my_flow_plot")
```
Esse comando gera um arquivo chamado `my_flow_plot.html` no diretório atual. Abra esse arquivo em um navegador para visualizar o plot interativo.

View File

@@ -63,7 +63,60 @@ Com estado não estruturado:
Veja um exemplo simples de gerenciamento de estado não estruturado:
```python
# código não traduzido
from crewai.flow.flow import Flow, listen, start
class UnstructuredStateFlow(Flow):
@start()
def initialize_data(self):
print("Initializing flow data")
# Adiciona pares chave-valor ao estado
self.state["user_name"] = "Alex"
self.state["preferences"] = {
"theme": "dark",
"language": "English"
}
self.state["items"] = []
# O estado do flow recebe automaticamente um ID único
print(f"Flow ID: {self.state['id']}")
return "Initialized"
@listen(initialize_data)
def process_data(self, previous_result):
print(f"Previous step returned: {previous_result}")
# Acessa e modifica o estado
user = self.state["user_name"]
print(f"Processing data for {user}")
# Adiciona itens a uma lista no estado
self.state["items"].append("item1")
self.state["items"].append("item2")
# Adiciona um novo par chave-valor
self.state["processed"] = True
return "Processed"
@listen(process_data)
def generate_summary(self, previous_result):
# Acessa múltiplos valores do estado
user = self.state["user_name"]
theme = self.state["preferences"]["theme"]
items = self.state["items"]
processed = self.state.get("processed", False)
summary = f"User {user} has {len(items)} items with {theme} theme. "
summary += "Data is processed." if processed else "Data is not processed."
return summary
# Executa o flow
flow = UnstructuredStateFlow()
result = flow.kickoff()
print(f"Final result: {result}")
print(f"Final state: {flow.state}")
```
### Quando Usar Estado Não Estruturado
@@ -94,7 +147,63 @@ Ao utilizar estado estruturado:
Veja como implementar o gerenciamento de estado estruturado:
```python
# código não traduzido
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel, Field
from typing import List, Dict, Optional
# Define o modelo de estado
class UserPreferences(BaseModel):
theme: str = "light"
language: str = "English"
class AppState(BaseModel):
user_name: str = ""
preferences: UserPreferences = UserPreferences()
items: List[str] = []
processed: bool = False
completion_percentage: float = 0.0
# Cria um flow com estado tipado
class StructuredStateFlow(Flow[AppState]):
@start()
def initialize_data(self):
print("Initializing flow data")
# Define valores do estado (com checagem de tipo)
self.state.user_name = "Taylor"
self.state.preferences.theme = "dark"
# O campo ID está disponível automaticamente
print(f"Flow ID: {self.state.id}")
return "Initialized"
@listen(initialize_data)
def process_data(self, previous_result):
print(f"Processing data for {self.state.user_name}")
# Modifica o estado (com checagem de tipo)
self.state.items.append("item1")
self.state.items.append("item2")
self.state.processed = True
self.state.completion_percentage = 50.0
return "Processed"
@listen(process_data)
def generate_summary(self, previous_result):
# Acessa o estado (com autocompletar)
summary = f"User {self.state.user_name} has {len(self.state.items)} items "
summary += f"with {self.state.preferences.theme} theme. "
summary += "Data is processed." if self.state.processed else "Data is not processed."
summary += f" Completion: {self.state.completion_percentage}%"
return summary
# Executa o flow
flow = StructuredStateFlow()
result = flow.kickoff()
print(f"Final result: {result}")
print(f"Final state: {flow.state}")
```
### Benefícios do Estado Estruturado
@@ -138,7 +247,29 @@ Independente de você usar estado estruturado ou não estruturado, é possível
Métodos do flow podem retornar valores que serão passados como argumento para métodos listeners:
```python
# código não traduzido
from crewai.flow.flow import Flow, listen, start
class DataPassingFlow(Flow):
@start()
def generate_data(self):
# Este valor de retorno será passado para os métodos listeners
return "Generated data"
@listen(generate_data)
def process_data(self, data_from_previous_step):
print(f"Received: {data_from_previous_step}")
# Você pode modificar os dados e repassá-los adiante
processed_data = f"{data_from_previous_step} - processed"
# Também atualiza o estado
self.state["last_processed"] = processed_data
return processed_data
@listen(process_data)
def finalize_data(self, processed_data):
print(f"Received processed data: {processed_data}")
# Acessa tanto os dados passados quanto o estado
last_processed = self.state.get("last_processed", "")
return f"Final: {processed_data} (from state: {last_processed})"
```
Esse padrão permite combinar passagem de dados direta com atualizações de estado para obter máxima flexibilidade.
@@ -156,7 +287,36 @@ O decorador `@persist()` automatiza a persistência de estado, salvando o estado
Ao aplicar em nível de classe, `@persist()` salva o estado após cada execução de método:
```python
# código não traduzido
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
from pydantic import BaseModel
class CounterState(BaseModel):
value: int = 0
@persist() # Aplica à classe inteira do flow
class PersistentCounterFlow(Flow[CounterState]):
@start()
def increment(self):
self.state.value += 1
print(f"Incremented to {self.state.value}")
return self.state.value
@listen(increment)
def double(self, value):
self.state.value = value * 2
print(f"Doubled to {self.state.value}")
return self.state.value
# Primeira execução
flow1 = PersistentCounterFlow()
result1 = flow1.kickoff()
print(f"First run result: {result1}")
# Segunda execução - passa o ID para carregar o estado persistido
flow2 = PersistentCounterFlow()
result2 = flow2.kickoff(inputs={"id": flow1.state.id})
print(f"Second run result: {result2}") # Será maior devido ao estado persistido
```
#### Persistência em Nível de Método
@@ -164,7 +324,26 @@ Ao aplicar em nível de classe, `@persist()` salva o estado após cada execuçã
Para mais controle, você pode aplicar `@persist()` em métodos específicos:
```python
# código não traduzido
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
class SelectivePersistFlow(Flow):
@start()
def first_step(self):
self.state["count"] = 1
return "First step"
@persist() # Persiste apenas após este método
@listen(first_step)
def important_step(self, prev_result):
self.state["count"] += 1
self.state["important_data"] = "This will be persisted"
return "Important step completed"
@listen(important_step)
def final_step(self, prev_result):
self.state["count"] += 1
return f"Complete with count {self.state['count']}"
```
#### Forking de Estado Persistido
@@ -216,7 +395,45 @@ Notas sobre o comportamento:
Você pode usar o estado para implementar lógicas condicionais complexas em seus flows:
```python
# código não traduzido
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel
class PaymentState(BaseModel):
amount: float = 0.0
is_approved: bool = False
retry_count: int = 0
class PaymentFlow(Flow[PaymentState]):
@start()
def process_payment(self):
# Simula o processamento do pagamento
self.state.amount = 100.0
self.state.is_approved = self.state.amount < 1000
return "Payment processed"
@router(process_payment)
def check_approval(self, previous_result):
if self.state.is_approved:
return "approved"
elif self.state.retry_count < 3:
return "retry"
else:
return "rejected"
@listen("approved")
def handle_approval(self):
return f"Payment of ${self.state.amount} approved!"
@listen("retry")
def handle_retry(self):
self.state.retry_count += 1
print(f"Retrying payment (attempt {self.state.retry_count})...")
# Aqui poderia ser implementada a lógica de retry
return "Retry initiated"
@listen("rejected")
def handle_rejection(self):
return f"Payment of ${self.state.amount} rejected after {self.state.retry_count} retries."
```
### Manipulações Complexas de Estado
@@ -224,7 +441,60 @@ Você pode usar o estado para implementar lógicas condicionais complexas em seu
Para transformar estados complexos, você pode criar métodos dedicados:
```python
# código não traduzido
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
from typing import List, Dict
class UserData(BaseModel):
name: str
active: bool = True
login_count: int = 0
class ComplexState(BaseModel):
users: Dict[str, UserData] = {}
active_user_count: int = 0
class TransformationFlow(Flow[ComplexState]):
@start()
def initialize(self):
# Adiciona alguns usuários
self.add_user("alice", "Alice")
self.add_user("bob", "Bob")
self.add_user("charlie", "Charlie")
return "Initialized"
@listen(initialize)
def process_users(self, _):
# Incrementa contagens de login
for user_id in self.state.users:
self.increment_login(user_id)
# Desativa um usuário
self.deactivate_user("bob")
# Atualiza a contagem de ativos
self.update_active_count()
return f"Processed {len(self.state.users)} users"
# Métodos auxiliares para transformações de estado
def add_user(self, user_id: str, name: str):
self.state.users[user_id] = UserData(name=name)
self.update_active_count()
def increment_login(self, user_id: str):
if user_id in self.state.users:
self.state.users[user_id].login_count += 1
def deactivate_user(self, user_id: str):
if user_id in self.state.users:
self.state.users[user_id].active = False
self.update_active_count()
def update_active_count(self):
self.state.active_user_count = sum(
1 for user in self.state.users.values() if user.active
)
```
Esse padrão de criar métodos auxiliares mantém seus métodos de flow limpos, enquanto permite manipulações complexas de estado.
@@ -238,7 +508,71 @@ Um dos padrões mais poderosos na CrewAI é combinar o gerenciamento de estado d
Você pode usar o estado do flow para parametrizar crews:
```python
# código não traduzido
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Process, Task
from pydantic import BaseModel
class ResearchState(BaseModel):
topic: str = ""
depth: str = "medium"
results: str = ""
class ResearchFlow(Flow[ResearchState]):
@start()
def get_parameters(self):
# Em uma aplicação real, isso pode vir da entrada do usuário
self.state.topic = "Artificial Intelligence Ethics"
self.state.depth = "deep"
return "Parameters set"
@listen(get_parameters)
def execute_research(self, _):
# Cria os agentes
researcher = Agent(
role="Research Specialist",
goal=f"Research {self.state.topic} in {self.state.depth} detail",
backstory="You are an expert researcher with a talent for finding accurate information."
)
writer = Agent(
role="Content Writer",
goal="Transform research into clear, engaging content",
backstory="You excel at communicating complex ideas clearly and concisely."
)
# Cria as tarefas
research_task = Task(
description=f"Research {self.state.topic} with {self.state.depth} analysis",
expected_output="Comprehensive research notes in markdown format",
agent=researcher
)
writing_task = Task(
description=f"Create a summary on {self.state.topic} based on the research",
expected_output="Well-written article in markdown format",
agent=writer,
context=[research_task]
)
# Cria e executa a crew
research_crew = Crew(
agents=[researcher, writer],
tasks=[research_task, writing_task],
process=Process.sequential,
verbose=True
)
# Executa a crew e armazena o resultado no estado
result = research_crew.kickoff()
self.state.results = result.raw
return "Research completed"
@listen(execute_research)
def summarize_results(self, _):
# Acessa os resultados armazenados
result_length = len(self.state.results)
return f"Research on {self.state.topic} completed with {result_length} characters of results."
```
### Manipulando Saídas de Crews no Estado
@@ -246,7 +580,21 @@ Você pode usar o estado do flow para parametrizar crews:
Quando um crew finaliza, é possível processar sua saída e armazená-la no estado do flow:
```python
# código não traduzido
@listen(execute_crew)
def process_crew_results(self, _):
# Faz parsing dos resultados brutos (assumindo saída em JSON)
import json
try:
results_dict = json.loads(self.state.raw_results)
self.state.processed_results = {
"title": results_dict.get("title", ""),
"main_points": results_dict.get("main_points", []),
"conclusion": results_dict.get("conclusion", "")
}
return "Results processed successfully"
except json.JSONDecodeError:
self.state.error = "Failed to parse crew results as JSON"
return "Error processing results"
```
## Boas Práticas para Gerenciamento de Estado
@@ -256,7 +604,19 @@ Quando um crew finaliza, é possível processar sua saída e armazená-la no est
Projete seu estado para conter somente o necessário:
```python
# Exemplo não traduzido
# Abrangente demais
class BloatedState(BaseModel):
user_data: Dict = {}
system_settings: Dict = {}
temporary_calculations: List = []
debug_info: Dict = {}
# ...muitos outros campos
# Melhor: estado focado
class FocusedState(BaseModel):
user_id: str
preferences: Dict[str, str]
completion_status: Dict[str, bool]
```
### 2. Use Estado Estruturado em Flows Complexos
@@ -264,7 +624,23 @@ Projete seu estado para conter somente o necessário:
À medida que seus flows evoluem em complexidade, o estado estruturado se torna cada vez mais valioso:
```python
# Exemplo não traduzido
# Flow simples pode usar estado não estruturado
class SimpleGreetingFlow(Flow):
@start()
def greet(self):
self.state["name"] = "World"
return f"Hello, {self.state['name']}!"
# Flow complexo se beneficia de estado estruturado
class UserRegistrationState(BaseModel):
username: str
email: str
verification_status: bool = False
registration_date: datetime = Field(default_factory=datetime.now)
last_login: Optional[datetime] = None
class RegistrationFlow(Flow[UserRegistrationState]):
# Métodos com acesso ao estado fortemente tipado
```
### 3. Documente Transições de Estado
@@ -272,7 +648,18 @@ Projete seu estado para conter somente o necessário:
Para flows complexos, documente como o estado muda ao longo da execução:
```python
# Exemplo não traduzido
@start()
def initialize_order(self):
"""
Initialize order state with empty values.
State before: {}
State after: {order_id: str, items: [], status: 'new'}
"""
self.state.order_id = str(uuid.uuid4())
self.state.items = []
self.state.status = "new"
return "Order initialized"
```
### 4. Trate Erros de Estado de Forma Elegante
@@ -280,7 +667,18 @@ Para flows complexos, documente como o estado muda ao longo da execução:
Implemente tratamento de erros ao acessar o estado:
```python
# Exemplo não traduzido
@listen(previous_step)
def process_data(self, _):
try:
# Tenta acessar um valor que pode não existir
user_preference = self.state.preferences.get("theme", "default")
except (AttributeError, KeyError):
# Trata o erro de forma elegante
self.state.errors = self.state.get("errors", [])
self.state.errors.append("Failed to access preferences")
user_preference = "default"
return f"Used preference: {user_preference}"
```
### 5. Use o Estado Para Acompanhar o Progresso
@@ -288,7 +686,30 @@ Implemente tratamento de erros ao acessar o estado:
Aproveite o estado para monitorar o progresso em flows de longa duração:
```python
# Exemplo não traduzido
class ProgressTrackingFlow(Flow):
@start()
def initialize(self):
self.state["total_steps"] = 3
self.state["current_step"] = 0
self.state["progress"] = 0.0
self.update_progress()
return "Initialized"
def update_progress(self):
"""Helper method to calculate and update progress"""
if self.state.get("total_steps", 0) > 0:
self.state["progress"] = (self.state.get("current_step", 0) /
self.state["total_steps"]) * 100
print(f"Progress: {self.state['progress']:.1f}%")
@listen(initialize)
def step_one(self, _):
# Realiza o trabalho...
self.state["current_step"] = 1
self.update_progress()
return "Step 1 complete"
# Etapas adicionais...
```
### 6. Prefira Operações Imutáveis Quando Possível
@@ -296,7 +717,22 @@ Aproveite o estado para monitorar o progresso em flows de longa duração:
Especialmente com estado estruturado, prefira operações imutáveis para maior clareza:
```python
# Exemplo não traduzido
# Em vez de modificar listas no local:
self.state.items.append(new_item) # Operação mutável
# Considere criar um novo estado:
from pydantic import BaseModel
from typing import List
class ItemState(BaseModel):
items: List[str] = []
class ImmutableFlow(Flow[ItemState]):
@start()
def add_item(self):
# Cria uma nova lista com o item adicionado
self.state.items = [*self.state.items, "new item"]
return "Item added"
```
## Depurando o Estado do Flow
@@ -306,7 +742,24 @@ Especialmente com estado estruturado, prefira operações imutáveis para maior
Ao desenvolver, adicione logs para acompanhar mudanças no estado:
```python
# Exemplo não traduzido
import logging
logging.basicConfig(level=logging.INFO)
class LoggingFlow(Flow):
def log_state(self, step_name):
logging.info(f"State after {step_name}: {self.state}")
@start()
def initialize(self):
self.state["counter"] = 0
self.log_state("initialize")
return "Initialized"
@listen(initialize)
def increment(self, _):
self.state["counter"] += 1
self.log_state("increment")
return f"Incremented to {self.state['counter']}"
```
### Visualizando o Estado
@@ -314,7 +767,30 @@ Ao desenvolver, adicione logs para acompanhar mudanças no estado:
Você pode adicionar métodos para visualizar seu estado durante o debug:
```python
# Exemplo não traduzido
def visualize_state(self):
"""Create a simple visualization of the current state"""
import json
from rich.console import Console
from rich.panel import Panel
console = Console()
if hasattr(self.state, "model_dump"):
# Pydantic v2
state_dict = self.state.model_dump()
elif hasattr(self.state, "dict"):
# Pydantic v1
state_dict = self.state.dict()
else:
# Estado não estruturado
state_dict = dict(self.state)
# Remove o id para uma saída mais limpa
if "id" in state_dict:
state_dict.pop("id")
state_json = json.dumps(state_dict, indent=2, default=str)
console.print(Panel(state_json, title="Current Flow State"))
```
## Conclusão

View File

@@ -36,7 +36,6 @@ from typing_extensions import Self, TypeIs
from crewai.agent.planning_config import PlanningConfig
from crewai.agent.utils import (
ahandle_knowledge_retrieval,
append_skill_context,
apply_training_data,
build_task_prompt_with_schema,
format_task_with_context,
@@ -549,7 +548,6 @@ class Agent(BaseAgent):
Returns:
The fully prepared task prompt.
"""
task_prompt = append_skill_context(self, task_prompt)
prepare_tools(self, tools, task)
return apply_training_data(self, task_prompt)
@@ -1486,8 +1484,6 @@ class Agent(BaseAgent):
),
)
formatted_messages = append_skill_context(self, formatted_messages)
inputs: dict[str, Any] = {
"input": formatted_messages,
"tool_names": get_tool_names(parsed_tools),

View File

@@ -213,30 +213,6 @@ def _combine_knowledge_context(agent: Agent) -> str:
return agent_ctx + separator + crew_ctx
def append_skill_context(agent: Agent, task_prompt: str) -> str:
"""Append activated skill context sections to the task prompt.
Args:
agent: The agent with optional skills.
task_prompt: The current task prompt.
Returns:
The task prompt with skill context appended.
"""
if not agent.skills:
return task_prompt
from crewai.skills.loader import format_skill_context
from crewai.skills.models import Skill
skill_sections = [
format_skill_context(s) for s in agent.skills if isinstance(s, Skill)
]
if skill_sections:
task_prompt += "\n\n" + "\n\n".join(skill_sections)
return task_prompt
def apply_training_data(agent: Agent, task_prompt: str) -> str:
"""Apply training data to the task prompt.

View File

@@ -174,6 +174,8 @@ class CrewAgentExecutor(BaseAgentExecutor):
if provider.setup_messages(cast(ExecutorContext, cast(object, self))):
return
from crewai.llms.cache import mark_cache_breakpoint
if self.prompt is not None and "system" in self.prompt:
system_prompt = self._format_prompt(
cast(str, self.prompt.get("system", "")), inputs
@@ -181,11 +183,22 @@ class CrewAgentExecutor(BaseAgentExecutor):
user_prompt = self._format_prompt(
cast(str, self.prompt.get("user", "")), inputs
)
self.messages.append(format_message_for_llm(system_prompt, role="system"))
self.messages.append(format_message_for_llm(user_prompt))
# Cache breakpoints: end-of-system caches the per-agent stable
# prefix; end-of-user caches the per-task stable prefix across
# ReAct-loop iterations.
self.messages.append(
mark_cache_breakpoint(
format_message_for_llm(system_prompt, role="system")
)
)
self.messages.append(
mark_cache_breakpoint(format_message_for_llm(user_prompt))
)
elif self.prompt is not None:
user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs)
self.messages.append(format_message_for_llm(user_prompt))
self.messages.append(
mark_cache_breakpoint(format_message_for_llm(user_prompt))
)
provider.post_setup_messages(cast(ExecutorContext, cast(object, self)))

View File

@@ -2586,16 +2586,26 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
self._kickoff_input = inputs.get("input", "")
if "system" in self.prompt:
from crewai.llms.cache import mark_cache_breakpoint
prompt = cast("SystemPromptResult", self.prompt)
system_prompt = self._format_prompt(prompt["system"], inputs)
user_prompt = self._format_prompt(prompt["user"], inputs)
self.state.messages.append(
format_message_for_llm(system_prompt, role="system")
mark_cache_breakpoint(
format_message_for_llm(system_prompt, role="system")
)
)
self.state.messages.append(
mark_cache_breakpoint(format_message_for_llm(user_prompt))
)
self.state.messages.append(format_message_for_llm(user_prompt))
else:
from crewai.llms.cache import mark_cache_breakpoint
user_prompt = self._format_prompt(self.prompt["prompt"], inputs)
self.state.messages.append(format_message_for_llm(user_prompt))
self.state.messages.append(
mark_cache_breakpoint(format_message_for_llm(user_prompt))
)
self._inject_files_from_inputs(inputs)
@@ -2677,16 +2687,26 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
self._kickoff_input = inputs.get("input", "")
if "system" in self.prompt:
from crewai.llms.cache import mark_cache_breakpoint
prompt = cast("SystemPromptResult", self.prompt)
system_prompt = self._format_prompt(prompt["system"], inputs)
user_prompt = self._format_prompt(prompt["user"], inputs)
self.state.messages.append(
format_message_for_llm(system_prompt, role="system")
mark_cache_breakpoint(
format_message_for_llm(system_prompt, role="system")
)
)
self.state.messages.append(
mark_cache_breakpoint(format_message_for_llm(user_prompt))
)
self.state.messages.append(format_message_for_llm(user_prompt))
else:
from crewai.llms.cache import mark_cache_breakpoint
user_prompt = self._format_prompt(self.prompt["prompt"], inputs)
self.state.messages.append(format_message_for_llm(user_prompt))
self.state.messages.append(
mark_cache_breakpoint(format_message_for_llm(user_prompt))
)
self._inject_files_from_inputs(inputs)

View File

@@ -14,7 +14,7 @@ from datetime import datetime
import json
import logging
import re
from typing import TYPE_CHECKING, Any, Final, Literal
from typing import TYPE_CHECKING, Any, Final, Literal, cast
import uuid
from pydantic import (
@@ -703,10 +703,19 @@ class BaseLLM(BaseModel, ABC):
Raises:
ValueError: If message format is invalid
"""
from crewai.llms.cache import CACHE_BREAKPOINT_KEY
from crewai.utilities.types import LLMMessage as _LLMMessage
if isinstance(messages, str):
return [{"role": "user", "content": messages}]
# Validate message format
# Validate then copy each message, dropping the cache-breakpoint
# flag in the copy only. The caller (e.g. CrewAgentExecutor,
# experimental.AgentExecutor) reuses its messages buffer across
# many LLM calls in the tool-use loop; mutating their dicts
# in place would erase the markers after the first call and
# break prompt caching for every subsequent iteration.
cleaned: list[LLMMessage] = []
for i, msg in enumerate(messages):
if not isinstance(msg, dict):
raise ValueError(f"Message at index {i} must be a dictionary")
@@ -714,8 +723,12 @@ class BaseLLM(BaseModel, ABC):
raise ValueError(
f"Message at index {i} must have 'role' and 'content' keys"
)
copy: dict[str, Any] = {
k: v for k, v in msg.items() if k != CACHE_BREAKPOINT_KEY
}
cleaned.append(cast(_LLMMessage, copy))
return self._process_message_files(messages)
return self._process_message_files(cleaned)
def _process_message_files(self, messages: list[LLMMessage]) -> list[LLMMessage]:
"""Process files attached to messages and format for the provider.

View File

@@ -0,0 +1,37 @@
"""Provider-agnostic prompt-cache breakpoint marker.
Application code (prompt builders, agent executors) marks messages where a
stable prefix ends. Provider adapters then translate the marker into the
cache directive their API expects, or strip it for providers that cache
implicitly (OpenAI, Gemini) or do not cache at all.
Usage:
from crewai.llms.cache import mark_cache_breakpoint
messages = [
mark_cache_breakpoint({"role": "system", "content": stable_system}),
mark_cache_breakpoint({"role": "user", "content": stable_user_prefix}),
{"role": "user", "content": volatile_query},
]
"""
from __future__ import annotations
from typing import Any
CACHE_BREAKPOINT_KEY = "cache_breakpoint"
def mark_cache_breakpoint(message: dict[str, Any]) -> dict[str, Any]:
"""Return ``message`` with the cache-breakpoint flag set.
Returns a new dict so callers can safely pass literal dicts.
"""
return {**message, CACHE_BREAKPOINT_KEY: True}
def strip_cache_breakpoint(message: dict[str, Any]) -> None:
"""Remove the breakpoint flag from a message in place."""
message.pop(CACHE_BREAKPOINT_KEY, None)

View File

@@ -425,7 +425,7 @@ class AnthropicCompletion(BaseLLM):
def _prepare_completion_params(
self,
messages: list[LLMMessage],
system_message: str | None = None,
system_message: str | list[dict[str, Any]] | None = None,
tools: list[dict[str, Any]] | None = None,
available_functions: dict[str, Any] | None = None,
) -> dict[str, Any]:
@@ -665,7 +665,7 @@ class AnthropicCompletion(BaseLLM):
def _format_messages_for_anthropic(
self, messages: str | list[LLMMessage]
) -> tuple[list[LLMMessage], str | None]:
) -> tuple[list[LLMMessage], str | list[dict[str, Any]] | None]:
"""Format messages for Anthropic API.
Anthropic has specific requirements:
@@ -679,8 +679,51 @@ class AnthropicCompletion(BaseLLM):
messages: Input messages
Returns:
Tuple of (formatted_messages, system_message)
Tuple of (formatted_messages, system_message). `system_message` is
a list of content blocks (with cache_control stamped) when any
system message in the input carried a cache_breakpoint flag;
otherwise a plain string for backwards compatibility.
"""
from crewai.llms.cache import CACHE_BREAKPOINT_KEY
# Read cache_breakpoint flags from raw input BEFORE super strips them.
# We track the CONTENT of marked user/assistant messages so we can
# locate the corresponding block in formatted_messages — Anthropic
# rewrites tool results into user messages, so positional indices
# do not survive the conversion. We must stamp the original stable
# message (typically the initial task prompt), not whatever happens
# to be the trailing user-role block after tool_result expansion.
cache_system = False
cache_match_contents: list[str] = []
if not isinstance(messages, str):
for m in messages:
if not (isinstance(m, dict) and m.get(CACHE_BREAKPOINT_KEY)):
continue
role = m.get("role")
if role == "system":
cache_system = True
continue
if role != "user":
# Only user messages survive Anthropic's role-coalescing
# in a stable, addressable position. Markers on assistant
# or tool messages have no reliable stamp target after
# tool_result expansion, so we ignore them.
continue
raw_content = m.get("content")
if isinstance(raw_content, str) and raw_content:
cache_match_contents.append(raw_content)
continue
if isinstance(raw_content, list):
# Pull text from a single-text-block list so callers that
# pre-format content blocks still match cleanly.
text_blocks = [
b.get("text")
for b in raw_content
if isinstance(b, dict) and b.get("type") == "text"
]
if len(text_blocks) == 1 and isinstance(text_blocks[0], str):
cache_match_contents.append(text_blocks[0])
# Use base class formatting first
base_formatted = super()._format_messages(messages)
@@ -788,7 +831,62 @@ class AnthropicCompletion(BaseLLM):
# If first message is not from user, insert a user message at the beginning
formatted_messages.insert(0, {"role": "user", "content": "Hello"})
return formatted_messages, system_message
# Stamp cache_control on the message(s) whose original content was
# marked. We scan formatted_messages in order and stamp the first
# match per marked content — Anthropic permits up to 4 cache
# breakpoints per request, which is more than enough for our usage.
# Matching by content (rather than position) handles the ReAct
# case where tool_result blocks get expanded into trailing user
# messages: the stable initial-task prompt still maps cleanly.
for needle in cache_match_contents:
for fm in formatted_messages:
if fm.get("role") != "user":
continue
content = fm.get("content")
if isinstance(content, str) and content == needle:
self._stamp_cache_control_on_message(fm)
break
if isinstance(content, list):
fm_texts: list[str] = [
b.get("text", "")
for b in content
if isinstance(b, dict) and b.get("type") == "text"
]
if len(fm_texts) == 1 and fm_texts[0] == needle:
self._stamp_cache_control_on_message(fm)
break
# Convert system to content-block form when caching is requested.
system_payload: str | list[dict[str, Any]] | None = system_message
if system_message and cache_system:
system_payload = [
{
"type": "text",
"text": system_message,
"cache_control": {"type": "ephemeral"},
}
]
return formatted_messages, system_payload
@staticmethod
def _stamp_cache_control_on_message(message: LLMMessage) -> None:
"""Stamp cache_control on the last content block of an Anthropic message."""
msg = cast(dict[str, Any], message)
content = msg.get("content")
if isinstance(content, str):
msg["content"] = [
{
"type": "text",
"text": content,
"cache_control": {"type": "ephemeral"},
}
]
return
if isinstance(content, list) and content:
last = content[-1]
if isinstance(last, dict):
last["cache_control"] = {"type": "ephemeral"}
def _handle_completion(
self,

View File

@@ -161,6 +161,9 @@ def format_skill_context(skill: Skill) -> str:
At METADATA level: returns name and description only.
At INSTRUCTIONS level or above: returns full SKILL.md body.
Output is wrapped in <skill name="..."> XML tags so the block can serve
as a stable cache anchor when injected into the system prompt.
Args:
skill: The skill to format.
@@ -169,7 +172,7 @@ def format_skill_context(skill: Skill) -> str:
"""
if skill.disclosure_level >= INSTRUCTIONS and skill.instructions:
parts = [
f"## Skill: {skill.name}",
f'<skill name="{skill.name}">',
skill.description,
"",
skill.instructions,
@@ -180,5 +183,6 @@ def format_skill_context(skill: Skill) -> str:
for dir_name, files in sorted(skill.resource_files.items()):
if files:
parts.append(f"- **{dir_name}/**: {', '.join(files)}")
parts.append("</skill>")
return "\n".join(parts)
return f"## Skill: {skill.name}\n{skill.description}"
return f'<skill name="{skill.name}">\n{skill.description}\n</skill>'

View File

@@ -86,7 +86,7 @@ class Prompts(BaseModel):
slices.append("tools")
else:
slices.append("no_tools")
system: str = self._build_prompt(slices)
system: str = self._build_prompt(slices) + self._build_skill_block()
# Determine which task slice to use:
task_slice: COMPONENTS
@@ -106,7 +106,7 @@ class Prompts(BaseModel):
return SystemPromptResult(
system=system,
user=self._build_prompt([task_slice]),
prompt=self._build_prompt(slices),
prompt=self._build_prompt(slices) + self._build_skill_block(),
)
return StandardPromptResult(
prompt=self._build_prompt(
@@ -115,8 +115,27 @@ class Prompts(BaseModel):
self.prompt_template,
self.response_template,
)
+ self._build_skill_block()
)
def _build_skill_block(self) -> str:
"""Render the agent's activated skills as a stable XML block.
Skills are agent-scoped (do not change per task), so they live in the
system prompt where prompt-cache prefixes can survive across calls.
"""
skills = getattr(self.agent, "skills", None)
if not skills:
return ""
from crewai.skills.loader import format_skill_context
from crewai.skills.models import Skill
sections = [format_skill_context(s) for s in skills if isinstance(s, Skill)]
if not sections:
return ""
return "\n\n<skills>\n" + "\n\n".join(sections) + "\n</skills>"
def _build_prompt(
self,
components: list[COMPONENTS],

View File

@@ -0,0 +1,196 @@
"""Regression tests for the provider-agnostic prompt-cache breakpoint flag."""
from __future__ import annotations
from crewai.llms.cache import (
CACHE_BREAKPOINT_KEY,
mark_cache_breakpoint,
strip_cache_breakpoint,
)
from crewai.llms.providers.anthropic.completion import AnthropicCompletion
from crewai.llms.providers.openai.completion import OpenAICompletion
class TestCacheMarkerHelpers:
def test_mark_returns_new_dict(self) -> None:
original = {"role": "user", "content": "hi"}
marked = mark_cache_breakpoint(original)
assert marked[CACHE_BREAKPOINT_KEY] is True
# Marker must NOT bleed back into the caller's dict — callers may
# pass literal dicts and reuse them across calls.
assert CACHE_BREAKPOINT_KEY not in original
def test_strip_is_idempotent(self) -> None:
msg = {"role": "user", "content": "hi", CACHE_BREAKPOINT_KEY: True}
strip_cache_breakpoint(msg)
assert CACHE_BREAKPOINT_KEY not in msg
strip_cache_breakpoint(msg)
assert CACHE_BREAKPOINT_KEY not in msg
class TestBaseFormatDoesNotMutate:
"""The strip-on-format pass must not erase markers from the caller's
messages list — executors reuse a single list across many LLM calls,
and mutating it would defeat caching on every iteration after the first.
"""
def test_repeated_format_preserves_markers(self) -> None:
llm = OpenAICompletion(model="gpt-4o-mini")
messages = [
mark_cache_breakpoint({"role": "system", "content": "stable system"}),
mark_cache_breakpoint({"role": "user", "content": "stable user"}),
]
# First call: provider strips markers from the returned (copied) list
first = llm._format_messages(messages)
assert all(CACHE_BREAKPOINT_KEY not in m for m in first)
# Original list must STILL carry the markers
assert messages[0][CACHE_BREAKPOINT_KEY] is True
assert messages[1][CACHE_BREAKPOINT_KEY] is True
# Second call from the same list still sees the markers
second = llm._format_messages(messages)
assert all(CACHE_BREAKPOINT_KEY not in m for m in second)
assert messages[0][CACHE_BREAKPOINT_KEY] is True
assert messages[1][CACHE_BREAKPOINT_KEY] is True
class TestAnthropicCacheStamping:
def test_stamps_system_with_cache_control(self) -> None:
llm = AnthropicCompletion(model="claude-sonnet-4-5")
messages = [
mark_cache_breakpoint({"role": "system", "content": "you are helpful"}),
mark_cache_breakpoint({"role": "user", "content": "ping"}),
]
formatted, system = llm._format_messages_for_anthropic(messages)
assert isinstance(system, list)
assert system[0]["cache_control"] == {"type": "ephemeral"}
assert system[0]["text"] == "you are helpful"
# First user block carries cache_control too
last_block = formatted[0]["content"][-1]
assert last_block["cache_control"] == {"type": "ephemeral"}
def test_stamps_stable_user_not_tool_result(self) -> None:
"""Within a ReAct loop, tool results are flattened into a trailing
user message. We must NOT stamp that volatile trailing block — we
must stamp the original stable user prompt instead.
"""
llm = AnthropicCompletion(model="claude-sonnet-4-5")
messages = [
mark_cache_breakpoint({"role": "system", "content": "you are helpful"}),
mark_cache_breakpoint({"role": "user", "content": "stable task prompt"}),
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "tc_1",
"function": {"name": "ping", "arguments": "{}"},
}
],
},
{"role": "tool", "tool_call_id": "tc_1", "content": "volatile tool result"},
]
formatted, _system = llm._format_messages_for_anthropic(messages)
# Find the message that holds the stable prompt
stable = next(
fm
for fm in formatted
if fm["role"] == "user"
and isinstance(fm["content"], list)
and any(
isinstance(b, dict)
and b.get("type") == "text"
and b.get("text") == "stable task prompt"
for b in fm["content"]
)
)
text_block = next(
b for b in stable["content"] if isinstance(b, dict) and b.get("type") == "text"
)
assert text_block.get("cache_control") == {"type": "ephemeral"}
# The tool_result-bearing user message must NOT be stamped
tool_carrier = next(
fm
for fm in formatted
if fm["role"] == "user"
and isinstance(fm["content"], list)
and any(
isinstance(b, dict) and b.get("type") == "tool_result"
for b in fm["content"]
)
)
for block in tool_carrier["content"]:
assert "cache_control" not in block
def test_assistant_marker_is_ignored(self) -> None:
    """A marker on an assistant message has no stable stamp target once
    Anthropic coalesces roles, so it must be silently ignored rather than
    collected and later dropped on a mismatch.
    """
    llm = AnthropicCompletion(model="claude-sonnet-4-5")
    convo = [
        mark_cache_breakpoint({"role": "system", "content": "you are helpful"}),
        mark_cache_breakpoint(
            {"role": "assistant", "content": "I will help you out."}
        ),
        {"role": "user", "content": "ping"},
    ]
    formatted, system = llm._format_messages_for_anthropic(convo)
    # System caching is unaffected by the stray assistant marker.
    assert isinstance(system, list)
    # With no user message marked, no user block may carry cache_control.
    user_messages = [m for m in formatted if m.get("role") == "user"]
    for msg in user_messages:
        body = msg.get("content")
        if not isinstance(body, list):
            continue
        for blk in body:
            if isinstance(blk, dict):
                assert "cache_control" not in blk
def test_list_content_user_marker_matches(self) -> None:
    """A user message already pre-formatted as a single text block must
    still match the post-format user message and receive the stamp.
    """
    llm = AnthropicCompletion(model="claude-sonnet-4-5")
    formatted, _system = llm._format_messages_for_anthropic(
        [
            mark_cache_breakpoint(
                {
                    "role": "user",
                    "content": [{"type": "text", "text": "stable list prompt"}],
                }
            ),
        ]
    )
    user = next(m for m in formatted if m["role"] == "user")
    blocks = user["content"]
    assert isinstance(blocks, list)
    text = next(
        b for b in blocks if isinstance(b, dict) and b.get("type") == "text"
    )
    assert text.get("cache_control") == {"type": "ephemeral"}
def test_unmarked_messages_get_no_cache_control(self) -> None:
    """Messages without a breakpoint marker must pass through untouched:
    the system prompt stays a plain string and no block gets stamped.
    """
    llm = AnthropicCompletion(model="claude-sonnet-4-5")
    formatted, system = llm._format_messages_for_anthropic(
        [
            {"role": "system", "content": "no caching here"},
            {"role": "user", "content": "no caching here either"},
        ]
    )
    # No marker → system is left as a string (no content-block conversion).
    assert isinstance(system, str)
    # No marker → cache_control must not appear on any formatted block.
    for msg in formatted:
        body = msg.get("content")
        if not isinstance(body, list):
            continue
        for blk in body:
            assert "cache_control" not in blk
class TestNonAnthropicStripsMarker:
    """Cache-breakpoint markers are Anthropic-specific and must never leak
    into another provider's wire payload.
    """

    def test_openai_format_strips_marker_from_wire_payload(self) -> None:
        llm = OpenAICompletion(model="gpt-4o-mini")
        marked = [
            mark_cache_breakpoint({"role": "system", "content": "stable"}),
            mark_cache_breakpoint({"role": "user", "content": "hi"}),
        ]
        # Every formatted message must be free of the internal marker key.
        for payload_msg in llm._format_messages(marked):
            assert CACHE_BREAKPOINT_KEY not in payload_msg

View File

@@ -5,9 +5,9 @@ from pathlib import Path
import pytest
from crewai import Agent
from crewai.agent.utils import append_skill_context
from crewai.skills.loader import activate_skill, discover_skills, format_skill_context
from crewai.skills.models import INSTRUCTIONS, METADATA
from crewai.utilities.prompts import Prompts
def _create_skill_dir(parent: Path, name: str, body: str = "Body.") -> Path:
@@ -34,7 +34,7 @@ class TestSkillDiscoveryAndActivation:
assert activated.instructions == "Use this skill."
context = format_skill_context(activated)
assert "## Skill: my-skill" in context
assert '<skill name="my-skill">' in context
assert "Use this skill." in context
def test_filter_by_skill_names(self, tmp_path: Path) -> None:
@@ -94,7 +94,9 @@ class TestSkillDiscoveryAndActivation:
assert agent.skills[0].disclosure_level == METADATA
assert agent.skills[0].instructions is None
prompt = append_skill_context(agent, "Plan a 10-day Japan itinerary.")
assert "## Skill: travel" in prompt
assert "Skill travel" in prompt
assert "Use this skill for travel planning." not in prompt
result = Prompts(agent=agent, has_tools=False, use_system_prompt=True).task_execution()
system = getattr(result, "system", "") or result.prompt
assert '<skill name="travel">' in system
assert "Skill travel" in system
# METADATA-level skills must not leak full instructions into the prompt
assert "Use this skill for travel planning." not in system

View File

@@ -105,7 +105,7 @@ class TestFormatSkillContext:
frontmatter=fm, path=tmp_path, disclosure_level=METADATA
)
ctx = format_skill_context(skill)
assert "## Skill: test-skill" in ctx
assert '<skill name="test-skill">' in ctx
assert "A skill" in ctx
def test_instructions_level(self, tmp_path: Path) -> None:
@@ -117,7 +117,7 @@ class TestFormatSkillContext:
instructions="Do these things.",
)
ctx = format_skill_context(skill)
assert "## Skill: test-skill" in ctx
assert '<skill name="test-skill">' in ctx
assert "Do these things." in ctx
def test_no_instructions_at_instructions_level(self, tmp_path: Path) -> None:
@@ -129,7 +129,7 @@ class TestFormatSkillContext:
instructions=None,
)
ctx = format_skill_context(skill)
assert ctx == "## Skill: test-skill\nA skill"
assert ctx == '<skill name="test-skill">\nA skill\n</skill>'
def test_resources_level(self, tmp_path: Path) -> None:
fm = SkillFrontmatter(name="test-skill", description="A skill")