--- title: Crew 비동기 시작 description: Crew를 비동기로 시작하기 icon: rocket-launch mode: "wide" --- ## 소개 CrewAI는 crew를 비동기적으로 시작할 수 있는 기능을 제공합니다. 이를 통해 crew 실행을 블로킹(blocking) 없이 시작할 수 있습니다. 이 기능은 여러 개의 crew를 동시에 실행하거나 crew가 실행되는 동안 다른 작업을 수행해야 할 때 특히 유용합니다. CrewAI는 비동기 실행을 위해 두 가지 접근 방식을 제공합니다: | 메서드 | 타입 | 설명 | |--------|------|-------------| | `akickoff()` | 네이티브 async | 전체 실행 체인에서 진정한 async/await 사용 | | `kickoff_async()` | 스레드 기반 | 동기 실행을 `asyncio.to_thread`로 래핑 | 고동시성 워크로드의 경우 `akickoff()`가 권장됩니다. 이는 작업 실행, 메모리 작업, 지식 검색에 네이티브 async를 사용합니다. ## `akickoff()`를 사용한 네이티브 비동기 실행 `akickoff()` 메서드는 작업 실행, 메모리 작업, 지식 쿼리를 포함한 전체 실행 체인에서 async/await를 사용하여 진정한 네이티브 비동기 실행을 제공합니다. ### 메서드 시그니처 ```python Code async def akickoff(self, inputs: dict) -> CrewOutput: ``` ### 매개변수 - `inputs` (dict): 작업에 필요한 입력 데이터를 포함하는 딕셔너리입니다. ### 반환 - `CrewOutput`: crew 실행 결과를 나타내는 객체입니다. ### 예시: 네이티브 비동기 Crew 실행 ```python Code import asyncio from crewai import Crew, Agent, Task # 에이전트 생성 coding_agent = Agent( role="Python Data Analyst", goal="Analyze data and provide insights using Python", backstory="You are an experienced data analyst with strong Python skills.", allow_code_execution=True ) # 작업 생성 data_analysis_task = Task( description="Analyze the given dataset and calculate the average age of participants. Ages: {ages}", agent=coding_agent, expected_output="The average age of the participants." ) # Crew 생성 analysis_crew = Crew( agents=[coding_agent], tasks=[data_analysis_task] ) # 네이티브 비동기 실행 async def main(): result = await analysis_crew.akickoff(inputs={"ages": [25, 30, 35, 40, 45]}) print("Crew Result:", result) asyncio.run(main()) ``` ### 예시: 여러 네이티브 비동기 Crew `asyncio.gather()`를 사용하여 네이티브 async로 여러 crew를 동시에 실행: ```python Code import asyncio from crewai import Crew, Agent, Task coding_agent = Agent( role="Python Data Analyst", goal="Analyze data and provide insights using Python", backstory="You are an experienced data analyst with strong Python skills.", allow_code_execution=True ) task_1 = Task( description="Analyze the first dataset and calculate the average age. Ages: {ages}", agent=coding_agent, expected_output="The average age of the participants." ) task_2 = Task( description="Analyze the second dataset and calculate the average age. Ages: {ages}", agent=coding_agent, expected_output="The average age of the participants." ) crew_1 = Crew(agents=[coding_agent], tasks=[task_1]) crew_2 = Crew(agents=[coding_agent], tasks=[task_2]) async def main(): results = await asyncio.gather( crew_1.akickoff(inputs={"ages": [25, 30, 35, 40, 45]}), crew_2.akickoff(inputs={"ages": [20, 22, 24, 28, 30]}) ) for i, result in enumerate(results, 1): print(f"Crew {i} Result:", result) asyncio.run(main()) ``` ### 예시: 여러 입력에 대한 네이티브 비동기 `akickoff_for_each()`를 사용하여 네이티브 async로 여러 입력에 대해 crew를 동시에 실행: ```python Code import asyncio from crewai import Crew, Agent, Task coding_agent = Agent( role="Python Data Analyst", goal="Analyze data and provide insights using Python", backstory="You are an experienced data analyst with strong Python skills.", allow_code_execution=True ) data_analysis_task = Task( description="Analyze the dataset and calculate the average age. Ages: {ages}", agent=coding_agent, expected_output="The average age of the participants." ) analysis_crew = Crew( agents=[coding_agent], tasks=[data_analysis_task] ) async def main(): datasets = [ {"ages": [25, 30, 35, 40, 45]}, {"ages": [20, 22, 24, 28, 30]}, {"ages": [30, 35, 40, 45, 50]} ] results = await analysis_crew.akickoff_for_each(datasets) for i, result in enumerate(results, 1): print(f"Dataset {i} Result:", result) asyncio.run(main()) ``` ## `kickoff_async()`를 사용한 스레드 기반 비동기 `kickoff_async()` 메서드는 동기 `kickoff()`를 스레드로 래핑하여 비동기 실행을 제공합니다. 이는 더 간단한 비동기 통합이나 하위 호환성에 유용합니다. ### 메서드 시그니처 ```python Code async def kickoff_async(self, inputs: dict) -> CrewOutput: ``` ### 매개변수 - `inputs` (dict): 작업에 필요한 입력 데이터를 포함하는 딕셔너리입니다. ### 반환 - `CrewOutput`: crew 실행 결과를 나타내는 객체입니다. ### 예시: 스레드 기반 비동기 실행 ```python Code import asyncio from crewai import Crew, Agent, Task coding_agent = Agent( role="Python Data Analyst", goal="Analyze data and provide insights using Python", backstory="You are an experienced data analyst with strong Python skills.", allow_code_execution=True ) data_analysis_task = Task( description="Analyze the given dataset and calculate the average age of participants. Ages: {ages}", agent=coding_agent, expected_output="The average age of the participants." ) analysis_crew = Crew( agents=[coding_agent], tasks=[data_analysis_task] ) async def async_crew_execution(): result = await analysis_crew.kickoff_async(inputs={"ages": [25, 30, 35, 40, 45]}) print("Crew Result:", result) asyncio.run(async_crew_execution()) ``` ### 예시: 여러 스레드 기반 비동기 Crew ```python Code import asyncio from crewai import Crew, Agent, Task coding_agent = Agent( role="Python Data Analyst", goal="Analyze data and provide insights using Python", backstory="You are an experienced data analyst with strong Python skills.", allow_code_execution=True ) task_1 = Task( description="Analyze the first dataset and calculate the average age of participants. Ages: {ages}", agent=coding_agent, expected_output="The average age of the participants." ) task_2 = Task( description="Analyze the second dataset and calculate the average age of participants. Ages: {ages}", agent=coding_agent, expected_output="The average age of the participants." ) crew_1 = Crew(agents=[coding_agent], tasks=[task_1]) crew_2 = Crew(agents=[coding_agent], tasks=[task_2]) async def async_multiple_crews(): result_1 = crew_1.kickoff_async(inputs={"ages": [25, 30, 35, 40, 45]}) result_2 = crew_2.kickoff_async(inputs={"ages": [20, 22, 24, 28, 30]}) results = await asyncio.gather(result_1, result_2) for i, result in enumerate(results, 1): print(f"Crew {i} Result:", result) asyncio.run(async_multiple_crews()) ``` ## 비동기 스트리밍 두 비동기 메서드 모두 crew에 `stream=True`가 설정된 경우 스트리밍을 지원합니다: ```python Code import asyncio from crewai import Crew, Agent, Task agent = Agent( role="Researcher", goal="Research and summarize topics", backstory="You are an expert researcher." ) task = Task( description="Research the topic: {topic}", agent=agent, expected_output="A comprehensive summary of the topic." ) crew = Crew( agents=[agent], tasks=[task], stream=True # 스트리밍 활성화 ) async def main(): streaming_output = await crew.akickoff(inputs={"topic": "AI trends in 2024"}) # 스트리밍 청크에 대한 비동기 반복 async for chunk in streaming_output: print(f"Chunk: {chunk.content}") # 스트리밍 완료 후 최종 결과 접근 result = streaming_output.result print(f"Final result: {result.raw}") asyncio.run(main()) ``` ## 잠재적 사용 사례 - **병렬 콘텐츠 생성**: 여러 개의 독립적인 crew를 비동기적으로 시작하여, 각 crew가 다른 주제에 대한 콘텐츠 생성을 담당합니다. 예를 들어, 한 crew는 AI 트렌드에 대한 기사 조사 및 초안을 작성하는 반면, 또 다른 crew는 신제품 출시와 관련된 소셜 미디어 게시물을 생성할 수 있습니다. - **동시 시장 조사 작업**: 여러 crew를 비동기적으로 시작하여 시장 조사를 병렬로 수행합니다. 한 crew는 업계 동향을 분석하고, 또 다른 crew는 경쟁사 전략을 조사하며, 또 다른 crew는 소비자 감정을 평가할 수 있습니다. - **독립적인 여행 계획 모듈**: 각각 독립적으로 여행의 다양한 측면을 계획하도록 crew를 따로 실행합니다. 한 crew는 항공편 옵션을, 다른 crew는 숙박을, 세 번째 crew는 활동 계획을 담당할 수 있습니다. ## `akickoff()`와 `kickoff_async()` 선택하기 | 기능 | `akickoff()` | `kickoff_async()` | |---------|--------------|-------------------| | 실행 모델 | 네이티브 async/await | 스레드 기반 래퍼 | | 작업 실행 | `aexecute_sync()`로 비동기 | 스레드 풀에서 동기 | | 메모리 작업 | 비동기 | 스레드 풀에서 동기 | | 지식 검색 | 비동기 | 스레드 풀에서 동기 | | 적합한 용도 | 고동시성, I/O 바운드 워크로드 | 간단한 비동기 통합 | | 스트리밍 지원 | 예 | 예 |