diff --git a/docs/ar/concepts/flows.mdx b/docs/ar/concepts/flows.mdx index 2aa62c2d9..000c550de 100644 --- a/docs/ar/concepts/flows.mdx +++ b/docs/ar/concepts/flows.mdx @@ -29,6 +29,7 @@ from crewai.flow.flow import Flow, listen, start from dotenv import load_dotenv from litellm import completion +load_dotenv() class ExampleFlow(Flow): model = "gpt-4o-mini" diff --git a/docs/en/concepts/flows.mdx b/docs/en/concepts/flows.mdx index 5aaeaf8ee..cd10afa40 100644 --- a/docs/en/concepts/flows.mdx +++ b/docs/en/concepts/flows.mdx @@ -29,6 +29,7 @@ from crewai.flow.flow import Flow, listen, start from dotenv import load_dotenv from litellm import completion +load_dotenv() class ExampleFlow(Flow): model = "gpt-4o-mini" diff --git a/docs/en/guides/flows/mastering-flow-state.mdx b/docs/en/guides/flows/mastering-flow-state.mdx index e2df53f67..68a821246 100644 --- a/docs/en/guides/flows/mastering-flow-state.mdx +++ b/docs/en/guides/flows/mastering-flow-state.mdx @@ -313,9 +313,9 @@ flow1 = PersistentCounterFlow() result1 = flow1.kickoff() print(f"First run result: {result1}") -# Second run - state is automatically loaded +# Second run - pass the ID to load the persisted state flow2 = PersistentCounterFlow() -result2 = flow2.kickoff() +result2 = flow2.kickoff(inputs={"id": flow1.state.id}) print(f"Second run result: {result2}") # Will be higher due to persisted state ``` diff --git a/docs/ko/concepts/flows.mdx b/docs/ko/concepts/flows.mdx index 68ba7ec6b..368efd701 100644 --- a/docs/ko/concepts/flows.mdx +++ b/docs/ko/concepts/flows.mdx @@ -29,6 +29,7 @@ from crewai.flow.flow import Flow, listen, start from dotenv import load_dotenv from litellm import completion +load_dotenv() class ExampleFlow(Flow): model = "gpt-4o-mini" diff --git a/docs/pt-BR/concepts/flows.mdx b/docs/pt-BR/concepts/flows.mdx index 5cd5324f5..8c0deff15 100644 --- a/docs/pt-BR/concepts/flows.mdx +++ b/docs/pt-BR/concepts/flows.mdx @@ -24,7 +24,63 @@ Os flows permitem que você crie fluxos de trabalho estruturados e orientados po Vamos criar um Flow simples no qual você usará a OpenAI para gerar uma cidade aleatória em uma tarefa e, em seguida, usará essa cidade para gerar uma curiosidade em outra tarefa. ```python Code -# (O código não é traduzido) + +from crewai.flow.flow import Flow, listen, start +from dotenv import load_dotenv +from litellm import completion + +load_dotenv() + +class ExampleFlow(Flow): + model = "gpt-4o-mini" + + @start() + def generate_city(self): + print("Starting flow") + # Cada estado do flow recebe automaticamente um ID único + print(f"Flow State ID: {self.state['id']}") + + response = completion( + model=self.model, + messages=[ + { + "role": "user", + "content": "Return the name of a random city in the world.", + }, + ], + ) + + random_city = response["choices"][0]["message"]["content"] + # Armazena a cidade no nosso estado + self.state["city"] = random_city + print(f"Random City: {random_city}") + + return random_city + + @listen(generate_city) + def generate_fun_fact(self, random_city): + response = completion( + model=self.model, + messages=[ + { + "role": "user", + "content": f"Tell me a fun fact about {random_city}", + }, + ], + ) + + fun_fact = response["choices"][0]["message"]["content"] + # Armazena a curiosidade no nosso estado + self.state["fun_fact"] = fun_fact + return fun_fact + + + +flow = ExampleFlow() +flow.plot() +result = flow.kickoff() + +print(f"Generated fun fact: {result}") ``` Na ilustração acima, criamos um Flow simples que gera uma cidade aleatória usando a OpenAI e depois cria uma curiosidade sobre essa cidade. O Flow consiste em duas tarefas: `generate_city` e `generate_fun_fact`. A tarefa `generate_city` é o ponto de início do Flow, enquanto a tarefa `generate_fun_fact` fica escutando o resultado da tarefa `generate_city`. @@ -56,12 +112,16 @@ O decorador `@listen()` pode ser usado de várias formas: 1. **Escutando um Método pelo Nome**: Você pode passar o nome do método ao qual deseja escutar como string. Quando esse método concluir, o método ouvinte será chamado. ```python Code - # (O código não é traduzido) + @listen("generate_city") + def generate_fun_fact(self, random_city): + # Implementação ``` 2. **Escutando um Método Diretamente**: Você pode passar o próprio método. Quando esse método concluir, o método ouvinte será chamado. ```python Code - # (O código não é traduzido) + @listen(generate_city) + def generate_fun_fact(self, random_city): + # Implementação ``` ### Saída de um Flow @@ -76,7 +136,24 @@ Veja como acessar a saída final: ```python Code -# (O código não é traduzido) +from crewai.flow.flow import Flow, listen, start + +class OutputExampleFlow(Flow): + @start() + def first_method(self): + return "Output from first_method" + + @listen(first_method) + def second_method(self, first_output): + return f"Second method received: {first_output}" + + +flow = OutputExampleFlow() +flow.plot("my_flow_plot") +final_output = flow.kickoff() + +print("---- Final Output ----") +print(final_output) ``` ```text Output @@ -97,8 +174,34 @@ Além de recuperar a saída final, você pode acessar e atualizar o estado dentr Veja um exemplo de como atualizar e acessar o estado: + ```python Code -# (O código não é traduzido) +from crewai.flow.flow import Flow, listen, start +from pydantic import BaseModel + +class ExampleState(BaseModel): + counter: int = 0 + message: str = "" + +class StateExampleFlow(Flow[ExampleState]): + + @start() + def first_method(self): + self.state.message = "Hello from first_method" + self.state.counter += 1 + + @listen(first_method) + def second_method(self): + self.state.message += " - updated by second_method" + self.state.counter += 1 + return self.state.message + +flow = StateExampleFlow() +flow.plot("my_flow_plot") +final_output = flow.kickoff() +print(f"Final Output: {final_output}") +print("Final State:") +print(flow.state) ``` ```text Output @@ -128,7 +231,33 @@ Essa abordagem oferece flexibilidade, permitindo que o desenvolvedor adicione ou Mesmo com estados não estruturados, os flows do CrewAI geram e mantêm automaticamente um identificador único (UUID) para cada instância de estado. ```python Code -# (O código não é traduzido) +from crewai.flow.flow import Flow, listen, start + +class UnstructuredExampleFlow(Flow): + + @start() + def first_method(self): + # O estado inclui automaticamente um campo 'id' + print(f"State ID: {self.state['id']}") + self.state['counter'] = 0 + self.state['message'] = "Hello from structured flow" + + @listen(first_method) + def second_method(self): + self.state['counter'] += 1 + self.state['message'] += " - updated" + + @listen(second_method) + def third_method(self): + self.state['counter'] += 1 + self.state['message'] += " - updated again" + + print(f"State after third_method: {self.state}") + + +flow = UnstructuredExampleFlow() +flow.plot("my_flow_plot") +flow.kickoff() ``` ![Flow Visual image](/images/crewai-flow-3.png) @@ -148,7 +277,39 @@ Ao usar modelos como o `BaseModel` da Pydantic, os desenvolvedores podem definir Cada estado nos flows do CrewAI recebe automaticamente um identificador único (UUID) para ajudar no rastreamento e gerenciamento. Esse ID é gerado e mantido automaticamente pelo sistema de flows. ```python Code -# (O código não é traduzido) +from crewai.flow.flow import Flow, listen, start +from pydantic import BaseModel + + +class ExampleState(BaseModel): + # Nota: o campo 'id' é adicionado automaticamente a todos os estados + counter: int = 0 + message: str = "" + + +class StructuredExampleFlow(Flow[ExampleState]): + + @start() + def first_method(self): + # Acesse o ID gerado automaticamente, se necessário + print(f"State ID: {self.state.id}") + self.state.message = "Hello from structured flow" + + @listen(first_method) + def second_method(self): + self.state.counter += 1 + self.state.message += " - updated" + + @listen(second_method) + def third_method(self): + self.state.counter += 1 + self.state.message += " - updated again" + + print(f"State after third_method: {self.state}") + + +flow = StructuredExampleFlow() +flow.kickoff() ``` ![Flow Visual image](/images/crewai-flow-3.png) @@ -182,7 +343,19 @@ O decorador @persist permite a persistência automática do estado nos flows do Quando aplicado no nível da classe, o decorador @persist garante a persistência automática de todos os estados dos métodos do flow: ```python -# (O código não é traduzido) +@persist # Usa SQLiteFlowPersistence por padrão +class MyFlow(Flow[MyState]): + @start() + def initialize_flow(self): + # Este método terá seu estado persistido automaticamente + self.state.counter = 1 + print("Initialized flow. State ID:", self.state.id) + + @listen(initialize_flow) + def next_step(self): + # O estado (incluindo self.state.id) é recarregado automaticamente + self.state.counter += 1 + print("Flow state is persisted. Counter:", self.state.counter) ``` ### Persistência no Nível de Método @@ -190,7 +363,14 @@ Quando aplicado no nível da classe, o decorador @persist garante a persistênci Para um controle mais granular, você pode aplicar @persist em métodos específicos: ```python -# (O código não é traduzido) +class AnotherFlow(Flow[dict]): + @persist # Persiste apenas o estado deste método + @start() + def begin(self): + if "runs" not in self.state: + self.state["runs"] = 0 + self.state["runs"] += 1 + print("Method-level persisted runs:", self.state["runs"]) ``` ### Forking de Estado Persistido @@ -282,8 +462,29 @@ A arquitetura de persistência enfatiza precisão técnica e opções de persona A função `or_` nos flows permite escutar múltiplos métodos e acionar o método ouvinte quando qualquer um dos métodos especificados gerar uma saída. + ```python Code -# (O código não é traduzido) +from crewai.flow.flow import Flow, listen, or_, start + +class OrExampleFlow(Flow): + + @start() + def start_method(self): + return "Hello from the start method" + + @listen(start_method) + def second_method(self): + return "Hello from the second method" + + @listen(or_(start_method, second_method)) + def logger(self, result): + print(f"Logger: {result}") + + + +flow = OrExampleFlow() +flow.plot("my_flow_plot") +flow.kickoff() ``` ```text Output @@ -302,8 +503,28 @@ A função `or_` serve para escutar vários métodos e disparar o método ouvint A função `and_` nos flows permite escutar múltiplos métodos e acionar o método ouvinte apenas quando todos os métodos especificados emitirem uma saída. + ```python Code -# (O código não é traduzido) +from crewai.flow.flow import Flow, and_, listen, start + +class AndExampleFlow(Flow): + + @start() + def start_method(self): + self.state["greeting"] = "Hello from the start method" + + @listen(start_method) + def second_method(self): + self.state["joke"] = "What do computers eat? Microchips." + + @listen(and_(start_method, second_method)) + def logger(self): + print("---- Logger ----") + print(self.state) + +flow = AndExampleFlow() +flow.plot() +flow.kickoff() ``` ```text Output @@ -323,8 +544,42 @@ O decorador `@router()` nos flows permite definir lógica de roteamento condicio Você pode especificar diferentes rotas conforme a saída do método, permitindo controlar o fluxo de execução de forma dinâmica. + ```python Code -# (O código não é traduzido) +import random +from crewai.flow.flow import Flow, listen, router, start +from pydantic import BaseModel + +class ExampleState(BaseModel): + success_flag: bool = False + +class RouterFlow(Flow[ExampleState]): + + @start() + def start_method(self): + print("Starting the structured flow") + random_boolean = random.choice([True, False]) + self.state.success_flag = random_boolean + + @router(start_method) + def second_method(self): + if self.state.success_flag: + return "success" + else: + return "failed" + + @listen("success") + def third_method(self): + print("Third method running") + + @listen("failed") + def fourth_method(self): + print("Fourth method running") + + +flow = RouterFlow() +flow.plot("my_flow_plot") +flow.kickoff() ``` ```text Output @@ -401,7 +656,105 @@ Para um guia completo sobre feedback humano em flows, incluindo feedback assínc Os agentes podem ser integrados facilmente aos seus flows, oferecendo uma alternativa leve às crews completas quando você precisar executar tarefas simples e focadas. Veja um exemplo de como utilizar um agente em um flow para realizar uma pesquisa de mercado: ```python -# (O código não é traduzido) +import asyncio +from typing import Any, Dict, List + +from crewai_tools import SerperDevTool +from pydantic import BaseModel, Field + +from crewai.agent import Agent +from crewai.flow.flow import Flow, listen, start + + +# Define um formato de saída estruturado +class MarketAnalysis(BaseModel): + key_trends: List[str] = Field(description="List of identified market trends") + market_size: str = Field(description="Estimated market size") + competitors: List[str] = Field(description="Major competitors in the space") + + +# Define o estado do flow +class MarketResearchState(BaseModel): + product: str = "" + analysis: MarketAnalysis | None = None + + +# Cria uma classe de flow +class MarketResearchFlow(Flow[MarketResearchState]): + @start() + def initialize_research(self) -> Dict[str, Any]: + print(f"Starting market research for {self.state.product}") + return {"product": self.state.product} + + @listen(initialize_research) + async def analyze_market(self) -> Dict[str, Any]: + # Cria um agente para pesquisa de mercado + analyst = Agent( + role="Market Research Analyst", + goal=f"Analyze the market for {self.state.product}", + backstory="You are an experienced market analyst with expertise in " + "identifying market trends and opportunities.", + tools=[SerperDevTool()], + verbose=True, + ) + + # Define a consulta de pesquisa + query = f""" + Research the market for {self.state.product}. Include: + 1. Key market trends + 2. Market size + 3. Major competitors + + Format your response according to the specified structure. + """ + + # Executa a análise com formato de saída estruturado + result = await analyst.kickoff_async(query, response_format=MarketAnalysis) + if result.pydantic: + print("result", result.pydantic) + else: + print("result", result) + + # Retorna a análise para atualizar o estado + return {"analysis": result.pydantic} + + @listen(analyze_market) + def present_results(self, analysis) -> None: + print("\nMarket Analysis Results") + print("=====================") + + if isinstance(analysis, dict): + # Se recebemos um dict com a chave 'analysis', extrai o objeto de análise real + market_analysis = analysis.get("analysis") + else: + market_analysis = analysis + + if market_analysis and isinstance(market_analysis, MarketAnalysis): + print("\nKey Market Trends:") + for trend in market_analysis.key_trends: + print(f"- {trend}") + + print(f"\nMarket Size: {market_analysis.market_size}") + + print("\nMajor Competitors:") + for competitor in market_analysis.competitors: + print(f"- {competitor}") + else: + print("No structured analysis data available.") + print("Raw analysis:", analysis) + + +# Exemplo de uso +async def run_flow(): + flow = MarketResearchFlow() + flow.plot("MarketResearchFlowPlot") + result = await flow.kickoff_async(inputs={"product": "AI-powered chatbots"}) + return result + + +# Executa o flow +if __name__ == "__main__": + asyncio.run(run_flow()) ``` ![Flow Visual image](/images/crewai-flow-7.png) @@ -463,7 +816,50 @@ No arquivo `main.py`, você cria seu flow e conecta as crews. É possível defin Veja um exemplo de como conectar a `poem_crew` no arquivo `main.py`: ```python Code -# (O código não é traduzido) +#!/usr/bin/env python +from random import randint + +from pydantic import BaseModel +from crewai.flow.flow import Flow, listen, start +from .crews.poem_crew.poem_crew import PoemCrew + +class PoemState(BaseModel): + sentence_count: int = 1 + poem: str = "" + +class PoemFlow(Flow[PoemState]): + + @start() + def generate_sentence_count(self): + print("Generating sentence count") + self.state.sentence_count = randint(1, 5) + + @listen(generate_sentence_count) + def generate_poem(self): + print("Generating poem") + result = PoemCrew().crew().kickoff(inputs={"sentence_count": self.state.sentence_count}) + + print("Poem generated", result.raw) + self.state.poem = result.raw + + @listen(generate_poem) + def save_poem(self): + print("Saving poem") + with open("poem.txt", "w") as f: + f.write(self.state.poem) + +def kickoff(): + poem_flow = PoemFlow() + poem_flow.kickoff() + + +def plot(): + poem_flow = PoemFlow() + poem_flow.plot("PoemFlowPlot") + +if __name__ == "__main__": + kickoff() + plot() ``` Neste exemplo, a classe `PoemFlow` define um fluxo que gera a quantidade de frases, usa a `PoemCrew` para gerar um poema e, depois, salva o poema em um arquivo. O flow inicia com o método `kickoff()`, e o gráfico é gerado pelo método `plot()`. @@ -515,7 +911,8 @@ O CrewAI oferece duas formas práticas de gerar plots dos seus flows: Se estiver trabalhando diretamente com uma instância do flow, basta chamar o método `plot()` do objeto. Isso criará um arquivo HTML com o plot interativo do seu flow. ```python Code -# (O código não é traduzido) +# Considerando que você já tem uma instância do flow +flow.plot("my_flow_plot") ``` Esse comando gera um arquivo chamado `my_flow_plot.html` no diretório atual. Abra esse arquivo em um navegador para visualizar o plot interativo. diff --git a/docs/pt-BR/guides/flows/mastering-flow-state.mdx b/docs/pt-BR/guides/flows/mastering-flow-state.mdx index 9bc02d6f3..6589b51ad 100644 --- a/docs/pt-BR/guides/flows/mastering-flow-state.mdx +++ b/docs/pt-BR/guides/flows/mastering-flow-state.mdx @@ -63,7 +63,60 @@ Com estado não estruturado: Veja um exemplo simples de gerenciamento de estado não estruturado: ```python -# código não traduzido +from crewai.flow.flow import Flow, listen, start + +class UnstructuredStateFlow(Flow): + @start() + def initialize_data(self): + print("Initializing flow data") + # Adiciona pares chave-valor ao estado + self.state["user_name"] = "Alex" + self.state["preferences"] = { + "theme": "dark", + "language": "English" + } + self.state["items"] = [] + + # O estado do flow recebe automaticamente um ID único + print(f"Flow ID: {self.state['id']}") + + return "Initialized" + + @listen(initialize_data) + def process_data(self, previous_result): + print(f"Previous step returned: {previous_result}") + + # Acessa e modifica o estado + user = self.state["user_name"] + print(f"Processing data for {user}") + + # Adiciona itens a uma lista no estado + self.state["items"].append("item1") + self.state["items"].append("item2") + + # Adiciona um novo par chave-valor + self.state["processed"] = True + + return "Processed" + + @listen(process_data) + def generate_summary(self, previous_result): + # Acessa múltiplos valores do estado + user = self.state["user_name"] + theme = self.state["preferences"]["theme"] + items = self.state["items"] + processed = self.state.get("processed", False) + + summary = f"User {user} has {len(items)} items with {theme} theme. " + summary += "Data is processed." if processed else "Data is not processed." + + return summary + +# Executa o flow +flow = UnstructuredStateFlow() +result = flow.kickoff() +print(f"Final result: {result}") +print(f"Final state: {flow.state}") ``` ### Quando Usar Estado Não Estruturado @@ -94,7 +147,63 @@ Ao utilizar estado estruturado: Veja como implementar o gerenciamento de estado estruturado: ```python -# código não traduzido +from crewai.flow.flow import Flow, listen, start +from pydantic import BaseModel, Field +from typing import List, Dict, Optional + +# Define o modelo de estado +class UserPreferences(BaseModel): + theme: str = "light" + language: str = "English" + +class AppState(BaseModel): + user_name: str = "" + preferences: UserPreferences = UserPreferences() + items: List[str] = [] + processed: bool = False + completion_percentage: float = 0.0 + +# Cria um flow com estado tipado +class StructuredStateFlow(Flow[AppState]): + @start() + def initialize_data(self): + print("Initializing flow data") + # Define valores do estado (com checagem de tipo) + self.state.user_name = "Taylor" + self.state.preferences.theme = "dark" + + # O campo ID está disponível automaticamente + print(f"Flow ID: {self.state.id}") + + return "Initialized" + + @listen(initialize_data) + def process_data(self, previous_result): + print(f"Processing data for {self.state.user_name}") + + # Modifica o estado (com checagem de tipo) + self.state.items.append("item1") + self.state.items.append("item2") + self.state.processed = True + self.state.completion_percentage = 50.0 + + return "Processed" + + @listen(process_data) + def generate_summary(self, previous_result): + # Acessa o estado (com autocompletar) + summary = f"User {self.state.user_name} has {len(self.state.items)} items " + summary += f"with {self.state.preferences.theme} theme. " + summary += "Data is processed." if self.state.processed else "Data is not processed." + summary += f" Completion: {self.state.completion_percentage}%" + + return summary + +# Executa o flow +flow = StructuredStateFlow() +result = flow.kickoff() +print(f"Final result: {result}") +print(f"Final state: {flow.state}") ``` ### Benefícios do Estado Estruturado @@ -138,7 +247,29 @@ Independente de você usar estado estruturado ou não estruturado, é possível Métodos do flow podem retornar valores que serão passados como argumento para métodos listeners: ```python -# código não traduzido +from crewai.flow.flow import Flow, listen, start + +class DataPassingFlow(Flow): + @start() + def generate_data(self): + # Este valor de retorno será passado para os métodos listeners + return "Generated data" + + @listen(generate_data) + def process_data(self, data_from_previous_step): + print(f"Received: {data_from_previous_step}") + # Você pode modificar os dados e repassá-los adiante + processed_data = f"{data_from_previous_step} - processed" + # Também atualiza o estado + self.state["last_processed"] = processed_data + return processed_data + + @listen(process_data) + def finalize_data(self, processed_data): + print(f"Received processed data: {processed_data}") + # Acessa tanto os dados passados quanto o estado + last_processed = self.state.get("last_processed", "") + return f"Final: {processed_data} (from state: {last_processed})" ``` Esse padrão permite combinar passagem de dados direta com atualizações de estado para obter máxima flexibilidade. @@ -156,7 +287,36 @@ O decorador `@persist()` automatiza a persistência de estado, salvando o estado Ao aplicar em nível de classe, `@persist()` salva o estado após cada execução de método: ```python -# código não traduzido +from crewai.flow.flow import Flow, listen, start +from crewai.flow.persistence import persist +from pydantic import BaseModel + +class CounterState(BaseModel): + value: int = 0 + +@persist() # Aplica à classe inteira do flow +class PersistentCounterFlow(Flow[CounterState]): + @start() + def increment(self): + self.state.value += 1 + print(f"Incremented to {self.state.value}") + return self.state.value + + @listen(increment) + def double(self, value): + self.state.value = value * 2 + print(f"Doubled to {self.state.value}") + return self.state.value + +# Primeira execução +flow1 = PersistentCounterFlow() +result1 = flow1.kickoff() +print(f"First run result: {result1}") + +# Segunda execução - passa o ID para carregar o estado persistido +flow2 = PersistentCounterFlow() +result2 = flow2.kickoff(inputs={"id": flow1.state.id}) +print(f"Second run result: {result2}") # Será maior devido ao estado persistido ``` #### Persistência em Nível de Método @@ -164,7 +324,26 @@ Ao aplicar em nível de classe, `@persist()` salva o estado após cada execuçã Para mais controle, você pode aplicar `@persist()` em métodos específicos: ```python -# código não traduzido +from crewai.flow.flow import Flow, listen, start +from crewai.flow.persistence import persist + +class SelectivePersistFlow(Flow): + @start() + def first_step(self): + self.state["count"] = 1 + return "First step" + + @persist() # Persiste apenas após este método + @listen(first_step) + def important_step(self, prev_result): + self.state["count"] += 1 + self.state["important_data"] = "This will be persisted" + return "Important step completed" + + @listen(important_step) + def final_step(self, prev_result): + self.state["count"] += 1 + return f"Complete with count {self.state['count']}" ``` #### Forking de Estado Persistido @@ -216,7 +395,45 @@ Notas sobre o comportamento: Você pode usar o estado para implementar lógicas condicionais complexas em seus flows: ```python -# código não traduzido +from crewai.flow.flow import Flow, listen, router, start +from pydantic import BaseModel + +class PaymentState(BaseModel): + amount: float = 0.0 + is_approved: bool = False + retry_count: int = 0 + +class PaymentFlow(Flow[PaymentState]): + @start() + def process_payment(self): + # Simula o processamento do pagamento + self.state.amount = 100.0 + self.state.is_approved = self.state.amount < 1000 + return "Payment processed" + + @router(process_payment) + def check_approval(self, previous_result): + if self.state.is_approved: + return "approved" + elif self.state.retry_count < 3: + return "retry" + else: + return "rejected" + + @listen("approved") + def handle_approval(self): + return f"Payment of ${self.state.amount} approved!" + + @listen("retry") + def handle_retry(self): + self.state.retry_count += 1 + print(f"Retrying payment (attempt {self.state.retry_count})...") + # Aqui poderia ser implementada a lógica de retry + return "Retry initiated" + + @listen("rejected") + def handle_rejection(self): + return f"Payment of ${self.state.amount} rejected after {self.state.retry_count} retries." ``` ### Manipulações Complexas de Estado @@ -224,7 +441,60 @@ Você pode usar o estado para implementar lógicas condicionais complexas em seu Para transformar estados complexos, você pode criar métodos dedicados: ```python -# código não traduzido +from crewai.flow.flow import Flow, listen, start +from pydantic import BaseModel +from typing import List, Dict + +class UserData(BaseModel): + name: str + active: bool = True + login_count: int = 0 + +class ComplexState(BaseModel): + users: Dict[str, UserData] = {} + active_user_count: int = 0 + +class TransformationFlow(Flow[ComplexState]): + @start() + def initialize(self): + # Adiciona alguns usuários + self.add_user("alice", "Alice") + self.add_user("bob", "Bob") + self.add_user("charlie", "Charlie") + return "Initialized" + + @listen(initialize) + def process_users(self, _): + # Incrementa contagens de login + for user_id in self.state.users: + self.increment_login(user_id) + + # Desativa um usuário + self.deactivate_user("bob") + + # Atualiza a contagem de ativos + self.update_active_count() + + return f"Processed {len(self.state.users)} users" + + # Métodos auxiliares para transformações de estado + def add_user(self, user_id: str, name: str): + self.state.users[user_id] = UserData(name=name) + self.update_active_count() + + def increment_login(self, user_id: str): + if user_id in self.state.users: + self.state.users[user_id].login_count += 1 + + def deactivate_user(self, user_id: str): + if user_id in self.state.users: + self.state.users[user_id].active = False + self.update_active_count() + + def update_active_count(self): + self.state.active_user_count = sum( + 1 for user in self.state.users.values() if user.active + ) ``` Esse padrão de criar métodos auxiliares mantém seus métodos de flow limpos, enquanto permite manipulações complexas de estado. @@ -238,7 +508,71 @@ Um dos padrões mais poderosos na CrewAI é combinar o gerenciamento de estado d Você pode usar o estado do flow para parametrizar crews: ```python -# código não traduzido +from crewai.flow.flow import Flow, listen, start +from crewai import Agent, Crew, Process, Task +from pydantic import BaseModel + +class ResearchState(BaseModel): + topic: str = "" + depth: str = "medium" + results: str = "" + +class ResearchFlow(Flow[ResearchState]): + @start() + def get_parameters(self): + # Em uma aplicação real, isso pode vir da entrada do usuário + self.state.topic = "Artificial Intelligence Ethics" + self.state.depth = "deep" + return "Parameters set" + + @listen(get_parameters) + def execute_research(self, _): + # Cria os agentes + researcher = Agent( + role="Research Specialist", + goal=f"Research {self.state.topic} in {self.state.depth} detail", + backstory="You are an expert researcher with a talent for finding accurate information." + ) + + writer = Agent( + role="Content Writer", + goal="Transform research into clear, engaging content", + backstory="You excel at communicating complex ideas clearly and concisely." + ) + + # Cria as tarefas + research_task = Task( + description=f"Research {self.state.topic} with {self.state.depth} analysis", + expected_output="Comprehensive research notes in markdown format", + agent=researcher + ) + + writing_task = Task( + description=f"Create a summary on {self.state.topic} based on the research", + expected_output="Well-written article in markdown format", + agent=writer, + context=[research_task] + ) + + # Cria e executa a crew + research_crew = Crew( + agents=[researcher, writer], + tasks=[research_task, writing_task], + process=Process.sequential, + verbose=True + ) + + # Executa a crew e armazena o resultado no estado + result = research_crew.kickoff() + self.state.results = result.raw + + return "Research completed" + + @listen(execute_research) + def summarize_results(self, _): + # Acessa os resultados armazenados + result_length = len(self.state.results) + return f"Research on {self.state.topic} completed with {result_length} characters of results." ``` ### Manipulando Saídas de Crews no Estado @@ -246,7 +580,21 @@ Você pode usar o estado do flow para parametrizar crews: Quando um crew finaliza, é possível processar sua saída e armazená-la no estado do flow: ```python -# código não traduzido +@listen(execute_crew) +def process_crew_results(self, _): + # Faz parsing dos resultados brutos (assumindo saída em JSON) + import json + try: + results_dict = json.loads(self.state.raw_results) + self.state.processed_results = { + "title": results_dict.get("title", ""), + "main_points": results_dict.get("main_points", []), + "conclusion": results_dict.get("conclusion", "") + } + return "Results processed successfully" + except json.JSONDecodeError: + self.state.error = "Failed to parse crew results as JSON" + return "Error processing results" ``` ## Boas Práticas para Gerenciamento de Estado @@ -256,7 +604,19 @@ Quando um crew finaliza, é possível processar sua saída e armazená-la no est Projete seu estado para conter somente o necessário: ```python -# Exemplo não traduzido +# Abrangente demais +class BloatedState(BaseModel): + user_data: Dict = {} + system_settings: Dict = {} + temporary_calculations: List = [] + debug_info: Dict = {} + # ...muitos outros campos + +# Melhor: estado focado +class FocusedState(BaseModel): + user_id: str + preferences: Dict[str, str] + completion_status: Dict[str, bool] ``` ### 2. Use Estado Estruturado em Flows Complexos @@ -264,7 +624,23 @@ Projete seu estado para conter somente o necessário: À medida que seus flows evoluem em complexidade, o estado estruturado se torna cada vez mais valioso: ```python -# Exemplo não traduzido +# Flow simples pode usar estado não estruturado +class SimpleGreetingFlow(Flow): + @start() + def greet(self): + self.state["name"] = "World" + return f"Hello, {self.state['name']}!" + +# Flow complexo se beneficia de estado estruturado +class UserRegistrationState(BaseModel): + username: str + email: str + verification_status: bool = False + registration_date: datetime = Field(default_factory=datetime.now) + last_login: Optional[datetime] = None + +class RegistrationFlow(Flow[UserRegistrationState]): + # Métodos com acesso ao estado fortemente tipado ``` ### 3. Documente Transições de Estado @@ -272,7 +648,18 @@ Projete seu estado para conter somente o necessário: Para flows complexos, documente como o estado muda ao longo da execução: ```python -# Exemplo não traduzido +@start() +def initialize_order(self): + """ + Initialize order state with empty values. + + State before: {} + State after: {order_id: str, items: [], status: 'new'} + """ + self.state.order_id = str(uuid.uuid4()) + self.state.items = [] + self.state.status = "new" + return "Order initialized" ``` ### 4. Trate Erros de Estado de Forma Elegante @@ -280,7 +667,18 @@ Para flows complexos, documente como o estado muda ao longo da execução: Implemente tratamento de erros ao acessar o estado: ```python -# Exemplo não traduzido +@listen(previous_step) +def process_data(self, _): + try: + # Tenta acessar um valor que pode não existir + user_preference = self.state.preferences.get("theme", "default") + except (AttributeError, KeyError): + # Trata o erro de forma elegante + self.state.errors = self.state.get("errors", []) + self.state.errors.append("Failed to access preferences") + user_preference = "default" + + return f"Used preference: {user_preference}" ``` ### 5. Use o Estado Para Acompanhar o Progresso @@ -288,7 +686,30 @@ Implemente tratamento de erros ao acessar o estado: Aproveite o estado para monitorar o progresso em flows de longa duração: ```python -# Exemplo não traduzido +class ProgressTrackingFlow(Flow): + @start() + def initialize(self): + self.state["total_steps"] = 3 + self.state["current_step"] = 0 + self.state["progress"] = 0.0 + self.update_progress() + return "Initialized" + + def update_progress(self): + """Helper method to calculate and update progress""" + if self.state.get("total_steps", 0) > 0: + self.state["progress"] = (self.state.get("current_step", 0) / + self.state["total_steps"]) * 100 + print(f"Progress: {self.state['progress']:.1f}%") + + @listen(initialize) + def step_one(self, _): + # Realiza o trabalho... + self.state["current_step"] = 1 + self.update_progress() + return "Step 1 complete" + + # Etapas adicionais... ``` ### 6. Prefira Operações Imutáveis Quando Possível @@ -296,7 +717,22 @@ Aproveite o estado para monitorar o progresso em flows de longa duração: Especialmente com estado estruturado, prefira operações imutáveis para maior clareza: ```python -# Exemplo não traduzido +# Em vez de modificar listas no local: +self.state.items.append(new_item) # Operação mutável + +# Considere criar um novo estado: +from pydantic import BaseModel +from typing import List + +class ItemState(BaseModel): + items: List[str] = [] + +class ImmutableFlow(Flow[ItemState]): + @start() + def add_item(self): + # Cria uma nova lista com o item adicionado + self.state.items = [*self.state.items, "new item"] + return "Item added" ``` ## Depurando o Estado do Flow @@ -306,7 +742,24 @@ Especialmente com estado estruturado, prefira operações imutáveis para maior Ao desenvolver, adicione logs para acompanhar mudanças no estado: ```python -# Exemplo não traduzido +import logging +logging.basicConfig(level=logging.INFO) + +class LoggingFlow(Flow): + def log_state(self, step_name): + logging.info(f"State after {step_name}: {self.state}") + + @start() + def initialize(self): + self.state["counter"] = 0 + self.log_state("initialize") + return "Initialized" + + @listen(initialize) + def increment(self, _): + self.state["counter"] += 1 + self.log_state("increment") + return f"Incremented to {self.state['counter']}" ``` ### Visualizando o Estado @@ -314,7 +767,30 @@ Ao desenvolver, adicione logs para acompanhar mudanças no estado: Você pode adicionar métodos para visualizar seu estado durante o debug: ```python -# Exemplo não traduzido +def visualize_state(self): + """Create a simple visualization of the current state""" + import json + from rich.console import Console + from rich.panel import Panel + + console = Console() + + if hasattr(self.state, "model_dump"): + # Pydantic v2 + state_dict = self.state.model_dump() + elif hasattr(self.state, "dict"): + # Pydantic v1 + state_dict = self.state.dict() + else: + # Estado não estruturado + state_dict = dict(self.state) + + # Remove o id para uma saída mais limpa + if "id" in state_dict: + state_dict.pop("id") + + state_json = json.dumps(state_dict, indent=2, default=str) + console.print(Panel(state_json, title="Current Flow State")) ``` ## Conclusão