feat: add native async crew support

This commit is contained in:
Greyson Lalonde
2025-12-02 16:47:53 -05:00
parent bf9ccd418a
commit c57e325482
2 changed files with 720 additions and 0 deletions

View File

@@ -948,6 +948,342 @@ class Crew(FlowTrackable, BaseModel):
self._task_output_handler.reset()
return list(results)
async def akickoff(
self, inputs: dict[str, Any] | None = None
) -> CrewOutput | CrewStreamingOutput:
"""Native async kickoff method using async task execution throughout.
Unlike kickoff_async which wraps sync kickoff in a thread, this method
uses native async/await for all operations including task execution,
memory operations, and knowledge queries.
"""
if self.stream:
for agent in self.agents:
if agent.llm is not None:
agent.llm.stream = True
result_holder: list[CrewOutput] = []
current_task_info: TaskInfo = {
"index": 0,
"name": "",
"id": "",
"agent_role": "",
"agent_id": "",
}
state = create_streaming_state(
current_task_info, result_holder, use_async=True
)
output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = []
async def run_crew() -> None:
try:
self.stream = False
result = await self.akickoff(inputs)
if isinstance(result, CrewOutput):
result_holder.append(result)
except Exception as e:
signal_error(state, e, is_async=True)
finally:
self.stream = True
signal_end(state, is_async=True)
streaming_output = CrewStreamingOutput(
async_iterator=create_async_chunk_generator(
state, run_crew, output_holder
)
)
output_holder.append(streaming_output)
return streaming_output
ctx = baggage.set_baggage(
"crew_context", CrewContext(id=str(self.id), key=self.key)
)
token = attach(ctx)
try:
for before_callback in self.before_kickoff_callbacks:
if inputs is None:
inputs = {}
inputs = before_callback(inputs)
crewai_event_bus.emit(
self,
CrewKickoffStartedEvent(crew_name=self.name, inputs=inputs),
)
self._task_output_handler.reset()
self._logging_color = "bold_purple"
if inputs is not None:
self._inputs = inputs
self._interpolate_inputs(inputs)
self._set_tasks_callbacks()
self._set_allow_crewai_trigger_context_for_first_task()
for agent in self.agents:
agent.crew = self
agent.set_knowledge(crew_embedder=self.embedder)
if not agent.function_calling_llm: # type: ignore[attr-defined]
agent.function_calling_llm = self.function_calling_llm # type: ignore[attr-defined]
if not agent.step_callback: # type: ignore[attr-defined]
agent.step_callback = self.step_callback # type: ignore[attr-defined]
agent.create_agent_executor()
if self.planning:
self._handle_crew_planning()
if self.process == Process.sequential:
result = await self._arun_sequential_process()
elif self.process == Process.hierarchical:
result = await self._arun_hierarchical_process()
else:
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
)
for after_callback in self.after_kickoff_callbacks:
result = after_callback(result)
self.usage_metrics = self.calculate_usage_metrics()
return result
except Exception as e:
crewai_event_bus.emit(
self,
CrewKickoffFailedEvent(error=str(e), crew_name=self.name),
)
raise
finally:
detach(token)
async def akickoff_for_each(
self, inputs: list[dict[str, Any]]
) -> list[CrewOutput | CrewStreamingOutput] | CrewStreamingOutput:
"""Native async execution of the Crew's workflow for each input.
Uses native async throughout rather than thread-based async.
If stream=True, returns a single CrewStreamingOutput that yields chunks
from all crews as they arrive.
"""
crew_copies = [self.copy() for _ in inputs]
if self.stream:
result_holder: list[list[CrewOutput]] = [[]]
current_task_info: TaskInfo = {
"index": 0,
"name": "",
"id": "",
"agent_role": "",
"agent_id": "",
}
state = create_streaming_state(
current_task_info, result_holder, use_async=True
)
output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = []
async def run_all_crews() -> None:
try:
streaming_outputs: list[CrewStreamingOutput] = []
for i, crew in enumerate(crew_copies):
streaming = await crew.akickoff(inputs=inputs[i])
if isinstance(streaming, CrewStreamingOutput):
streaming_outputs.append(streaming)
async def consume_stream(
stream_output: CrewStreamingOutput,
) -> CrewOutput:
async for chunk in stream_output:
if state.async_queue is not None and state.loop is not None:
state.loop.call_soon_threadsafe(
state.async_queue.put_nowait, chunk
)
return stream_output.result
crew_results = await asyncio.gather(
*[consume_stream(s) for s in streaming_outputs]
)
result_holder[0] = list(crew_results)
except Exception as e:
signal_error(state, e, is_async=True)
finally:
signal_end(state, is_async=True)
streaming_output = CrewStreamingOutput(
async_iterator=create_async_chunk_generator(
state, run_all_crews, output_holder
)
)
def set_results_wrapper(result: Any) -> None:
streaming_output._set_results(result)
streaming_output._set_result = set_results_wrapper # type: ignore[method-assign]
output_holder.append(streaming_output)
return streaming_output
tasks = [
asyncio.create_task(crew_copy.akickoff(inputs=input_data))
for crew_copy, input_data in zip(crew_copies, inputs, strict=True)
]
results = await asyncio.gather(*tasks)
total_usage_metrics = UsageMetrics()
for crew_copy in crew_copies:
if crew_copy.usage_metrics:
total_usage_metrics.add_usage_metrics(crew_copy.usage_metrics)
self.usage_metrics = total_usage_metrics
self._task_output_handler.reset()
return list(results)
async def _arun_sequential_process(self) -> CrewOutput:
"""Executes tasks sequentially using native async and returns the final output."""
return await self._aexecute_tasks(self.tasks)
async def _arun_hierarchical_process(self) -> CrewOutput:
"""Creates and assigns a manager agent to complete the tasks using native async."""
self._create_manager_agent()
return await self._aexecute_tasks(self.tasks)
async def _aexecute_tasks(
self,
tasks: list[Task],
start_index: int | None = 0,
was_replayed: bool = False,
) -> CrewOutput:
"""Executes tasks using native async and returns the final output.
Args:
tasks: List of tasks to execute
start_index: Index to start execution from (for replay)
was_replayed: Whether this is a replayed execution
Returns:
CrewOutput: Final output of the crew
"""
task_outputs: list[TaskOutput] = []
pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int]] = []
last_sync_output: TaskOutput | None = None
for task_index, task in enumerate(tasks):
if start_index is not None and task_index < start_index:
if task.output:
if task.async_execution:
task_outputs.append(task.output)
else:
task_outputs = [task.output]
last_sync_output = task.output
continue
agent_to_use = self._get_agent_to_use(task)
if agent_to_use is None:
raise ValueError(
f"No agent available for task: {task.description}. "
f"Ensure that either the task has an assigned agent "
f"or a manager agent is provided."
)
tools_for_task = task.tools or agent_to_use.tools or []
tools_for_task = self._prepare_tools(
agent_to_use,
task,
tools_for_task,
)
self._log_task_start(task, agent_to_use.role)
if isinstance(task, ConditionalTask):
skipped_task_output = await self._ahandle_conditional_task(
task, task_outputs, pending_tasks, task_index, was_replayed
)
if skipped_task_output:
task_outputs.append(skipped_task_output)
continue
if task.async_execution:
context = self._get_context(
task, [last_sync_output] if last_sync_output else []
)
async_task = asyncio.create_task(
task.aexecute_sync(
agent=agent_to_use,
context=context,
tools=tools_for_task,
)
)
pending_tasks.append((task, async_task, task_index))
else:
if pending_tasks:
task_outputs = await self._aprocess_async_tasks(
pending_tasks, was_replayed
)
pending_tasks.clear()
context = self._get_context(task, task_outputs)
task_output = await task.aexecute_sync(
agent=agent_to_use,
context=context,
tools=tools_for_task,
)
task_outputs.append(task_output)
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index, was_replayed)
if pending_tasks:
task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed)
return self._create_crew_output(task_outputs)
async def _ahandle_conditional_task(
self,
task: ConditionalTask,
task_outputs: list[TaskOutput],
pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int]],
task_index: int,
was_replayed: bool,
) -> TaskOutput | None:
"""Handle conditional task evaluation using native async."""
if pending_tasks:
task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed)
pending_tasks.clear()
previous_output = task_outputs[-1] if task_outputs else None
if previous_output is not None and not task.should_execute(previous_output):
self._logger.log(
"debug",
f"Skipping conditional task: {task.description}",
color="yellow",
)
skipped_task_output = task.get_skipped_task_output()
if not was_replayed:
self._store_execution_log(task, skipped_task_output, task_index)
return skipped_task_output
return None
async def _aprocess_async_tasks(
self,
pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int]],
was_replayed: bool = False,
) -> list[TaskOutput]:
"""Process pending async tasks and return their outputs."""
task_outputs: list[TaskOutput] = []
for future_task, async_task, task_index in pending_tasks:
task_output = await async_task
task_outputs.append(task_output)
self._process_task_result(future_task, task_output)
self._store_execution_log(
future_task, task_output, task_index, was_replayed
)
return task_outputs
def _handle_crew_planning(self) -> None:
"""Handles the Crew planning."""
self._logger.log("info", "Planning the crew execution")

