--- title: Streaming na Execução da Crew description: Transmita saída em tempo real da execução da sua crew no CrewAI icon: wave-pulse mode: "wide" --- ## Introdução O CrewAI fornece a capacidade de transmitir saída em tempo real durante a execução da crew, permitindo que você exiba resultados conforme são gerados, em vez de esperar que todo o processo seja concluído. Este recurso é particularmente útil para construir aplicações interativas, fornecer feedback ao usuário e monitorar processos de longa duração. ## Como o Streaming Funciona Quando o streaming está ativado, o CrewAI captura respostas do LLM e chamadas de ferramentas conforme acontecem, empacotando-as em chunks estruturados que incluem contexto sobre qual task e agent está executando. Você pode iterar sobre esses chunks em tempo real e acessar o resultado final quando a execução for concluída. ## Ativando o Streaming Para ativar o streaming, defina o parâmetro `stream` como `True` ao criar sua crew: ```python Code from crewai import Agent, Crew, Task # Crie seus agentes e tasks researcher = Agent( role="Research Analyst", goal="Gather comprehensive information on topics", backstory="You are an experienced researcher with excellent analytical skills.", ) task = Task( description="Research the latest developments in AI", expected_output="A detailed report on recent AI advancements", agent=researcher, ) # Ativar streaming crew = Crew( agents=[researcher], tasks=[task], stream=True # Ativar saída em streaming ) ``` ## Streaming Síncrono Quando você chama `kickoff()` em uma crew com streaming ativado, ele retorna um objeto `CrewStreamingOutput` que você pode iterar para receber chunks conforme chegam: ```python Code # Iniciar execução com streaming streaming = crew.kickoff(inputs={"topic": "artificial intelligence"}) # Iterar sobre chunks conforme chegam for chunk in streaming: print(chunk.content, end="", flush=True) # Acessar o resultado final após o streaming completar result = streaming.result print(f"\n\nSaída final: {result.raw}") ``` ### Informações do Chunk de Stream Cada chunk fornece contexto rico sobre a execução: ```python Code streaming = crew.kickoff(inputs={"topic": "AI"}) for chunk in streaming: print(f"Task: {chunk.task_name} (índice {chunk.task_index})") print(f"Agent: {chunk.agent_role}") print(f"Content: {chunk.content}") print(f"Type: {chunk.chunk_type}") # TEXT ou TOOL_CALL if chunk.tool_call: print(f"Tool: {chunk.tool_call.tool_name}") print(f"Arguments: {chunk.tool_call.arguments}") ``` ### Acessando Resultados do Streaming O objeto `CrewStreamingOutput` fornece várias propriedades úteis: ```python Code streaming = crew.kickoff(inputs={"topic": "AI"}) # Iterar e coletar chunks for chunk in streaming: print(chunk.content, end="", flush=True) # Após a iteração completar print(f"\nCompletado: {streaming.is_completed}") print(f"Texto completo: {streaming.get_full_text()}") print(f"Todos os chunks: {len(streaming.chunks)}") print(f"Resultado final: {streaming.result.raw}") ``` ## Streaming Assíncrono Para aplicações assíncronas, você pode usar `akickoff()` (async nativo) ou `kickoff_async()` (baseado em threads) com iteração assíncrona: ### Async Nativo com `akickoff()` O método `akickoff()` fornece execução async nativa verdadeira em toda a cadeia: ```python Code import asyncio async def stream_crew(): crew = Crew( agents=[researcher], tasks=[task], stream=True ) # Iniciar streaming async nativo streaming = await crew.akickoff(inputs={"topic": "AI"}) # Iteração assíncrona sobre chunks async for chunk in streaming: print(chunk.content, end="", flush=True) # Acessar resultado final result = streaming.result print(f"\n\nSaída final: {result.raw}") asyncio.run(stream_crew()) ``` ### Async Baseado em Threads com `kickoff_async()` Para integração async mais simples ou compatibilidade retroativa: ```python Code import asyncio async def stream_crew(): crew = Crew( agents=[researcher], tasks=[task], stream=True ) # Iniciar streaming async baseado em threads streaming = await crew.kickoff_async(inputs={"topic": "AI"}) # Iteração assíncrona sobre chunks async for chunk in streaming: print(chunk.content, end="", flush=True) # Acessar resultado final result = streaming.result print(f"\n\nSaída final: {result.raw}") asyncio.run(stream_crew()) ``` Para cargas de trabalho de alta concorrência, `akickoff()` é recomendado pois usa async nativo para execução de tasks, operações de memória e recuperação de conhecimento. Consulte o guia [Iniciar Crew de Forma Assíncrona](/pt-BR/learn/kickoff-async) para mais detalhes. ## Streaming com kickoff_for_each Ao executar uma crew para múltiplas entradas com `kickoff_for_each()`, o streaming funciona de forma diferente dependendo se você usa síncrono ou assíncrono: ### kickoff_for_each Síncrono Com `kickoff_for_each()` síncrono, você obtém uma lista de objetos `CrewStreamingOutput`, um para cada entrada: ```python Code crew = Crew( agents=[researcher], tasks=[task], stream=True ) inputs_list = [ {"topic": "AI in healthcare"}, {"topic": "AI in finance"} ] # Retorna lista de saídas de streaming streaming_outputs = crew.kickoff_for_each(inputs=inputs_list) # Iterar sobre cada saída de streaming for i, streaming in enumerate(streaming_outputs): print(f"\n=== Entrada {i + 1} ===") for chunk in streaming: print(chunk.content, end="", flush=True) result = streaming.result print(f"\n\nResultado {i + 1}: {result.raw}") ``` ### kickoff_for_each_async Assíncrono Com `kickoff_for_each_async()` assíncrono, você obtém um único `CrewStreamingOutput` que produz chunks de todas as crews conforme chegam concorrentemente: ```python Code import asyncio async def stream_multiple_crews(): crew = Crew( agents=[researcher], tasks=[task], stream=True ) inputs_list = [ {"topic": "AI in healthcare"}, {"topic": "AI in finance"} ] # Retorna saída de streaming única para todas as crews streaming = await crew.kickoff_for_each_async(inputs=inputs_list) # Chunks de todas as crews chegam conforme são gerados async for chunk in streaming: print(f"[{chunk.task_name}] {chunk.content}", end="", flush=True) # Acessar todos os resultados results = streaming.results # Lista de objetos CrewOutput for i, result in enumerate(results): print(f"\n\nResultado {i + 1}: {result.raw}") asyncio.run(stream_multiple_crews()) ``` ## Tipos de Chunk de Stream Chunks podem ser de diferentes tipos, indicados pelo campo `chunk_type`: ### Chunks TEXT Conteúdo de texto padrão de respostas do LLM: ```python Code for chunk in streaming: if chunk.chunk_type == StreamChunkType.TEXT: print(chunk.content, end="", flush=True) ``` ### Chunks TOOL_CALL Informações sobre chamadas de ferramentas sendo feitas: ```python Code for chunk in streaming: if chunk.chunk_type == StreamChunkType.TOOL_CALL: print(f"\nChamando ferramenta: {chunk.tool_call.tool_name}") print(f"Argumentos: {chunk.tool_call.arguments}") ``` ## Exemplo Prático: Construindo uma UI com Streaming Aqui está um exemplo completo mostrando como construir uma aplicação interativa com streaming: ```python Code import asyncio from crewai import Agent, Crew, Task from crewai.types.streaming import StreamChunkType async def interactive_research(): # Criar crew com streaming ativado researcher = Agent( role="Research Analyst", goal="Provide detailed analysis on any topic", backstory="You are an expert researcher with broad knowledge.", ) task = Task( description="Research and analyze: {topic}", expected_output="A comprehensive analysis with key insights", agent=researcher, ) crew = Crew( agents=[researcher], tasks=[task], stream=True, verbose=False ) # Obter entrada do usuário topic = input("Digite um tópico para pesquisar: ") print(f"\n{'='*60}") print(f"Pesquisando: {topic}") print(f"{'='*60}\n") # Iniciar execução com streaming streaming = await crew.kickoff_async(inputs={"topic": topic}) current_task = "" async for chunk in streaming: # Mostrar transições de task if chunk.task_name != current_task: current_task = chunk.task_name print(f"\n[{chunk.agent_role}] Trabalhando em: {chunk.task_name}") print("-" * 60) # Exibir chunks de texto if chunk.chunk_type == StreamChunkType.TEXT: print(chunk.content, end="", flush=True) # Exibir chamadas de ferramentas elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call: print(f"\n🔧 Usando ferramenta: {chunk.tool_call.tool_name}") # Mostrar resultado final result = streaming.result print(f"\n\n{'='*60}") print("Análise Completa!") print(f"{'='*60}") print(f"\nUso de Tokens: {result.token_usage}") asyncio.run(interactive_research()) ``` ## Casos de Uso O streaming é particularmente valioso para: - **Aplicações Interativas**: Fornecer feedback em tempo real aos usuários enquanto os agentes trabalham - **Tasks de Longa Duração**: Mostrar progresso para pesquisa, análise ou geração de conteúdo - **Depuração e Monitoramento**: Observar comportamento e tomada de decisão dos agentes em tempo real - **Experiência do Usuário**: Reduzir latência percebida mostrando resultados incrementais - **Dashboards ao Vivo**: Construir interfaces de monitoramento que exibem status de execução da crew ## Notas Importantes - O streaming ativa automaticamente o streaming do LLM para todos os agentes na crew - Você deve iterar através de todos os chunks antes de acessar a propriedade `.result` - Para `kickoff_for_each_async()` com streaming, use `.results` (plural) para obter todas as saídas - O streaming adiciona overhead mínimo e pode realmente melhorar a performance percebida - Cada chunk inclui contexto completo (task, agente, tipo de chunk) para UIs ricas ## Tratamento de Erros Trate erros durante a execução com streaming: ```python Code streaming = crew.kickoff(inputs={"topic": "AI"}) try: for chunk in streaming: print(chunk.content, end="", flush=True) result = streaming.result print(f"\nSucesso: {result.raw}") except Exception as e: print(f"\nErro durante o streaming: {e}") if streaming.is_completed: print("O streaming foi completado mas ocorreu um erro") ``` Ao aproveitar o streaming, você pode construir aplicações mais responsivas e interativas com o CrewAI, fornecendo aos usuários visibilidade em tempo real da execução dos agentes e resultados.