From 13dc7e25e0d904cde756fd28a2b87171082fe728 Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Wed, 14 Jan 2026 14:23:10 -0800 Subject: [PATCH] ensure executors work inside a flow due to flow in flow async structure --- lib/crewai/src/crewai/agent/core.py | 212 +++++++++++++++++- .../src/crewai/experimental/agent_executor.py | 99 +++++++- lib/crewai/src/crewai/flow/flow.py | 4 + .../src/crewai/utilities/agent_utils.py | 18 ++ 4 files changed, 323 insertions(+), 10 deletions(-) diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index da5287957..571777b3a 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -1,7 +1,7 @@ from __future__ import annotations import asyncio -from collections.abc import Callable, Sequence +from collections.abc import Callable, Coroutine, Sequence import shutil import subprocess import time @@ -70,6 +70,7 @@ from crewai.security.fingerprint import Fingerprint from crewai.tools.agent_tools.agent_tools import AgentTools from crewai.utilities.agent_utils import ( get_tool_names, + is_inside_event_loop, load_agent_from_repository, parse_tools, render_text_description_and_args, @@ -1577,13 +1578,17 @@ class Agent(BaseAgent): self, messages: str | list[LLMMessage], response_format: type[Any] | None = None, - ) -> LiteAgentOutput: + ) -> LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput]: """ Execute the agent with the given messages using the AgentExecutor. This method provides standalone agent execution without requiring a Crew. It supports tools, response formatting, and guardrails. + When called from within a Flow (inside an event loop), this method + automatically returns a coroutine that the Flow framework will await, + making it work seamlessly in both sync and async contexts. + Args: messages: Either a string query or a list of message dictionaries. If a string is provided, it will be converted to a user message. @@ -1592,7 +1597,11 @@ class Agent(BaseAgent): Returns: LiteAgentOutput: The result of the agent execution. + Or a coroutine if called from within an event loop. """ + if is_inside_event_loop(): + return self.kickoff_async(messages, response_format) + # Process platform apps and MCP tools if self.apps: platform_tools = self.get_platform_tools(self.apps) @@ -1738,8 +1747,70 @@ class Agent(BaseAgent): """ import json - # Execute the agent - result = executor.invoke(inputs) + # Execute the agent (this is called from sync path, so invoke returns dict) + result = cast(dict[str, Any], executor.invoke(inputs)) + raw_output = result.get("output", "") + + # Handle response format conversion + formatted_result: BaseModel | None = None + if response_format: + try: + model_schema = generate_model_description(response_format) + schema = json.dumps(model_schema, indent=2) + instructions = self.i18n.slice("formatted_task_instructions").format( + output_format=schema + ) + + converter = Converter( + llm=self.llm, + text=raw_output, + model=response_format, + instructions=instructions, + ) + + conversion_result = converter.to_pydantic() + if isinstance(conversion_result, BaseModel): + formatted_result = conversion_result + except ConverterError: + pass # Keep raw output if conversion fails + + # Get token usage metrics + if isinstance(self.llm, BaseLLM): + usage_metrics = self.llm.get_token_usage_summary() + else: + usage_metrics = self._token_process.get_summary() + + return LiteAgentOutput( + raw=raw_output, + pydantic=formatted_result, + agent_role=self.role, + usage_metrics=usage_metrics.model_dump() if usage_metrics else None, + messages=executor.messages, + ) + + async def _execute_and_build_output_async( + self, + executor: AgentExecutor, + inputs: dict[str, str], + response_format: type[Any] | None = None, + ) -> LiteAgentOutput: + """Execute the agent asynchronously and build the output object. + + This is the async version of _execute_and_build_output that uses + invoke_async() for native async execution within event loops. + + Args: + executor: The executor instance. + inputs: Input dictionary for execution. + response_format: Optional response format. + + Returns: + LiteAgentOutput with raw output, formatted result, and metrics. + """ + import json + + # Execute the agent asynchronously + result = await executor.invoke_async(inputs) raw_output = result.get("output", "") # Handle response format conversion @@ -1866,7 +1937,9 @@ class Agent(BaseAgent): """ Execute the agent asynchronously with the given messages. - This is the async version of the kickoff method. + This is the async version of the kickoff method that uses native async + execution. It is designed for use within async contexts, such as when + called from within an async Flow method. Args: messages: Either a string query or a list of message dictionaries. @@ -1877,4 +1950,131 @@ class Agent(BaseAgent): Returns: LiteAgentOutput: The result of the agent execution. """ - return await asyncio.to_thread(self.kickoff, messages, response_format) + # Process platform apps and MCP tools + if self.apps: + platform_tools = self.get_platform_tools(self.apps) + if platform_tools and self.tools is not None: + self.tools.extend(platform_tools) + if self.mcps: + mcps = self.get_mcp_tools(self.mcps) + if mcps and self.tools is not None: + self.tools.extend(mcps) + + # Prepare tools + raw_tools: list[BaseTool] = self.tools or [] + parsed_tools = parse_tools(raw_tools) + + # Build agent_info for backward-compatible event emission + agent_info = { + "id": self.id, + "role": self.role, + "goal": self.goal, + "backstory": self.backstory, + "tools": raw_tools, + "verbose": self.verbose, + } + + # Build prompt for standalone execution + prompt = Prompts( + agent=self, + has_tools=len(raw_tools) > 0, + i18n=self.i18n, + use_system_prompt=self.use_system_prompt, + system_template=self.system_template, + prompt_template=self.prompt_template, + response_template=self.response_template, + ).task_execution() + + # Prepare stop words + stop_words = [self.i18n.slice("observation")] + if self.response_template: + stop_words.append( + self.response_template.split("{{ .Response }}")[1].strip() + ) + + # Get RPM limit function + rpm_limit_fn = ( + self._rpm_controller.check_or_wait if self._rpm_controller else None + ) + + # Create the executor for standalone mode (no crew, no task) + executor = AgentExecutor( + task=None, + crew=None, + llm=cast(BaseLLM, self.llm), + agent=self, + prompt=prompt, + max_iter=self.max_iter, + tools=parsed_tools, + tools_names=get_tool_names(parsed_tools), + stop_words=stop_words, + tools_description=render_text_description_and_args(parsed_tools), + tools_handler=self.tools_handler, + original_tools=raw_tools, + step_callback=self.step_callback, + function_calling_llm=self.function_calling_llm, + respect_context_window=self.respect_context_window, + request_within_rpm_limit=rpm_limit_fn, + callbacks=[TokenCalcHandler(self._token_process)], + response_model=response_format, + i18n=self.i18n, + ) + + if isinstance(messages, str): + formatted_messages = messages + else: + # Convert list of messages to a single input string + formatted_messages = "\n".join( + str(msg.get("content", "")) for msg in messages if msg.get("content") + ) + + # Build the input dict for the executor + inputs = { + "input": formatted_messages, + "tool_names": get_tool_names(parsed_tools), + "tools": render_text_description_and_args(parsed_tools), + } + + try: + # Emit started event for backward compatibility with LiteAgent listeners + crewai_event_bus.emit( + self, + event=LiteAgentExecutionStartedEvent( + agent_info=agent_info, + tools=parsed_tools, + messages=messages, + ), + ) + + # Execute asynchronously using invoke_async + output = await self._execute_and_build_output_async( + executor, inputs, response_format + ) + + if self.guardrail is not None: + output = self._process_kickoff_guardrail( + output=output, + executor=executor, + inputs=inputs, + response_format=response_format, + ) + + crewai_event_bus.emit( + self, + event=LiteAgentExecutionCompletedEvent( + agent_info=agent_info, + output=output.raw, + ), + ) + + return output + + except Exception as e: + crewai_event_bus.emit( + self, + event=LiteAgentExecutionErrorEvent( + agent_info=agent_info, + error=str(e), + ), + ) + raise diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index 2743fff1b..e0a6b068b 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -1,6 +1,6 @@ from __future__ import annotations -from collections.abc import Callable +from collections.abc import Callable, Coroutine import threading from typing import TYPE_CHECKING, Any, Literal, cast from uuid import uuid4 @@ -37,6 +37,7 @@ from crewai.utilities.agent_utils import ( handle_unknown_error, has_reached_max_iterations, is_context_length_exceeded, + is_inside_event_loop, process_llm_response, ) from crewai.utilities.constants import TRAINING_DATA_FILE @@ -182,7 +183,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): else self.stop ) ) - self._state = AgentReActState() def _ensure_flow_initialized(self) -> None: @@ -453,9 +453,99 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): return "initialized" - def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]: + def invoke( + self, inputs: dict[str, Any] + ) -> dict[str, Any] | Coroutine[Any, Any, dict[str, Any]]: """Execute agent with given inputs. + When called from within an existing event loop (e.g., inside a Flow), + this method returns a coroutine that should be awaited. The Flow + framework handles this automatically. + + Args: + inputs: Input dictionary containing prompt variables. + + Returns: + Dictionary with agent output, or a coroutine if inside an event loop. + """ + # Magic auto-async: if inside event loop, return coroutine for Flow to await + if is_inside_event_loop(): + return self.invoke_async(inputs) + + self._ensure_flow_initialized() + + with self._execution_lock: + if self._is_executing: + raise RuntimeError( + "Executor is already running. " + "Cannot invoke the same executor instance concurrently." + ) + self._is_executing = True + self._has_been_invoked = True + + try: + # Reset state for fresh execution + self.state.messages.clear() + self.state.iterations = 0 + self.state.current_answer = None + self.state.is_finished = False + + if "system" in self.prompt: + prompt = cast("SystemPromptResult", self.prompt) + system_prompt = self._format_prompt(prompt["system"], inputs) + user_prompt = self._format_prompt(prompt["user"], inputs) + self.state.messages.append( + format_message_for_llm(system_prompt, role="system") + ) + self.state.messages.append(format_message_for_llm(user_prompt)) + else: + user_prompt = self._format_prompt(self.prompt["prompt"], inputs) + self.state.messages.append(format_message_for_llm(user_prompt)) + + self.state.ask_for_human_input = bool( + inputs.get("ask_for_human_input", False) + ) + + self.kickoff() + + formatted_answer = self.state.current_answer + + if not isinstance(formatted_answer, AgentFinish): + raise RuntimeError( + "Agent execution ended without reaching a final answer." + ) + + if self.state.ask_for_human_input: + formatted_answer = self._handle_human_feedback(formatted_answer) + + self._create_short_term_memory(formatted_answer) + self._create_long_term_memory(formatted_answer) + self._create_external_memory(formatted_answer) + + return {"output": formatted_answer.output} + + except AssertionError: + fail_text = Text() + fail_text.append("❌ ", style="red bold") + fail_text.append( + "Agent failed to reach a final answer. This is likely a bug - please report it.", + style="red", + ) + self._console.print(fail_text) + raise + except Exception as e: + handle_unknown_error(self._printer, e) + raise + finally: + self._is_executing = False + + async def invoke_async(self, inputs: dict[str, Any]) -> dict[str, Any]: + """Execute agent asynchronously with given inputs. + + This method is designed for use within async contexts, such as when + the agent is called from within an async Flow method. It uses + kickoff_async() directly instead of running in a separate thread. + Args: inputs: Input dictionary containing prompt variables. @@ -496,7 +586,8 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): inputs.get("ask_for_human_input", False) ) - self.kickoff() + # Use async kickoff directly since we're already in an async context + await self.kickoff_async() formatted_answer = self.state.current_answer diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index dcc0c78bc..f3a31df3d 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -1579,6 +1579,10 @@ class Flow(Generic[T], metaclass=FlowMeta): else method(*args, **kwargs) ) + # Auto-await coroutines from sync methods (enables agent.kickoff() inside flows) + if asyncio.iscoroutine(result): + result = await result + self._method_outputs.append(result) self._method_execution_counts[method_name] = ( self._method_execution_counts.get(method_name, 0) + 1 diff --git a/lib/crewai/src/crewai/utilities/agent_utils.py b/lib/crewai/src/crewai/utilities/agent_utils.py index 973ad5596..2249e0d6e 100644 --- a/lib/crewai/src/crewai/utilities/agent_utils.py +++ b/lib/crewai/src/crewai/utilities/agent_utils.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from collections.abc import Callable, Sequence import json import re @@ -54,6 +55,23 @@ console = Console() _MULTIPLE_NEWLINES: Final[re.Pattern[str]] = re.compile(r"\n+") +def is_inside_event_loop() -> bool: + """Check if code is currently running inside an asyncio event loop. + + This is used to detect when code is being called from within an async context + (e.g., inside a Flow). In such cases, callers should return a coroutine + instead of executing synchronously to avoid nested event loop errors. + + Returns: + True if inside a running event loop, False otherwise. + """ + try: + asyncio.get_running_loop() + return True + except RuntimeError: + return False + + def parse_tools(tools: list[BaseTool]) -> list[CrewStructuredTool]: """Parse tools to be used for the task.