View File

@@ -0,0 +1,384 @@
"""Tests for async crew execution."""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
from crewai.crews.crew_output import CrewOutput
from crewai.tasks.task_output import TaskOutput
@pytest.fixture
def test_agent() -> Agent:
"""Create a test agent."""
return Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
llm="gpt-4o-mini",
verbose=False,
)
@pytest.fixture
def test_task(test_agent: Agent) -> Task:
"""Create a test task."""
return Task(
description="Test task description",
expected_output="Test expected output",
agent=test_agent,
)
@pytest.fixture
def test_crew(test_agent: Agent, test_task: Task) -> Crew:
"""Create a test crew."""
return Crew(
agents=[test_agent],
tasks=[test_task],
verbose=False,
)
class TestAsyncCrewKickoff:
"""Tests for async crew kickoff methods."""
@pytest.mark.asyncio
@patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock)
async def test_akickoff_basic(
self, mock_execute: AsyncMock, test_crew: Crew
) -> None:
"""Test basic async crew kickoff."""
mock_output = TaskOutput(
description="Test task description",
raw="Task result",
agent="Test Agent",
)
mock_execute.return_value = mock_output
result = await test_crew.akickoff()
assert result is not None
assert isinstance(result, CrewOutput)
assert result.raw == "Task result"
mock_execute.assert_called_once()
@pytest.mark.asyncio
@patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock)
async def test_akickoff_with_inputs(
self, mock_execute: AsyncMock, test_agent: Agent
) -> None:
"""Test async crew kickoff with inputs."""
task = Task(
description="Test task for {topic}",
expected_output="Expected output for {topic}",
agent=test_agent,
)
crew = Crew(
agents=[test_agent],
tasks=[task],
verbose=False,
)
mock_output = TaskOutput(
description="Test task for AI",
raw="Task result about AI",
agent="Test Agent",
)
mock_execute.return_value = mock_output
result = await crew.akickoff(inputs={"topic": "AI"})
assert result is not None
assert isinstance(result, CrewOutput)
mock_execute.assert_called_once()
@pytest.mark.asyncio
@patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock)
async def test_akickoff_multiple_tasks(
self, mock_execute: AsyncMock, test_agent: Agent
) -> None:
"""Test async crew kickoff with multiple tasks."""
task1 = Task(
description="First task",
expected_output="First output",
agent=test_agent,
)
task2 = Task(
description="Second task",
expected_output="Second output",
agent=test_agent,
)
crew = Crew(
agents=[test_agent],
tasks=[task1, task2],
verbose=False,
)
mock_output1 = TaskOutput(
description="First task",
raw="First result",
agent="Test Agent",
)
mock_output2 = TaskOutput(
description="Second task",
raw="Second result",
agent="Test Agent",
)
mock_execute.side_effect = [mock_output1, mock_output2]
result = await crew.akickoff()
assert result is not None
assert isinstance(result, CrewOutput)
assert result.raw == "Second result"
assert mock_execute.call_count == 2
@pytest.mark.asyncio
@patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock)
async def test_akickoff_handles_exception(
self, mock_execute: AsyncMock, test_crew: Crew
) -> None:
"""Test that async kickoff handles exceptions properly."""
mock_execute.side_effect = RuntimeError("Test error")
with pytest.raises(RuntimeError) as exc_info:
await test_crew.akickoff()
assert "Test error" in str(exc_info.value)
@pytest.mark.asyncio
@patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock)
async def test_akickoff_calls_before_callbacks(
self, mock_execute: AsyncMock, test_agent: Agent
) -> None:
"""Test that async kickoff calls before_kickoff_callbacks."""
callback_called = False
def before_callback(inputs: dict | None) -> dict:
nonlocal callback_called
callback_called = True
return inputs or {}
task = Task(
description="Test task",
expected_output="Test output",
agent=test_agent,
)
crew = Crew(
agents=[test_agent],
tasks=[task],
verbose=False,
before_kickoff_callbacks=[before_callback],
)
mock_output = TaskOutput(
description="Test task",
raw="Task result",
agent="Test Agent",
)
mock_execute.return_value = mock_output
await crew.akickoff()
assert callback_called
@pytest.mark.asyncio
@patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock)
async def test_akickoff_calls_after_callbacks(
self, mock_execute: AsyncMock, test_agent: Agent
) -> None:
"""Test that async kickoff calls after_kickoff_callbacks."""
callback_called = False
def after_callback(result: CrewOutput) -> CrewOutput:
nonlocal callback_called
callback_called = True
return result
task = Task(
description="Test task",
expected_output="Test output",
agent=test_agent,
)
crew = Crew(
agents=[test_agent],
tasks=[task],
verbose=False,
after_kickoff_callbacks=[after_callback],
)
mock_output = TaskOutput(
description="Test task",
raw="Task result",
agent="Test Agent",
)
mock_execute.return_value = mock_output
await crew.akickoff()
assert callback_called
class TestAsyncCrewKickoffForEach:
"""Tests for async crew kickoff_for_each methods."""
@pytest.mark.asyncio
@patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock)
async def test_akickoff_for_each_basic(
self, mock_execute: AsyncMock, test_agent: Agent
) -> None:
"""Test basic async kickoff_for_each."""
task = Task(
description="Test task for {topic}",
expected_output="Expected output",
agent=test_agent,
)
crew = Crew(
agents=[test_agent],
tasks=[task],
verbose=False,
)
mock_output1 = TaskOutput(
description="Test task for AI",
raw="Result about AI",
agent="Test Agent",
)
mock_output2 = TaskOutput(
description="Test task for ML",
raw="Result about ML",
agent="Test Agent",
)
mock_execute.side_effect = [mock_output1, mock_output2]
inputs = [{"topic": "AI"}, {"topic": "ML"}]
results = await crew.akickoff_for_each(inputs)
assert len(results) == 2
assert all(isinstance(r, CrewOutput) for r in results)
@pytest.mark.asyncio
@patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock)
async def test_akickoff_for_each_concurrent(
self, mock_execute: AsyncMock, test_agent: Agent
) -> None:
"""Test that async kickoff_for_each runs concurrently."""
task = Task(
description="Test task for {topic}",
expected_output="Expected output",
agent=test_agent,
)
crew = Crew(
agents=[test_agent],
tasks=[task],
verbose=False,
)
mock_output = TaskOutput(
description="Test task",
raw="Result",
agent="Test Agent",
)
mock_execute.return_value = mock_output
inputs = [{"topic": f"topic_{i}"} for i in range(3)]
results = await crew.akickoff_for_each(inputs)
assert len(results) == 3
class TestAsyncTaskExecution:
"""Tests for async task execution within crew."""
@pytest.mark.asyncio
@patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock)
async def test_aexecute_tasks_sequential(
self, mock_execute: AsyncMock, test_agent: Agent
) -> None:
"""Test async sequential task execution."""
task1 = Task(
description="First task",
expected_output="First output",
agent=test_agent,
)
task2 = Task(
description="Second task",
expected_output="Second output",
agent=test_agent,
)
crew = Crew(
agents=[test_agent],
tasks=[task1, task2],
verbose=False,
)
mock_output1 = TaskOutput(
description="First task",
raw="First result",
agent="Test Agent",
)
mock_output2 = TaskOutput(
description="Second task",
raw="Second result",
agent="Test Agent",
)
mock_execute.side_effect = [mock_output1, mock_output2]
result = await crew._aexecute_tasks(crew.tasks)
assert result is not None
assert result.raw == "Second result"
assert len(result.tasks_output) == 2
@pytest.mark.asyncio
@patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock)
async def test_aexecute_tasks_with_async_task(
self, mock_execute: AsyncMock, test_agent: Agent
) -> None:
"""Test async execution with async_execution task flag."""
task1 = Task(
description="Async task",
expected_output="Async output",
agent=test_agent,
async_execution=True,
)
task2 = Task(
description="Sync task",
expected_output="Sync output",
agent=test_agent,
)
crew = Crew(
agents=[test_agent],
tasks=[task1, task2],
verbose=False,
)
mock_output1 = TaskOutput(
description="Async task",
raw="Async result",
agent="Test Agent",
)
mock_output2 = TaskOutput(
description="Sync task",
raw="Sync result",
agent="Test Agent",
)
mock_execute.side_effect = [mock_output1, mock_output2]
result = await crew._aexecute_tasks(crew.tasks)
assert result is not None
assert mock_execute.call_count == 2
class TestAsyncProcessAsyncTasks:
"""Tests for _aprocess_async_tasks method."""
@pytest.mark.asyncio
async def test_aprocess_async_tasks_empty(self, test_crew: Crew) -> None:
"""Test processing empty list of async tasks."""
result = await test_crew._aprocess_async_tasks([])
assert result == []