mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-15 11:58:31 +00:00
356 lines
11 KiB
Plaintext
356 lines
11 KiB
Plaintext
---
|
|
title: 스트리밍 Crew 실행
|
|
description: CrewAI crew 실행에서 실시간 출력을 스트리밍하기
|
|
icon: wave-pulse
|
|
mode: "wide"
|
|
---
|
|
|
|
## 소개
|
|
|
|
CrewAI는 crew 실행 중 실시간 출력을 스트리밍하는 기능을 제공하여, 전체 프로세스가 완료될 때까지 기다리지 않고 결과가 생성되는 대로 표시할 수 있습니다. 이 기능은 대화형 애플리케이션을 구축하거나, 사용자 피드백을 제공하거나, 장시간 실행되는 프로세스를 모니터링할 때 특히 유용합니다.
|
|
|
|
## 스트리밍 작동 방식
|
|
|
|
스트리밍이 활성화되면 CrewAI는 LLM 응답과 도구 호출을 실시간으로 캡처하여, 어떤 task와 agent가 실행 중인지에 대한 컨텍스트를 포함한 구조화된 청크로 패키징합니다. 이러한 청크를 실시간으로 반복 처리하고 실행이 완료되면 최종 결과에 접근할 수 있습니다.
|
|
|
|
## 스트리밍 활성화
|
|
|
|
스트리밍을 활성화하려면 crew를 생성할 때 `stream` 파라미터를 `True`로 설정하세요:
|
|
|
|
```python Code
|
|
from crewai import Agent, Crew, Task
|
|
|
|
# 에이전트와 태스크 생성
|
|
researcher = Agent(
|
|
role="Research Analyst",
|
|
goal="Gather comprehensive information on topics",
|
|
backstory="You are an experienced researcher with excellent analytical skills.",
|
|
)
|
|
|
|
task = Task(
|
|
description="Research the latest developments in AI",
|
|
expected_output="A detailed report on recent AI advancements",
|
|
agent=researcher,
|
|
)
|
|
|
|
# 스트리밍 활성화
|
|
crew = Crew(
|
|
agents=[researcher],
|
|
tasks=[task],
|
|
stream=True # 스트리밍 출력 활성화
|
|
)
|
|
```
|
|
|
|
## 동기 스트리밍
|
|
|
|
스트리밍이 활성화된 crew에서 `kickoff()`를 호출하면, 청크가 도착할 때마다 반복 처리할 수 있는 `CrewStreamingOutput` 객체가 반환됩니다:
|
|
|
|
```python Code
|
|
# 스트리밍 실행 시작
|
|
streaming = crew.kickoff(inputs={"topic": "artificial intelligence"})
|
|
|
|
# 청크가 도착할 때마다 반복
|
|
for chunk in streaming:
|
|
print(chunk.content, end="", flush=True)
|
|
|
|
# 스트리밍 완료 후 최종 결과 접근
|
|
result = streaming.result
|
|
print(f"\n\n최종 출력: {result.raw}")
|
|
```
|
|
|
|
### 스트림 청크 정보
|
|
|
|
각 청크는 실행에 대한 풍부한 컨텍스트를 제공합니다:
|
|
|
|
```python Code
|
|
streaming = crew.kickoff(inputs={"topic": "AI"})
|
|
|
|
for chunk in streaming:
|
|
print(f"Task: {chunk.task_name} (인덱스 {chunk.task_index})")
|
|
print(f"Agent: {chunk.agent_role}")
|
|
print(f"Content: {chunk.content}")
|
|
print(f"Type: {chunk.chunk_type}") # TEXT 또는 TOOL_CALL
|
|
if chunk.tool_call:
|
|
print(f"Tool: {chunk.tool_call.tool_name}")
|
|
print(f"Arguments: {chunk.tool_call.arguments}")
|
|
```
|
|
|
|
### 스트리밍 결과 접근
|
|
|
|
`CrewStreamingOutput` 객체는 여러 유용한 속성을 제공합니다:
|
|
|
|
```python Code
|
|
streaming = crew.kickoff(inputs={"topic": "AI"})
|
|
|
|
# 청크 반복 및 수집
|
|
for chunk in streaming:
|
|
print(chunk.content, end="", flush=True)
|
|
|
|
# 반복 완료 후
|
|
print(f"\n완료됨: {streaming.is_completed}")
|
|
print(f"전체 텍스트: {streaming.get_full_text()}")
|
|
print(f"전체 청크 수: {len(streaming.chunks)}")
|
|
print(f"최종 결과: {streaming.result.raw}")
|
|
```
|
|
|
|
## 비동기 스트리밍
|
|
|
|
비동기 애플리케이션의 경우, 비동기 반복과 함께 `akickoff()`(네이티브 async) 또는 `kickoff_async()`(스레드 기반)를 사용할 수 있습니다:
|
|
|
|
### `akickoff()`를 사용한 네이티브 Async
|
|
|
|
`akickoff()` 메서드는 전체 체인에서 진정한 네이티브 async 실행을 제공합니다:
|
|
|
|
```python Code
|
|
import asyncio
|
|
|
|
async def stream_crew():
|
|
crew = Crew(
|
|
agents=[researcher],
|
|
tasks=[task],
|
|
stream=True
|
|
)
|
|
|
|
# 네이티브 async 스트리밍 시작
|
|
streaming = await crew.akickoff(inputs={"topic": "AI"})
|
|
|
|
# 청크에 대한 비동기 반복
|
|
async for chunk in streaming:
|
|
print(chunk.content, end="", flush=True)
|
|
|
|
# 최종 결과 접근
|
|
result = streaming.result
|
|
print(f"\n\n최종 출력: {result.raw}")
|
|
|
|
asyncio.run(stream_crew())
|
|
```
|
|
|
|
### `kickoff_async()`를 사용한 스레드 기반 Async
|
|
|
|
더 간단한 async 통합이나 하위 호환성을 위해:
|
|
|
|
```python Code
|
|
import asyncio
|
|
|
|
async def stream_crew():
|
|
crew = Crew(
|
|
agents=[researcher],
|
|
tasks=[task],
|
|
stream=True
|
|
)
|
|
|
|
# 스레드 기반 async 스트리밍 시작
|
|
streaming = await crew.kickoff_async(inputs={"topic": "AI"})
|
|
|
|
# 청크에 대한 비동기 반복
|
|
async for chunk in streaming:
|
|
print(chunk.content, end="", flush=True)
|
|
|
|
# 최종 결과 접근
|
|
result = streaming.result
|
|
print(f"\n\n최종 출력: {result.raw}")
|
|
|
|
asyncio.run(stream_crew())
|
|
```
|
|
|
|
<Note>
|
|
고동시성 워크로드의 경우, 태스크 실행, 메모리 작업, 지식 검색에 네이티브 async를 사용하는 `akickoff()`가 권장됩니다. 자세한 내용은 [Crew 비동기 시작](/ko/learn/kickoff-async) 가이드를 참조하세요.
|
|
</Note>
|
|
|
|
## kickoff_for_each를 사용한 스트리밍
|
|
|
|
`kickoff_for_each()`로 여러 입력에 대해 crew를 실행할 때, 동기 또는 비동기 여부에 따라 스트리밍이 다르게 작동합니다:
|
|
|
|
### 동기 kickoff_for_each
|
|
|
|
동기 `kickoff_for_each()`를 사용하면, 각 입력에 대해 하나씩 `CrewStreamingOutput` 객체의 리스트가 반환됩니다:
|
|
|
|
```python Code
|
|
crew = Crew(
|
|
agents=[researcher],
|
|
tasks=[task],
|
|
stream=True
|
|
)
|
|
|
|
inputs_list = [
|
|
{"topic": "AI in healthcare"},
|
|
{"topic": "AI in finance"}
|
|
]
|
|
|
|
# 스트리밍 출력 리스트 반환
|
|
streaming_outputs = crew.kickoff_for_each(inputs=inputs_list)
|
|
|
|
# 각 스트리밍 출력에 대해 반복
|
|
for i, streaming in enumerate(streaming_outputs):
|
|
print(f"\n=== 입력 {i + 1} ===")
|
|
for chunk in streaming:
|
|
print(chunk.content, end="", flush=True)
|
|
|
|
result = streaming.result
|
|
print(f"\n\n결과 {i + 1}: {result.raw}")
|
|
```
|
|
|
|
### 비동기 kickoff_for_each_async
|
|
|
|
비동기 `kickoff_for_each_async()`를 사용하면, 모든 crew의 청크가 동시에 도착하는 대로 반환하는 단일 `CrewStreamingOutput`이 반환됩니다:
|
|
|
|
```python Code
|
|
import asyncio
|
|
|
|
async def stream_multiple_crews():
|
|
crew = Crew(
|
|
agents=[researcher],
|
|
tasks=[task],
|
|
stream=True
|
|
)
|
|
|
|
inputs_list = [
|
|
{"topic": "AI in healthcare"},
|
|
{"topic": "AI in finance"}
|
|
]
|
|
|
|
# 모든 crew에 대한 단일 스트리밍 출력 반환
|
|
streaming = await crew.kickoff_for_each_async(inputs=inputs_list)
|
|
|
|
# 모든 crew의 청크가 생성되는 대로 도착
|
|
async for chunk in streaming:
|
|
print(f"[{chunk.task_name}] {chunk.content}", end="", flush=True)
|
|
|
|
# 모든 결과 접근
|
|
results = streaming.results # CrewOutput 객체 리스트
|
|
for i, result in enumerate(results):
|
|
print(f"\n\n결과 {i + 1}: {result.raw}")
|
|
|
|
asyncio.run(stream_multiple_crews())
|
|
```
|
|
|
|
## 스트림 청크 타입
|
|
|
|
청크는 `chunk_type` 필드로 표시되는 다양한 타입을 가질 수 있습니다:
|
|
|
|
### TEXT 청크
|
|
|
|
LLM 응답의 표준 텍스트 콘텐츠:
|
|
|
|
```python Code
|
|
for chunk in streaming:
|
|
if chunk.chunk_type == StreamChunkType.TEXT:
|
|
print(chunk.content, end="", flush=True)
|
|
```
|
|
|
|
### TOOL_CALL 청크
|
|
|
|
수행 중인 도구 호출에 대한 정보:
|
|
|
|
```python Code
|
|
for chunk in streaming:
|
|
if chunk.chunk_type == StreamChunkType.TOOL_CALL:
|
|
print(f"\n도구 호출: {chunk.tool_call.tool_name}")
|
|
print(f"인자: {chunk.tool_call.arguments}")
|
|
```
|
|
|
|
## 실용적인 예시: 스트리밍을 사용한 UI 구축
|
|
|
|
다음은 스트리밍을 사용한 대화형 애플리케이션을 구축하는 방법을 보여주는 완전한 예시입니다:
|
|
|
|
```python Code
|
|
import asyncio
|
|
from crewai import Agent, Crew, Task
|
|
from crewai.types.streaming import StreamChunkType
|
|
|
|
async def interactive_research():
|
|
# 스트리밍이 활성화된 crew 생성
|
|
researcher = Agent(
|
|
role="Research Analyst",
|
|
goal="Provide detailed analysis on any topic",
|
|
backstory="You are an expert researcher with broad knowledge.",
|
|
)
|
|
|
|
task = Task(
|
|
description="Research and analyze: {topic}",
|
|
expected_output="A comprehensive analysis with key insights",
|
|
agent=researcher,
|
|
)
|
|
|
|
crew = Crew(
|
|
agents=[researcher],
|
|
tasks=[task],
|
|
stream=True,
|
|
verbose=False
|
|
)
|
|
|
|
# 사용자 입력 받기
|
|
topic = input("연구할 주제를 입력하세요: ")
|
|
|
|
print(f"\n{'='*60}")
|
|
print(f"연구 중: {topic}")
|
|
print(f"{'='*60}\n")
|
|
|
|
# 스트리밍 실행 시작
|
|
streaming = await crew.kickoff_async(inputs={"topic": topic})
|
|
|
|
current_task = ""
|
|
async for chunk in streaming:
|
|
# 태스크 전환 표시
|
|
if chunk.task_name != current_task:
|
|
current_task = chunk.task_name
|
|
print(f"\n[{chunk.agent_role}] 작업 중: {chunk.task_name}")
|
|
print("-" * 60)
|
|
|
|
# 텍스트 청크 표시
|
|
if chunk.chunk_type == StreamChunkType.TEXT:
|
|
print(chunk.content, end="", flush=True)
|
|
|
|
# 도구 호출 표시
|
|
elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
|
|
print(f"\n🔧 도구 사용: {chunk.tool_call.tool_name}")
|
|
|
|
# 최종 결과 표시
|
|
result = streaming.result
|
|
print(f"\n\n{'='*60}")
|
|
print("분석 완료!")
|
|
print(f"{'='*60}")
|
|
print(f"\n토큰 사용량: {result.token_usage}")
|
|
|
|
asyncio.run(interactive_research())
|
|
```
|
|
|
|
## 사용 사례
|
|
|
|
스트리밍은 다음과 같은 경우에 특히 유용합니다:
|
|
|
|
- **대화형 애플리케이션**: 에이전트가 작업하는 동안 사용자에게 실시간 피드백 제공
|
|
- **장시간 실행 태스크**: 연구, 분석 또는 콘텐츠 생성의 진행 상황 표시
|
|
- **디버깅 및 모니터링**: 에이전트 동작과 의사 결정을 실시간으로 관찰
|
|
- **사용자 경험**: 점진적인 결과를 표시하여 체감 지연 시간 감소
|
|
- **라이브 대시보드**: crew 실행 상태를 표시하는 모니터링 인터페이스 구축
|
|
|
|
## 중요 사항
|
|
|
|
- 스트리밍은 crew의 모든 에이전트에 대해 자동으로 LLM 스트리밍을 활성화합니다
|
|
- `.result` 속성에 접근하기 전에 모든 청크를 반복해야 합니다
|
|
- 스트리밍을 사용하는 `kickoff_for_each_async()`의 경우, 모든 출력을 가져오려면 `.results`(복수형)를 사용하세요
|
|
- 스트리밍은 최소한의 오버헤드를 추가하며 실제로 체감 성능을 향상시킬 수 있습니다
|
|
- 각 청크는 풍부한 UI를 위한 전체 컨텍스트(태스크, 에이전트, 청크 타입)를 포함합니다
|
|
|
|
## 오류 처리
|
|
|
|
스트리밍 실행 중 오류 처리:
|
|
|
|
```python Code
|
|
streaming = crew.kickoff(inputs={"topic": "AI"})
|
|
|
|
try:
|
|
for chunk in streaming:
|
|
print(chunk.content, end="", flush=True)
|
|
|
|
result = streaming.result
|
|
print(f"\n성공: {result.raw}")
|
|
|
|
except Exception as e:
|
|
print(f"\n스트리밍 중 오류 발생: {e}")
|
|
if streaming.is_completed:
|
|
print("스트리밍은 완료되었지만 오류가 발생했습니다")
|
|
```
|
|
|
|
스트리밍을 활용하면 CrewAI로 더 반응성이 좋고 대화형인 애플리케이션을 구축하여 사용자에게 에이전트 실행과 결과에 대한 실시간 가시성을 제공할 수 있습니다. |