From c57e325482056e26287b3053715bd881960b7bdf Mon Sep 17 00:00:00 2001 From: Greyson Lalonde Date: Tue, 2 Dec 2025 16:47:53 -0500 Subject: [PATCH] feat: add native async crew support --- lib/crewai/src/crewai/crew.py | 336 ++++++++++++++++++++ lib/crewai/tests/crew/test_async_crew.py | 384 +++++++++++++++++++++++ 2 files changed, 720 insertions(+) create mode 100644 lib/crewai/tests/crew/test_async_crew.py diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index bbdfd28da..094c21e76 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -948,6 +948,342 @@ class Crew(FlowTrackable, BaseModel): self._task_output_handler.reset() return list(results) + async def akickoff( + self, inputs: dict[str, Any] | None = None + ) -> CrewOutput | CrewStreamingOutput: + """Native async kickoff method using async task execution throughout. + + Unlike kickoff_async which wraps sync kickoff in a thread, this method + uses native async/await for all operations including task execution, + memory operations, and knowledge queries. + """ + if self.stream: + for agent in self.agents: + if agent.llm is not None: + agent.llm.stream = True + + result_holder: list[CrewOutput] = [] + current_task_info: TaskInfo = { + "index": 0, + "name": "", + "id": "", + "agent_role": "", + "agent_id": "", + } + + state = create_streaming_state( + current_task_info, result_holder, use_async=True + ) + output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = [] + + async def run_crew() -> None: + try: + self.stream = False + result = await self.akickoff(inputs) + if isinstance(result, CrewOutput): + result_holder.append(result) + except Exception as e: + signal_error(state, e, is_async=True) + finally: + self.stream = True + signal_end(state, is_async=True) + + streaming_output = CrewStreamingOutput( + async_iterator=create_async_chunk_generator( + state, run_crew, output_holder + ) + ) + output_holder.append(streaming_output) + + return streaming_output + + ctx = baggage.set_baggage( + "crew_context", CrewContext(id=str(self.id), key=self.key) + ) + token = attach(ctx) + + try: + for before_callback in self.before_kickoff_callbacks: + if inputs is None: + inputs = {} + inputs = before_callback(inputs) + + crewai_event_bus.emit( + self, + CrewKickoffStartedEvent(crew_name=self.name, inputs=inputs), + ) + + self._task_output_handler.reset() + self._logging_color = "bold_purple" + + if inputs is not None: + self._inputs = inputs + self._interpolate_inputs(inputs) + self._set_tasks_callbacks() + self._set_allow_crewai_trigger_context_for_first_task() + + for agent in self.agents: + agent.crew = self + agent.set_knowledge(crew_embedder=self.embedder) + if not agent.function_calling_llm: # type: ignore[attr-defined] + agent.function_calling_llm = self.function_calling_llm # type: ignore[attr-defined] + + if not agent.step_callback: # type: ignore[attr-defined] + agent.step_callback = self.step_callback # type: ignore[attr-defined] + + agent.create_agent_executor() + + if self.planning: + self._handle_crew_planning() + + if self.process == Process.sequential: + result = await self._arun_sequential_process() + elif self.process == Process.hierarchical: + result = await self._arun_hierarchical_process() + else: + raise NotImplementedError( + f"The process '{self.process}' is not implemented yet." + ) + + for after_callback in self.after_kickoff_callbacks: + result = after_callback(result) + + self.usage_metrics = self.calculate_usage_metrics() + + return result + except Exception as e: + crewai_event_bus.emit( + self, + CrewKickoffFailedEvent(error=str(e), crew_name=self.name), + ) + raise + finally: + detach(token) + + async def akickoff_for_each( + self, inputs: list[dict[str, Any]] + ) -> list[CrewOutput | CrewStreamingOutput] | CrewStreamingOutput: + """Native async execution of the Crew's workflow for each input. + + Uses native async throughout rather than thread-based async. + If stream=True, returns a single CrewStreamingOutput that yields chunks + from all crews as they arrive. + """ + crew_copies = [self.copy() for _ in inputs] + + if self.stream: + result_holder: list[list[CrewOutput]] = [[]] + current_task_info: TaskInfo = { + "index": 0, + "name": "", + "id": "", + "agent_role": "", + "agent_id": "", + } + + state = create_streaming_state( + current_task_info, result_holder, use_async=True + ) + output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = [] + + async def run_all_crews() -> None: + try: + streaming_outputs: list[CrewStreamingOutput] = [] + for i, crew in enumerate(crew_copies): + streaming = await crew.akickoff(inputs=inputs[i]) + if isinstance(streaming, CrewStreamingOutput): + streaming_outputs.append(streaming) + + async def consume_stream( + stream_output: CrewStreamingOutput, + ) -> CrewOutput: + async for chunk in stream_output: + if state.async_queue is not None and state.loop is not None: + state.loop.call_soon_threadsafe( + state.async_queue.put_nowait, chunk + ) + return stream_output.result + + crew_results = await asyncio.gather( + *[consume_stream(s) for s in streaming_outputs] + ) + result_holder[0] = list(crew_results) + except Exception as e: + signal_error(state, e, is_async=True) + finally: + signal_end(state, is_async=True) + + streaming_output = CrewStreamingOutput( + async_iterator=create_async_chunk_generator( + state, run_all_crews, output_holder + ) + ) + + def set_results_wrapper(result: Any) -> None: + streaming_output._set_results(result) + + streaming_output._set_result = set_results_wrapper # type: ignore[method-assign] + output_holder.append(streaming_output) + + return streaming_output + + tasks = [ + asyncio.create_task(crew_copy.akickoff(inputs=input_data)) + for crew_copy, input_data in zip(crew_copies, inputs, strict=True) + ] + + results = await asyncio.gather(*tasks) + + total_usage_metrics = UsageMetrics() + for crew_copy in crew_copies: + if crew_copy.usage_metrics: + total_usage_metrics.add_usage_metrics(crew_copy.usage_metrics) + self.usage_metrics = total_usage_metrics + + self._task_output_handler.reset() + return list(results) + + async def _arun_sequential_process(self) -> CrewOutput: + """Executes tasks sequentially using native async and returns the final output.""" + return await self._aexecute_tasks(self.tasks) + + async def _arun_hierarchical_process(self) -> CrewOutput: + """Creates and assigns a manager agent to complete the tasks using native async.""" + self._create_manager_agent() + return await self._aexecute_tasks(self.tasks) + + async def _aexecute_tasks( + self, + tasks: list[Task], + start_index: int | None = 0, + was_replayed: bool = False, + ) -> CrewOutput: + """Executes tasks using native async and returns the final output. + + Args: + tasks: List of tasks to execute + start_index: Index to start execution from (for replay) + was_replayed: Whether this is a replayed execution + + Returns: + CrewOutput: Final output of the crew + """ + task_outputs: list[TaskOutput] = [] + pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int]] = [] + last_sync_output: TaskOutput | None = None + + for task_index, task in enumerate(tasks): + if start_index is not None and task_index < start_index: + if task.output: + if task.async_execution: + task_outputs.append(task.output) + else: + task_outputs = [task.output] + last_sync_output = task.output + continue + + agent_to_use = self._get_agent_to_use(task) + if agent_to_use is None: + raise ValueError( + f"No agent available for task: {task.description}. " + f"Ensure that either the task has an assigned agent " + f"or a manager agent is provided." + ) + + tools_for_task = task.tools or agent_to_use.tools or [] + tools_for_task = self._prepare_tools( + agent_to_use, + task, + tools_for_task, + ) + + self._log_task_start(task, agent_to_use.role) + + if isinstance(task, ConditionalTask): + skipped_task_output = await self._ahandle_conditional_task( + task, task_outputs, pending_tasks, task_index, was_replayed + ) + if skipped_task_output: + task_outputs.append(skipped_task_output) + continue + + if task.async_execution: + context = self._get_context( + task, [last_sync_output] if last_sync_output else [] + ) + async_task = asyncio.create_task( + task.aexecute_sync( + agent=agent_to_use, + context=context, + tools=tools_for_task, + ) + ) + pending_tasks.append((task, async_task, task_index)) + else: + if pending_tasks: + task_outputs = await self._aprocess_async_tasks( + pending_tasks, was_replayed + ) + pending_tasks.clear() + + context = self._get_context(task, task_outputs) + task_output = await task.aexecute_sync( + agent=agent_to_use, + context=context, + tools=tools_for_task, + ) + task_outputs.append(task_output) + self._process_task_result(task, task_output) + self._store_execution_log(task, task_output, task_index, was_replayed) + + if pending_tasks: + task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed) + + return self._create_crew_output(task_outputs) + + async def _ahandle_conditional_task( + self, + task: ConditionalTask, + task_outputs: list[TaskOutput], + pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int]], + task_index: int, + was_replayed: bool, + ) -> TaskOutput | None: + """Handle conditional task evaluation using native async.""" + if pending_tasks: + task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed) + pending_tasks.clear() + + previous_output = task_outputs[-1] if task_outputs else None + if previous_output is not None and not task.should_execute(previous_output): + self._logger.log( + "debug", + f"Skipping conditional task: {task.description}", + color="yellow", + ) + skipped_task_output = task.get_skipped_task_output() + + if not was_replayed: + self._store_execution_log(task, skipped_task_output, task_index) + return skipped_task_output + return None + + async def _aprocess_async_tasks( + self, + pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int]], + was_replayed: bool = False, + ) -> list[TaskOutput]: + """Process pending async tasks and return their outputs.""" + task_outputs: list[TaskOutput] = [] + for future_task, async_task, task_index in pending_tasks: + task_output = await async_task + task_outputs.append(task_output) + self._process_task_result(future_task, task_output) + self._store_execution_log( + future_task, task_output, task_index, was_replayed + ) + return task_outputs + def _handle_crew_planning(self) -> None: """Handles the Crew planning.""" self._logger.log("info", "Planning the crew execution") diff --git a/lib/crewai/tests/crew/test_async_crew.py b/lib/crewai/tests/crew/test_async_crew.py new file mode 100644 index 000000000..aaaffa64f --- /dev/null +++ b/lib/crewai/tests/crew/test_async_crew.py @@ -0,0 +1,384 @@ +"""Tests for async crew execution.""" + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + +from crewai.agent import Agent +from crewai.crew import Crew +from crewai.task import Task +from crewai.crews.crew_output import CrewOutput +from crewai.tasks.task_output import TaskOutput + + +@pytest.fixture +def test_agent() -> Agent: + """Create a test agent.""" + return Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + llm="gpt-4o-mini", + verbose=False, + ) + + +@pytest.fixture +def test_task(test_agent: Agent) -> Task: + """Create a test task.""" + return Task( + description="Test task description", + expected_output="Test expected output", + agent=test_agent, + ) + + +@pytest.fixture +def test_crew(test_agent: Agent, test_task: Task) -> Crew: + """Create a test crew.""" + return Crew( + agents=[test_agent], + tasks=[test_task], + verbose=False, + ) + + +class TestAsyncCrewKickoff: + """Tests for async crew kickoff methods.""" + + @pytest.mark.asyncio + @patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock) + async def test_akickoff_basic( + self, mock_execute: AsyncMock, test_crew: Crew + ) -> None: + """Test basic async crew kickoff.""" + mock_output = TaskOutput( + description="Test task description", + raw="Task result", + agent="Test Agent", + ) + mock_execute.return_value = mock_output + + result = await test_crew.akickoff() + + assert result is not None + assert isinstance(result, CrewOutput) + assert result.raw == "Task result" + mock_execute.assert_called_once() + + @pytest.mark.asyncio + @patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock) + async def test_akickoff_with_inputs( + self, mock_execute: AsyncMock, test_agent: Agent + ) -> None: + """Test async crew kickoff with inputs.""" + task = Task( + description="Test task for {topic}", + expected_output="Expected output for {topic}", + agent=test_agent, + ) + crew = Crew( + agents=[test_agent], + tasks=[task], + verbose=False, + ) + + mock_output = TaskOutput( + description="Test task for AI", + raw="Task result about AI", + agent="Test Agent", + ) + mock_execute.return_value = mock_output + + result = await crew.akickoff(inputs={"topic": "AI"}) + + assert result is not None + assert isinstance(result, CrewOutput) + mock_execute.assert_called_once() + + @pytest.mark.asyncio + @patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock) + async def test_akickoff_multiple_tasks( + self, mock_execute: AsyncMock, test_agent: Agent + ) -> None: + """Test async crew kickoff with multiple tasks.""" + task1 = Task( + description="First task", + expected_output="First output", + agent=test_agent, + ) + task2 = Task( + description="Second task", + expected_output="Second output", + agent=test_agent, + ) + crew = Crew( + agents=[test_agent], + tasks=[task1, task2], + verbose=False, + ) + + mock_output1 = TaskOutput( + description="First task", + raw="First result", + agent="Test Agent", + ) + mock_output2 = TaskOutput( + description="Second task", + raw="Second result", + agent="Test Agent", + ) + mock_execute.side_effect = [mock_output1, mock_output2] + + result = await crew.akickoff() + + assert result is not None + assert isinstance(result, CrewOutput) + assert result.raw == "Second result" + assert mock_execute.call_count == 2 + + @pytest.mark.asyncio + @patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock) + async def test_akickoff_handles_exception( + self, mock_execute: AsyncMock, test_crew: Crew + ) -> None: + """Test that async kickoff handles exceptions properly.""" + mock_execute.side_effect = RuntimeError("Test error") + + with pytest.raises(RuntimeError) as exc_info: + await test_crew.akickoff() + + assert "Test error" in str(exc_info.value) + + @pytest.mark.asyncio + @patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock) + async def test_akickoff_calls_before_callbacks( + self, mock_execute: AsyncMock, test_agent: Agent + ) -> None: + """Test that async kickoff calls before_kickoff_callbacks.""" + callback_called = False + + def before_callback(inputs: dict | None) -> dict: + nonlocal callback_called + callback_called = True + return inputs or {} + + task = Task( + description="Test task", + expected_output="Test output", + agent=test_agent, + ) + crew = Crew( + agents=[test_agent], + tasks=[task], + verbose=False, + before_kickoff_callbacks=[before_callback], + ) + + mock_output = TaskOutput( + description="Test task", + raw="Task result", + agent="Test Agent", + ) + mock_execute.return_value = mock_output + + await crew.akickoff() + + assert callback_called + + @pytest.mark.asyncio + @patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock) + async def test_akickoff_calls_after_callbacks( + self, mock_execute: AsyncMock, test_agent: Agent + ) -> None: + """Test that async kickoff calls after_kickoff_callbacks.""" + callback_called = False + + def after_callback(result: CrewOutput) -> CrewOutput: + nonlocal callback_called + callback_called = True + return result + + task = Task( + description="Test task", + expected_output="Test output", + agent=test_agent, + ) + crew = Crew( + agents=[test_agent], + tasks=[task], + verbose=False, + after_kickoff_callbacks=[after_callback], + ) + + mock_output = TaskOutput( + description="Test task", + raw="Task result", + agent="Test Agent", + ) + mock_execute.return_value = mock_output + + await crew.akickoff() + + assert callback_called + + +class TestAsyncCrewKickoffForEach: + """Tests for async crew kickoff_for_each methods.""" + + @pytest.mark.asyncio + @patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock) + async def test_akickoff_for_each_basic( + self, mock_execute: AsyncMock, test_agent: Agent + ) -> None: + """Test basic async kickoff_for_each.""" + task = Task( + description="Test task for {topic}", + expected_output="Expected output", + agent=test_agent, + ) + crew = Crew( + agents=[test_agent], + tasks=[task], + verbose=False, + ) + + mock_output1 = TaskOutput( + description="Test task for AI", + raw="Result about AI", + agent="Test Agent", + ) + mock_output2 = TaskOutput( + description="Test task for ML", + raw="Result about ML", + agent="Test Agent", + ) + mock_execute.side_effect = [mock_output1, mock_output2] + + inputs = [{"topic": "AI"}, {"topic": "ML"}] + results = await crew.akickoff_for_each(inputs) + + assert len(results) == 2 + assert all(isinstance(r, CrewOutput) for r in results) + + @pytest.mark.asyncio + @patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock) + async def test_akickoff_for_each_concurrent( + self, mock_execute: AsyncMock, test_agent: Agent + ) -> None: + """Test that async kickoff_for_each runs concurrently.""" + task = Task( + description="Test task for {topic}", + expected_output="Expected output", + agent=test_agent, + ) + crew = Crew( + agents=[test_agent], + tasks=[task], + verbose=False, + ) + + mock_output = TaskOutput( + description="Test task", + raw="Result", + agent="Test Agent", + ) + mock_execute.return_value = mock_output + + inputs = [{"topic": f"topic_{i}"} for i in range(3)] + results = await crew.akickoff_for_each(inputs) + + assert len(results) == 3 + + +class TestAsyncTaskExecution: + """Tests for async task execution within crew.""" + + @pytest.mark.asyncio + @patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock) + async def test_aexecute_tasks_sequential( + self, mock_execute: AsyncMock, test_agent: Agent + ) -> None: + """Test async sequential task execution.""" + task1 = Task( + description="First task", + expected_output="First output", + agent=test_agent, + ) + task2 = Task( + description="Second task", + expected_output="Second output", + agent=test_agent, + ) + crew = Crew( + agents=[test_agent], + tasks=[task1, task2], + verbose=False, + ) + + mock_output1 = TaskOutput( + description="First task", + raw="First result", + agent="Test Agent", + ) + mock_output2 = TaskOutput( + description="Second task", + raw="Second result", + agent="Test Agent", + ) + mock_execute.side_effect = [mock_output1, mock_output2] + + result = await crew._aexecute_tasks(crew.tasks) + + assert result is not None + assert result.raw == "Second result" + assert len(result.tasks_output) == 2 + + @pytest.mark.asyncio + @patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock) + async def test_aexecute_tasks_with_async_task( + self, mock_execute: AsyncMock, test_agent: Agent + ) -> None: + """Test async execution with async_execution task flag.""" + task1 = Task( + description="Async task", + expected_output="Async output", + agent=test_agent, + async_execution=True, + ) + task2 = Task( + description="Sync task", + expected_output="Sync output", + agent=test_agent, + ) + crew = Crew( + agents=[test_agent], + tasks=[task1, task2], + verbose=False, + ) + + mock_output1 = TaskOutput( + description="Async task", + raw="Async result", + agent="Test Agent", + ) + mock_output2 = TaskOutput( + description="Sync task", + raw="Sync result", + agent="Test Agent", + ) + mock_execute.side_effect = [mock_output1, mock_output2] + + result = await crew._aexecute_tasks(crew.tasks) + + assert result is not None + assert mock_execute.call_count == 2 + + +class TestAsyncProcessAsyncTasks: + """Tests for _aprocess_async_tasks method.""" + + @pytest.mark.asyncio + async def test_aprocess_async_tasks_empty(self, test_crew: Crew) -> None: + """Test processing empty list of async tasks.""" + result = await test_crew._aprocess_async_tasks([]) + assert result == [] \ No newline at end of file